ctdb-daemon: Fix signed/unsigned comparisons by casting
[samba.git] / ctdb / server / ctdb_client.c
1 /*
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/time.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 /*
45   allocate a packet for use in client<->daemon communication
46  */
47 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
48                                             TALLOC_CTX *mem_ctx,
49                                             enum ctdb_operation operation,
50                                             size_t length, size_t slength,
51                                             const char *type)
52 {
53         int size;
54         struct ctdb_req_header *hdr;
55
56         length = MAX(length, slength);
57         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
58
59         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
60         if (hdr == NULL) {
61                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
62                          operation, (unsigned)length));
63                 return NULL;
64         }
65         talloc_set_name_const(hdr, type);
66         hdr->length       = length;
67         hdr->operation    = operation;
68         hdr->ctdb_magic   = CTDB_MAGIC;
69         hdr->ctdb_version = CTDB_PROTOCOL;
70         hdr->srcnode      = ctdb->pnn;
71         if (ctdb->vnn_map) {
72                 hdr->generation = ctdb->vnn_map->generation;
73         }
74
75         return hdr;
76 }
77
78 /*
79   local version of ctdb_call
80 */
81 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
82                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
83                     TDB_DATA *data, bool updatetdb)
84 {
85         struct ctdb_call_info *c;
86         struct ctdb_registered_call *fn;
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88
89         c = talloc_zero(mem_ctx, struct ctdb_call_info);
90         CTDB_NO_MEMORY(ctdb, c);
91
92         c->key = call->key;
93         c->call_data = &call->call_data;
94         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
95         c->record_data.dsize = data->dsize;
96         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
97         c->header = header;
98
99         for (fn=ctdb_db->calls;fn;fn=fn->next) {
100                 if (fn->id == (uint32_t)call->call_id) {
101                         break;
102                 }
103         }
104         if (fn == NULL) {
105                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
106                 talloc_free(c);
107                 return -1;
108         }
109
110         if (fn->fn(c) != 0) {
111                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
112                 talloc_free(c);
113                 return -1;
114         }
115
116         /* we need to force the record to be written out if this was a remote access */
117         if (c->new_data == NULL) {
118                 c->new_data = &c->record_data;
119         }
120
121         if (c->new_data && updatetdb) {
122                 /* XXX check that we always have the lock here? */
123                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
124                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
125                         talloc_free(c);
126                         return -1;
127                 }
128         }
129
130         if (c->reply_data) {
131                 call->reply_data = *c->reply_data;
132
133                 talloc_steal(call, call->reply_data.dptr);
134                 talloc_set_name_const(call->reply_data.dptr, __location__);
135         } else {
136                 call->reply_data.dptr = NULL;
137                 call->reply_data.dsize = 0;
138         }
139         call->status = c->status;
140
141         talloc_free(c);
142
143         return 0;
144 }
145
146
147 /*
148   queue a packet for sending from client to daemon
149 */
150 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
151 {
152         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
153 }
154
155
156 /*
157   called when a CTDB_REPLY_CALL packet comes in in the client
158
159   This packet comes in response to a CTDB_REQ_CALL request packet. It
160   contains any reply data from the call
161 */
162 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
163 {
164         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
165         struct ctdb_client_call_state *state;
166
167         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
168         if (state == NULL) {
169                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
170                 return;
171         }
172
173         if (hdr->reqid != state->reqid) {
174                 /* we found a record  but it was the wrong one */
175                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
176                 return;
177         }
178
179         state->call->reply_data.dptr = c->data;
180         state->call->reply_data.dsize = c->datalen;
181         state->call->status = c->status;
182
183         talloc_steal(state, c);
184
185         state->state = CTDB_CALL_DONE;
186
187         if (state->async.fn) {
188                 state->async.fn(state);
189         }
190 }
191
192 void ctdb_request_message(struct ctdb_context *ctdb,
193                           struct ctdb_req_header *hdr)
194 {
195         struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
196         TDB_DATA data;
197
198         data.dsize = c->datalen;
199         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
200         if (data.dptr == NULL) {
201                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
202                 return;
203         }
204
205         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
206 }
207
208 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
209
210 /*
211   this is called in the client, when data comes in from the daemon
212  */
213 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
214 {
215         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
216         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
217         TALLOC_CTX *tmp_ctx;
218
219         /* place the packet as a child of a tmp_ctx. We then use
220            talloc_free() below to free it. If any of the calls want
221            to keep it, then they will steal it somewhere else, and the
222            talloc_free() will be a no-op */
223         tmp_ctx = talloc_new(ctdb);
224         talloc_steal(tmp_ctx, hdr);
225
226         if (cnt == 0) {
227                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
228                 exit(1);
229         }
230
231         if (cnt < sizeof(*hdr)) {
232                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
233                 goto done;
234         }
235         if (cnt != hdr->length) {
236                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
237                                (unsigned)hdr->length, (unsigned)cnt);
238                 goto done;
239         }
240
241         if (hdr->ctdb_magic != CTDB_MAGIC) {
242                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
243                 goto done;
244         }
245
246         if (hdr->ctdb_version != CTDB_PROTOCOL) {
247                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
248                 goto done;
249         }
250
251         switch (hdr->operation) {
252         case CTDB_REPLY_CALL:
253                 ctdb_client_reply_call(ctdb, hdr);
254                 break;
255
256         case CTDB_REQ_MESSAGE:
257                 ctdb_request_message(ctdb, hdr);
258                 break;
259
260         case CTDB_REPLY_CONTROL:
261                 ctdb_client_reply_control(ctdb, hdr);
262                 break;
263
264         default:
265                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
266         }
267
268 done:
269         talloc_free(tmp_ctx);
270 }
271
272 /*
273   connect to a unix domain socket
274 */
275 int ctdb_socket_connect(struct ctdb_context *ctdb)
276 {
277         struct sockaddr_un addr;
278         int ret;
279
280         memset(&addr, 0, sizeof(addr));
281         addr.sun_family = AF_UNIX;
282         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
283
284         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285         if (ctdb->daemon.sd == -1) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
287                 return -1;
288         }
289
290         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
291                 DEBUG(DEBUG_ERR,
292                       (__location__
293                        "Failed to connect client socket to daemon (%s)\n",
294                        strerror(errno)));
295                 close(ctdb->daemon.sd);
296                 ctdb->daemon.sd = -1;
297                 return -1;
298         }
299
300         ret = set_blocking(ctdb->daemon.sd, false);
301         if (ret != 0) {
302                 DEBUG(DEBUG_ERR,
303                       (__location__
304                        " failed to set socket non-blocking (%s)\n",
305                        strerror(errno)));
306                 close(ctdb->daemon.sd);
307                 ctdb->daemon.sd = -1;
308                 return -1;
309         }
310
311         set_close_on_exec(ctdb->daemon.sd);
312
313         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
314                                               CTDB_DS_ALIGNMENT,
315                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
316         return 0;
317 }
318
319
/*
  groups a database context, a key, a pointer to record data and a
  record header

  NOTE(review): nothing in the visible part of this file uses this
  struct - confirm its users and who owns the memory behind key and data
*/
struct ctdb_record_handle {
	struct ctdb_db_context *ctdb_db;	/* database context */
	TDB_DATA key;				/* key */
	TDB_DATA *data;				/* pointer to a TDB_DATA */
	struct ctdb_ltdb_header header;		/* embedded ltdb header */
};
326
327
328 /*
329   make a recv call to the local ctdb daemon - called from client context
330
331   This is called when the program wants to wait for a ctdb_call to complete and get the
332   results. This call will block unless the call has already completed.
333 */
334 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
335 {
336         if (state == NULL) {
337                 return -1;
338         }
339
340         while (state->state < CTDB_CALL_DONE) {
341                 tevent_loop_once(state->ctdb_db->ctdb->ev);
342         }
343         if (state->state != CTDB_CALL_DONE) {
344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
345                 talloc_free(state);
346                 return -1;
347         }
348
349         if (state->call->reply_data.dsize) {
350                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
351                                                       state->call->reply_data.dptr,
352                                                       state->call->reply_data.dsize);
353                 call->reply_data.dsize = state->call->reply_data.dsize;
354         } else {
355                 call->reply_data.dptr = NULL;
356                 call->reply_data.dsize = 0;
357         }
358         call->status = state->call->status;
359         talloc_free(state);
360
361         return call->status;
362 }
363
364
365
366
367 /*
368   destroy a ctdb_call in client
369 */
370 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
371 {
372         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
373         return 0;
374 }
375
376 /*
377   construct an event driven local ctdb_call
378
379   this is used so that locally processed ctdb_call requests are processed
380   in an event driven manner
381 */
382 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
383                                                                   struct ctdb_call *call,
384                                                                   struct ctdb_ltdb_header *header,
385                                                                   TDB_DATA *data)
386 {
387         struct ctdb_client_call_state *state;
388         struct ctdb_context *ctdb = ctdb_db->ctdb;
389         int ret;
390
391         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
392         CTDB_NO_MEMORY_NULL(ctdb, state);
393         state->call = talloc_zero(state, struct ctdb_call);
394         CTDB_NO_MEMORY_NULL(ctdb, state->call);
395
396         talloc_steal(state, data->dptr);
397
398         state->state   = CTDB_CALL_DONE;
399         *(state->call) = *call;
400         state->ctdb_db = ctdb_db;
401
402         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
403         if (ret != 0) {
404                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
405         }
406
407         return state;
408 }
409
410 /*
411   make a ctdb call to the local daemon - async send. Called from client context.
412
413   This constructs a ctdb_call request and queues it for processing.
414   This call never blocks.
415 */
416 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
417                                               struct ctdb_call *call)
418 {
419         struct ctdb_client_call_state *state;
420         struct ctdb_context *ctdb = ctdb_db->ctdb;
421         struct ctdb_ltdb_header header;
422         TDB_DATA data;
423         int ret;
424         size_t len;
425         struct ctdb_req_call_old *c;
426
427         /* if the domain socket is not yet open, open it */
428         if (ctdb->daemon.sd==-1) {
429                 ctdb_socket_connect(ctdb);
430         }
431
432         ret = ctdb_ltdb_lock(ctdb_db, call->key);
433         if (ret != 0) {
434                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
435                 return NULL;
436         }
437
438         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
439
440         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
441                 ret = -1;
442         }
443
444         if (ret == 0 && header.dmaster == ctdb->pnn) {
445                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
446                 talloc_free(data.dptr);
447                 ctdb_ltdb_unlock(ctdb_db, call->key);
448                 return state;
449         }
450
451         ctdb_ltdb_unlock(ctdb_db, call->key);
452         talloc_free(data.dptr);
453
454         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
455         if (state == NULL) {
456                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
457                 return NULL;
458         }
459         state->call = talloc_zero(state, struct ctdb_call);
460         if (state->call == NULL) {
461                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
462                 return NULL;
463         }
464
465         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
466         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
467         if (c == NULL) {
468                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
469                 return NULL;
470         }
471
472         state->reqid     = reqid_new(ctdb->idr, state);
473         state->ctdb_db = ctdb_db;
474         talloc_set_destructor(state, ctdb_client_call_destructor);
475
476         c->hdr.reqid     = state->reqid;
477         c->flags         = call->flags;
478         c->db_id         = ctdb_db->db_id;
479         c->callid        = call->call_id;
480         c->hopcount      = 0;
481         c->keylen        = call->key.dsize;
482         c->calldatalen   = call->call_data.dsize;
483         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
484         memcpy(&c->data[call->key.dsize],
485                call->call_data.dptr, call->call_data.dsize);
486         *(state->call)              = *call;
487         state->call->call_data.dptr = &c->data[call->key.dsize];
488         state->call->key.dptr       = &c->data[0];
489
490         state->state  = CTDB_CALL_WAIT;
491
492
493         ctdb_client_queue_pkt(ctdb, &c->hdr);
494
495         return state;
496 }
497
498
/*
  synchronous ctdb_call: ctdb_call_send() immediately followed by
  ctdb_call_recv(), whose result is returned
*/
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
	struct ctdb_client_call_state *pending = ctdb_call_send(ctdb_db, call);

	return ctdb_call_recv(pending, call);
}
509
510
511 /*
512   tell the daemon what messaging srvid we will use, and register the message
513   handler function in the client
514 */
515 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
516                                     srvid_handler_fn handler,
517                                     void *private_data)
518 {
519         int res;
520         int32_t status;
521
522         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
523                            CTDB_CONTROL_REGISTER_SRVID, 0,
524                            tdb_null, NULL, NULL, &status, NULL, NULL);
525         if (res != 0 || status != 0) {
526                 DEBUG(DEBUG_ERR,
527                       ("Failed to register srvid %llu\n",
528                        (unsigned long long)srvid));
529                 return -1;
530         }
531
532         /* also need to register the handler with our own ctdb structure */
533         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
534 }
535
536 /*
537   tell the daemon we no longer want a srvid
538 */
539 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
540                                        uint64_t srvid, void *private_data)
541 {
542         int res;
543         int32_t status;
544
545         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
546                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
547                            tdb_null, NULL, NULL, &status, NULL, NULL);
548         if (res != 0 || status != 0) {
549                 DEBUG(DEBUG_ERR,
550                       ("Failed to deregister srvid %llu\n",
551                        (unsigned long long)srvid));
552                 return -1;
553         }
554
555         /* also need to register the handler with our own ctdb structure */
556         srvid_deregister(ctdb->srv, srvid, private_data);
557         return 0;
558 }
559
560 /*
561   send a message - from client context
562  */
563 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
564                       uint64_t srvid, TDB_DATA data)
565 {
566         struct ctdb_req_message_old *r;
567         int len, res;
568
569         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
570         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
571                                len, struct ctdb_req_message_old);
572         CTDB_NO_MEMORY(ctdb, r);
573
574         r->hdr.destnode  = pnn;
575         r->srvid         = srvid;
576         r->datalen       = data.dsize;
577         memcpy(&r->data[0], data.dptr, data.dsize);
578
579         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
580         talloc_free(r);
581         return res;
582 }
583
584
585 /*
586    called when a control completes or timesout to invoke the callback
587    function the user provided
588 */
589 static void invoke_control_callback(struct tevent_context *ev,
590                                     struct tevent_timer *te,
591                                     struct timeval t, void *private_data)
592 {
593         struct ctdb_client_control_state *state;
594         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
595         int ret;
596
597         state = talloc_get_type(private_data, struct ctdb_client_control_state);
598         talloc_steal(tmp_ctx, state);
599
600         ret = ctdb_control_recv(state->ctdb, state, state,
601                         NULL,
602                         NULL,
603                         NULL);
604         if (ret != 0) {
605                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
606         }
607
608         talloc_free(tmp_ctx);
609 }
610
611 /*
612   called when a CTDB_REPLY_CONTROL packet comes in in the client
613
614   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
615   contains any reply data from the control
616 */
617 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
618                                       struct ctdb_req_header *hdr)
619 {
620         struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
621         struct ctdb_client_control_state *state;
622
623         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
624         if (state == NULL) {
625                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
626                 return;
627         }
628
629         if (hdr->reqid != state->reqid) {
630                 /* we found a record  but it was the wrong one */
631                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
632                 return;
633         }
634
635         state->outdata.dptr = c->data;
636         state->outdata.dsize = c->datalen;
637         state->status = c->status;
638         if (c->errorlen) {
639                 state->errormsg = talloc_strndup(state,
640                                                  (char *)&c->data[c->datalen],
641                                                  c->errorlen);
642         }
643
644         /* state->outdata now uses resources from c so we don't want c
645            to just dissappear from under us while state is still alive
646         */
647         talloc_steal(state, c);
648
649         state->state = CTDB_CONTROL_DONE;
650
651         /* if we had a callback registered for this control, pull the response
652            and call the callback.
653         */
654         if (state->async.fn) {
655                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
656                                  invoke_control_callback, state);
657         }
658 }
659
660
661 /*
662   destroy a ctdb_control in client
663 */
664 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
665 {
666         reqid_remove(state->ctdb->idr, state->reqid);
667         return 0;
668 }
669
670
671 /* time out handler for ctdb_control */
672 static void control_timeout_func(struct tevent_context *ev,
673                                  struct tevent_timer *te,
674                                  struct timeval t, void *private_data)
675 {
676         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
677
678         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
679                          "dstnode:%u\n", state->reqid, state->c->opcode,
680                          state->c->hdr.destnode));
681
682         state->state = CTDB_CONTROL_TIMEOUT;
683
684         /* if we had a callback registered for this control, pull the response
685            and call the callback.
686         */
687         if (state->async.fn) {
688                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
689                                  invoke_control_callback, state);
690         }
691 }
692
693 /* async version of send control request */
694 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
695                 uint32_t destnode, uint64_t srvid,
696                 uint32_t opcode, uint32_t flags, TDB_DATA data,
697                 TALLOC_CTX *mem_ctx,
698                 struct timeval *timeout,
699                 char **errormsg)
700 {
701         struct ctdb_client_control_state *state;
702         size_t len;
703         struct ctdb_req_control_old *c;
704         int ret;
705
706         if (errormsg) {
707                 *errormsg = NULL;
708         }
709
710         /* if the domain socket is not yet open, open it */
711         if (ctdb->daemon.sd==-1) {
712                 ctdb_socket_connect(ctdb);
713         }
714
715         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
716         CTDB_NO_MEMORY_NULL(ctdb, state);
717
718         state->ctdb       = ctdb;
719         state->reqid      = reqid_new(ctdb->idr, state);
720         state->state      = CTDB_CONTROL_WAIT;
721         state->errormsg   = NULL;
722
723         talloc_set_destructor(state, ctdb_client_control_destructor);
724
725         len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
726         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
727                                len, struct ctdb_req_control_old);
728         state->c            = c;
729         CTDB_NO_MEMORY_NULL(ctdb, c);
730         c->hdr.reqid        = state->reqid;
731         c->hdr.destnode     = destnode;
732         c->opcode           = opcode;
733         c->client_id        = 0;
734         c->flags            = flags;
735         c->srvid            = srvid;
736         c->datalen          = data.dsize;
737         if (data.dsize) {
738                 memcpy(&c->data[0], data.dptr, data.dsize);
739         }
740
741         /* timeout */
742         if (timeout && !timeval_is_zero(timeout)) {
743                 tevent_add_timer(ctdb->ev, state, *timeout,
744                                  control_timeout_func, state);
745         }
746
747         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
748         if (ret != 0) {
749                 talloc_free(state);
750                 return NULL;
751         }
752
753         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
754                 talloc_free(state);
755                 return NULL;
756         }
757
758         return state;
759 }
760
761
762 /* async version of receive control reply */
763 int ctdb_control_recv(struct ctdb_context *ctdb,
764                 struct ctdb_client_control_state *state,
765                 TALLOC_CTX *mem_ctx,
766                 TDB_DATA *outdata, int32_t *status, char **errormsg)
767 {
768         TALLOC_CTX *tmp_ctx;
769
770         if (status != NULL) {
771                 *status = -1;
772         }
773         if (errormsg != NULL) {
774                 *errormsg = NULL;
775         }
776
777         if (state == NULL) {
778                 return -1;
779         }
780
781         /* prevent double free of state */
782         tmp_ctx = talloc_new(ctdb);
783         talloc_steal(tmp_ctx, state);
784
785         /* loop one event at a time until we either timeout or the control
786            completes.
787         */
788         while (state->state == CTDB_CONTROL_WAIT) {
789                 tevent_loop_once(ctdb->ev);
790         }
791
792         if (state->state != CTDB_CONTROL_DONE) {
793                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
794                 if (state->async.fn) {
795                         state->async.fn(state);
796                 }
797                 talloc_free(tmp_ctx);
798                 return -1;
799         }
800
801         if (state->errormsg) {
802                 int s = (state->status == 0 ? -1 : state->status);
803                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
804                 if (errormsg) {
805                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
806                 }
807                 if (state->async.fn) {
808                         state->async.fn(state);
809                 }
810                 talloc_free(tmp_ctx);
811                 return s;
812         }
813
814         if (outdata) {
815                 *outdata = state->outdata;
816                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
817         }
818
819         if (status) {
820                 *status = state->status;
821         }
822
823         if (state->async.fn) {
824                 state->async.fn(state);
825         }
826
827         talloc_free(tmp_ctx);
828         return 0;
829 }
830
831
832
833 /*
834   send a ctdb control message
835   timeout specifies how long we should wait for a reply.
836   if timeout is NULL we wait indefinitely
837  */
838 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
839                  uint32_t opcode, uint32_t flags, TDB_DATA data,
840                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
841                  struct timeval *timeout,
842                  char **errormsg)
843 {
844         struct ctdb_client_control_state *state;
845
846         state = ctdb_control_send(ctdb, destnode, srvid, opcode,
847                         flags, data, mem_ctx,
848                         timeout, errormsg);
849
850         /* FIXME: Error conditions in ctdb_control_send return NULL without
851          * setting errormsg.  So, there is no way to distinguish between sucess
852          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
853         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
854                 if (status != NULL) {
855                         *status = 0;
856                 }
857                 return 0;
858         }
859
860         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
861                         errormsg);
862 }
863
864 /*
865   get vnn map from a remote node
866  */
867 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
868 {
869         int ret;
870         TDB_DATA outdata;
871         int32_t res;
872         struct ctdb_vnn_map_wire *map;
873
874         ret = ctdb_control(ctdb, destnode, 0,
875                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
876                            mem_ctx, &outdata, &res, &timeout, NULL);
877         if (ret != 0 || res != 0) {
878                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
879                 return -1;
880         }
881
882         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
883         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
884             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
885                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
886                 return -1;
887         }
888
889         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
890         CTDB_NO_MEMORY(ctdb, *vnnmap);
891         (*vnnmap)->generation = map->generation;
892         (*vnnmap)->size       = map->size;
893         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
894
895         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
896         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
897         talloc_free(outdata.dptr);
898
899         return 0;
900 }
901
902
903 /*
904   get the recovery mode of a remote node
905  */
906 struct ctdb_client_control_state *
907 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
908 {
909         return ctdb_control_send(ctdb, destnode, 0,
910                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
911                            mem_ctx, &timeout, NULL);
912 }
913
914 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
915 {
916         int ret;
917         int32_t res;
918
919         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
920         if (ret != 0) {
921                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
922                 return -1;
923         }
924
925         if (recmode) {
926                 *recmode = (uint32_t)res;
927         }
928
929         return 0;
930 }
931
932 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
933 {
934         struct ctdb_client_control_state *state;
935
936         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
937         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
938 }
939
940
941
942
943 /*
944   set the recovery mode of a remote node
945  */
946 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
947 {
948         int ret;
949         TDB_DATA data;
950         int32_t res;
951
952         data.dsize = sizeof(uint32_t);
953         data.dptr = (unsigned char *)&recmode;
954
955         ret = ctdb_control(ctdb, destnode, 0,
956                            CTDB_CONTROL_SET_RECMODE, 0, data,
957                            NULL, NULL, &res, &timeout, NULL);
958         if (ret != 0 || res != 0) {
959                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
960                 return -1;
961         }
962
963         return 0;
964 }
965
966
967
968 /*
969   get the recovery master of a remote node
970  */
971 struct ctdb_client_control_state *
972 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
973                         struct timeval timeout, uint32_t destnode)
974 {
975         return ctdb_control_send(ctdb, destnode, 0,
976                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
977                            mem_ctx, &timeout, NULL);
978 }
979
980 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
981 {
982         int ret;
983         int32_t res;
984
985         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
986         if (ret != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
988                 return -1;
989         }
990
991         if (recmaster) {
992                 *recmaster = (uint32_t)res;
993         }
994
995         return 0;
996 }
997
998 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
999 {
1000         struct ctdb_client_control_state *state;
1001
1002         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1003         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1004 }
1005
1006
1007 /*
1008   set the recovery master of a remote node
1009  */
1010 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1011 {
1012         int ret;
1013         TDB_DATA data;
1014         int32_t res;
1015
1016         ZERO_STRUCT(data);
1017         data.dsize = sizeof(uint32_t);
1018         data.dptr = (unsigned char *)&recmaster;
1019
1020         ret = ctdb_control(ctdb, destnode, 0,
1021                            CTDB_CONTROL_SET_RECMASTER, 0, data,
1022                            NULL, NULL, &res, &timeout, NULL);
1023         if (ret != 0 || res != 0) {
1024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1025                 return -1;
1026         }
1027
1028         return 0;
1029 }
1030
1031
1032 /*
1033   get a list of databases off a remote node
1034  */
1035 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1036                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1037 {
1038         int ret;
1039         TDB_DATA outdata;
1040         int32_t res;
1041
1042         ret = ctdb_control(ctdb, destnode, 0,
1043                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1044                            mem_ctx, &outdata, &res, &timeout, NULL);
1045         if (ret != 0 || res != 0) {
1046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1047                 return -1;
1048         }
1049
1050         *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1051         talloc_free(outdata.dptr);
1052
1053         return 0;
1054 }
1055
1056 /*
1057   get a list of nodes (vnn and flags ) from a remote node
1058  */
1059 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1060                 struct timeval timeout, uint32_t destnode,
1061                 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1062 {
1063         int ret;
1064         TDB_DATA outdata;
1065         int32_t res;
1066
1067         ret = ctdb_control(ctdb, destnode, 0,
1068                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1069                            mem_ctx, &outdata, &res, &timeout, NULL);
1070         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1072                 return -1;
1073         }
1074
1075         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1076         talloc_free(outdata.dptr);
1077         return 0;
1078 }
1079
1080 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb,
1081                            struct timeval timeout,
1082                            uint32_t destnode,
1083                            uint32_t *runstate)
1084 {
1085         TDB_DATA outdata;
1086         int32_t res;
1087         int ret;
1088
1089         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1090                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1091         if (ret != 0 || res != 0) {
1092                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1093                 return ret != 0 ? ret : res;
1094         }
1095
1096         if (outdata.dsize != sizeof(uint32_t)) {
1097                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1098                 talloc_free(outdata.dptr);
1099                 return -1;
1100         }
1101
1102         if (runstate != NULL) {
1103                 *runstate = *(uint32_t *)outdata.dptr;
1104         }
1105         talloc_free(outdata.dptr);
1106
1107         return 0;
1108 }
1109
1110 /*
1111   find the real path to a ltdb
1112  */
1113 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1114                    const char **path)
1115 {
1116         int ret;
1117         int32_t res;
1118         TDB_DATA data;
1119
1120         data.dptr = (uint8_t *)&dbid;
1121         data.dsize = sizeof(dbid);
1122
1123         ret = ctdb_control(ctdb, destnode, 0,
1124                            CTDB_CONTROL_GETDBPATH, 0, data,
1125                            mem_ctx, &data, &res, &timeout, NULL);
1126         if (ret != 0 || res != 0) {
1127                 return -1;
1128         }
1129
1130         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1131         if ((*path) == NULL) {
1132                 return -1;
1133         }
1134
1135         talloc_free(data.dptr);
1136
1137         return 0;
1138 }
1139
1140 /*
1141   find the name of a db
1142  */
1143 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1144                    const char **name)
1145 {
1146         int ret;
1147         int32_t res;
1148         TDB_DATA data;
1149
1150         data.dptr = (uint8_t *)&dbid;
1151         data.dsize = sizeof(dbid);
1152
1153         ret = ctdb_control(ctdb, destnode, 0,
1154                            CTDB_CONTROL_GET_DBNAME, 0, data,
1155                            mem_ctx, &data, &res, &timeout, NULL);
1156         if (ret != 0 || res != 0) {
1157                 return -1;
1158         }
1159
1160         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1161         if ((*name) == NULL) {
1162                 return -1;
1163         }
1164
1165         talloc_free(data.dptr);
1166
1167         return 0;
1168 }
1169
1170 /*
1171   create a database
1172  */
1173 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
1174                        uint32_t destnode, TALLOC_CTX *mem_ctx,
1175                        const char *name, uint8_t db_flags, uint32_t *db_id)
1176 {
1177         int ret;
1178         int32_t res;
1179         TDB_DATA data;
1180         uint32_t opcode;
1181
1182         data.dptr = discard_const(name);
1183         data.dsize = strlen(name)+1;
1184
1185         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
1186                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1187         } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
1188                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1189         } else {
1190                 opcode = CTDB_CONTROL_DB_ATTACH;
1191         }
1192
1193         ret = ctdb_control(ctdb, destnode, 0, opcode, 0, data,
1194                            mem_ctx, &data, &res, &timeout, NULL);
1195
1196         if (ret != 0 || res != 0) {
1197                 return -1;
1198         }
1199
1200         if (data.dsize != sizeof(uint32_t)) {
1201                 TALLOC_FREE(data.dptr);
1202                 return -1;
1203         }
1204         if (db_id != NULL) {
1205                 *db_id = *(uint32_t *)data.dptr;
1206         }
1207         talloc_free(data.dptr);
1208
1209         return 0;
1210 }
1211
1212 /*
1213   get debug level on a node
1214  */
1215 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1216 {
1217         int ret;
1218         int32_t res;
1219         TDB_DATA data;
1220
1221         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1222                            ctdb, &data, &res, NULL, NULL);
1223         if (ret != 0 || res != 0) {
1224                 return -1;
1225         }
1226         if (data.dsize != sizeof(int32_t)) {
1227                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1228                          (unsigned)data.dsize));
1229                 return -1;
1230         }
1231         *level = *(int32_t *)data.dptr;
1232         talloc_free(data.dptr);
1233         return 0;
1234 }
1235
1236 /*
1237  * Get db open flags
1238  */
1239 int ctdb_ctrl_db_open_flags(struct ctdb_context *ctdb, uint32_t db_id,
1240                             int *tdb_flags)
1241 {
1242         TDB_DATA indata, outdata;
1243         int ret;
1244         int32_t res;
1245
1246         indata.dptr = (uint8_t *)&db_id;
1247         indata.dsize = sizeof(db_id);
1248
1249         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
1250                            CTDB_CONTROL_DB_OPEN_FLAGS, 0, indata,
1251                            ctdb, &outdata, &res, NULL, NULL);
1252         if (ret != 0 || res != 0) {
1253                 D_ERR("ctdb control for db open flags failed\n");
1254                 return  -1;
1255         }
1256
1257         if (outdata.dsize != sizeof(int32_t)) {
1258                 D_ERR(__location__ " expected %zi bytes, received %zi bytes\n",
1259                       sizeof(int32_t), outdata.dsize);
1260                 talloc_free(outdata.dptr);
1261                 return -1;
1262         }
1263
1264         *tdb_flags = *(int32_t *)outdata.dptr;
1265         talloc_free(outdata.dptr);
1266         return 0;
1267 }
1268
1269 /*
1270   attach to a specific database - client call
1271 */
1272 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1273                                     struct timeval timeout,
1274                                     const char *name,
1275                                     uint8_t db_flags)
1276 {
1277         struct ctdb_db_context *ctdb_db;
1278         int ret;
1279         int tdb_flags;
1280
1281         ctdb_db = ctdb_db_handle(ctdb, name);
1282         if (ctdb_db) {
1283                 return ctdb_db;
1284         }
1285
1286         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1287         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1288
1289         ctdb_db->ctdb = ctdb;
1290         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1291         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1292
1293         /* tell ctdb daemon to attach */
1294         ret = ctdb_ctrl_createdb(ctdb, timeout, CTDB_CURRENT_NODE,
1295                                  ctdb_db, name, db_flags, &ctdb_db->db_id);
1296         if (ret != 0) {
1297                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1298                 talloc_free(ctdb_db);
1299                 return NULL;
1300         }
1301
1302         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1303         if (ret != 0) {
1304                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1305                 talloc_free(ctdb_db);
1306                 return NULL;
1307         }
1308
1309         ret = ctdb_ctrl_db_open_flags(ctdb, ctdb_db->db_id, &tdb_flags);
1310         if (ret != 0) {
1311                 D_ERR("Failed to get tdb_flags for database '%s'\n", name);
1312                 talloc_free(ctdb_db);
1313                 return NULL;
1314         }
1315
1316         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
1317                                       O_RDWR, 0);
1318         if (ctdb_db->ltdb == NULL) {
1319                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1320                 talloc_free(ctdb_db);
1321                 return NULL;
1322         }
1323
1324         ctdb_db->db_flags = db_flags;
1325
1326         DLIST_ADD(ctdb->db_list, ctdb_db);
1327
1328         /* add well known functions */
1329         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1330         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1331         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1332
1333         return ctdb_db;
1334 }
1335
1336 /*
1337   setup a call for a database
1338  */
1339 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1340 {
1341         struct ctdb_registered_call *call;
1342
1343         /* register locally */
1344         call = talloc(ctdb_db, struct ctdb_registered_call);
1345         call->fn = fn;
1346         call->id = id;
1347
1348         DLIST_ADD(ctdb_db->calls, call);
1349         return 0;
1350 }
1351
1352 /* Freeze all databases */
1353 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout,
1354                      uint32_t destnode)
1355 {
1356         int ret;
1357         int32_t res;
1358
1359         ret = ctdb_control(ctdb, destnode, 0,
1360                            CTDB_CONTROL_FREEZE, 0, tdb_null,
1361                            NULL, NULL, &res, &timeout, NULL);
1362         if (ret != 0 || res != 0) {
1363                 DEBUG(DEBUG_ERR, ("ctdb_ctrl_freeze_priority failed\n"));
1364                 return -1;
1365         }
1366
1367         return 0;
1368 }
1369
1370 /*
1371   get pnn of a node, or -1
1372  */
1373 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1374 {
1375         int ret;
1376         int32_t res;
1377
1378         ret = ctdb_control(ctdb, destnode, 0,
1379                            CTDB_CONTROL_GET_PNN, 0, tdb_null,
1380                            NULL, NULL, &res, &timeout, NULL);
1381         if (ret != 0) {
1382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1383                 return -1;
1384         }
1385
1386         return res;
1387 }
1388
1389 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1390                                    struct timeval timeout, uint32_t destnode,
1391                                    TALLOC_CTX *mem_ctx,
1392                                    uint32_t flags,
1393                                    struct ctdb_public_ip_list_old **ips)
1394 {
1395         int ret;
1396         TDB_DATA outdata;
1397         int32_t res;
1398
1399         ret = ctdb_control(ctdb, destnode, 0,
1400                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1401                            mem_ctx, &outdata, &res, &timeout, NULL);
1402         if (ret != 0 || res != 0) {
1403                 DEBUG(DEBUG_ERR,(__location__
1404                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
1405                                  ret, res));
1406                 return -1;
1407         }
1408
1409         *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1410         talloc_free(outdata.dptr);
1411
1412         return 0;
1413 }
1414
1415 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1416                              struct timeval timeout, uint32_t destnode,
1417                              TALLOC_CTX *mem_ctx,
1418                              struct ctdb_public_ip_list_old **ips)
1419 {
1420         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1421                                               destnode, mem_ctx,
1422                                               0, ips);
1423 }
1424
1425 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1426                          struct timeval timeout, uint32_t destnode,
1427                          TALLOC_CTX *mem_ctx,
1428                          struct ctdb_iface_list_old **_ifaces)
1429 {
1430         int ret;
1431         TDB_DATA outdata;
1432         int32_t res;
1433         struct ctdb_iface_list_old *ifaces;
1434         uint32_t len;
1435         uint32_t i;
1436
1437         ret = ctdb_control(ctdb, destnode, 0,
1438                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1439                            mem_ctx, &outdata, &res, &timeout, NULL);
1440         if (ret != 0 || res != 0) {
1441                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1442                                 "failed ret:%d res:%d\n",
1443                                 ret, res));
1444                 return -1;
1445         }
1446
1447         len = offsetof(struct ctdb_iface_list_old, ifaces);
1448         if (len > outdata.dsize) {
1449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1450                                 "returned invalid data with size %u > %u\n",
1451                                 (unsigned int)outdata.dsize,
1452                                 (unsigned int)len));
1453                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1454                 return -1;
1455         }
1456
1457         ifaces = (struct ctdb_iface_list_old *)outdata.dptr;
1458         len += ifaces->num*sizeof(struct ctdb_iface);
1459
1460         if (len > outdata.dsize) {
1461                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1462                                 "returned invalid data with size %u > %u\n",
1463                                 (unsigned int)outdata.dsize,
1464                                 (unsigned int)len));
1465                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1466                 return -1;
1467         }
1468
1469         /* make sure we null terminate the returned strings */
1470         for (i=0; i < ifaces->num; i++) {
1471                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1472         }
1473
1474         *_ifaces = (struct ctdb_iface_list_old *)talloc_memdup(mem_ctx,
1475                                                                   outdata.dptr,
1476                                                                   outdata.dsize);
1477         talloc_free(outdata.dptr);
1478         if (*_ifaces == NULL) {
1479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1480                                 "talloc_memdup size %u failed\n",
1481                                 (unsigned int)outdata.dsize));
1482                 return -1;
1483         }
1484
1485         return 0;
1486 }
1487
1488 /*
1489   set/clear the permanent disabled bit on a remote node
1490  */
1491 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1492                        uint32_t set, uint32_t clear)
1493 {
1494         int ret;
1495         TDB_DATA data;
1496         struct ctdb_node_map_old *nodemap=NULL;
1497         struct ctdb_node_flag_change c;
1498         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1499         uint32_t recmaster;
1500         uint32_t *nodes;
1501
1502
1503         /* find the recovery master */
1504         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1505         if (ret != 0) {
1506                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1507                 talloc_free(tmp_ctx);
1508                 return ret;
1509         }
1510
1511
1512         /* read the node flags from the recmaster */
1513         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1514         if (ret != 0) {
1515                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1516                 talloc_free(tmp_ctx);
1517                 return -1;
1518         }
1519         if (destnode >= nodemap->num) {
1520                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1521                 talloc_free(tmp_ctx);
1522                 return -1;
1523         }
1524
1525         c.pnn       = destnode;
1526         c.old_flags = nodemap->nodes[destnode].flags;
1527         c.new_flags = c.old_flags;
1528         c.new_flags |= set;
1529         c.new_flags &= ~clear;
1530
1531         data.dsize = sizeof(c);
1532         data.dptr = (unsigned char *)&c;
1533
1534         /* send the flags update to all connected nodes */
1535         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1536
1537         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1538                                         nodes, 0,
1539                                         timeout, false, data,
1540                                         NULL, NULL,
1541                                         NULL) != 0) {
1542                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1543
1544                 talloc_free(tmp_ctx);
1545                 return -1;
1546         }
1547
1548         talloc_free(tmp_ctx);
1549         return 0;
1550 }
1551
1552
1553 /*
1554   get all tunables
1555  */
1556 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
1557                                struct timeval timeout,
1558                                uint32_t destnode,
1559                                struct ctdb_tunable_list *tunables)
1560 {
1561         TDB_DATA outdata;
1562         int ret;
1563         int32_t res;
1564
1565         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1566                            &outdata, &res, &timeout, NULL);
1567         if (ret != 0 || res != 0) {
1568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1569                 return -1;
1570         }
1571
1572         if (outdata.dsize != sizeof(*tunables)) {
1573                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1574                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1575                 return -1;
1576         }
1577
1578         *tunables = *(struct ctdb_tunable_list *)outdata.dptr;
1579         talloc_free(outdata.dptr);
1580         return 0;
1581 }
1582
1583 /*
1584   set some ctdb flags
1585 */
1586 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
1587 {
1588         ctdb->flags |= flags;
1589 }
1590
1591 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
1592 {
1593         return ctdb->daemon.name;
1594 }
1595
1596 /*
1597   return the pnn of this node
1598 */
1599 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
1600 {
1601         return ctdb->pnn;
1602 }
1603
1604 /*
1605   callback for the async helpers used when sending the same control
1606   to multiple nodes in parallell.
1607 */
1608 static void async_callback(struct ctdb_client_control_state *state)
1609 {
1610         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
1611         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
1612         int ret;
1613         TDB_DATA outdata;
1614         int32_t res = -1;
1615         uint32_t destnode = state->c->hdr.destnode;
1616
1617         outdata.dsize = 0;
1618         outdata.dptr = NULL;
1619
1620         /* one more node has responded with recmode data */
1621         data->count--;
1622
1623         /* if we failed to push the db, then return an error and let
1624            the main loop try again.
1625         */
1626         if (state->state != CTDB_CONTROL_DONE) {
1627                 if ( !data->dont_log_errors) {
1628                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
1629                 }
1630                 data->fail_count++;
1631                 if (state->state == CTDB_CONTROL_TIMEOUT) {
1632                         res = -ETIMEDOUT;
1633                 } else {
1634                         res = -1;
1635                 }
1636                 if (data->fail_callback) {
1637                         data->fail_callback(ctdb, destnode, res, outdata,
1638                                         data->callback_data);
1639                 }
1640                 return;
1641         }
1642
1643         state->async.fn = NULL;
1644
1645         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
1646         if ((ret != 0) || (res != 0)) {
1647                 if ( !data->dont_log_errors) {
1648                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
1649                 }
1650                 data->fail_count++;
1651                 if (data->fail_callback) {
1652                         data->fail_callback(ctdb, destnode, res, outdata,
1653                                         data->callback_data);
1654                 }
1655         }
1656         if ((ret == 0) && (data->callback != NULL)) {
1657                 data->callback(ctdb, destnode, res, outdata,
1658                                         data->callback_data);
1659         }
1660 }
1661
1662
1663 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
1664 {
1665         /* set up the callback functions */
1666         state->async.fn = async_callback;
1667         state->async.private_data = data;
1668
1669         /* one more control to wait for to complete */
1670         data->count++;
1671 }
1672
1673
1674 /* wait for up to the maximum number of seconds allowed
1675    or until all nodes we expect a response from has replied
1676 */
1677 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
1678 {
1679         while (data->count > 0) {
1680                 tevent_loop_once(ctdb->ev);
1681         }
1682         if (data->fail_count != 0) {
1683                 if (!data->dont_log_errors) {
1684                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
1685                                  data->fail_count));
1686                 }
1687                 return -1;
1688         }
1689         return 0;
1690 }
1691
1692
1693 /*
1694    perform a simple control on the listed nodes
1695    The control cannot return data
1696  */
1697 int ctdb_client_async_control(struct ctdb_context *ctdb,
1698                                 enum ctdb_controls opcode,
1699                                 uint32_t *nodes,
1700                                 uint64_t srvid,
1701                                 struct timeval timeout,
1702                                 bool dont_log_errors,
1703                                 TDB_DATA data,
1704                                 client_async_callback client_callback,
1705                                 client_async_callback fail_callback,
1706                                 void *callback_data)
1707 {
1708         struct client_async_data *async_data;
1709         struct ctdb_client_control_state *state;
1710         int j, num_nodes;
1711
1712         async_data = talloc_zero(ctdb, struct client_async_data);
1713         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
1714         async_data->dont_log_errors = dont_log_errors;
1715         async_data->callback = client_callback;
1716         async_data->fail_callback = fail_callback;
1717         async_data->callback_data = callback_data;
1718         async_data->opcode        = opcode;
1719
1720         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
1721
1722         /* loop over all nodes and send an async control to each of them */
1723         for (j=0; j<num_nodes; j++) {
1724                 uint32_t pnn = nodes[j];
1725
1726                 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
1727                                           0, data, async_data, &timeout, NULL);
1728                 if (state == NULL) {
1729                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
1730                         talloc_free(async_data);
1731                         return -1;
1732                 }
1733
1734                 ctdb_client_async_add(async_data, state);
1735         }
1736
1737         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
1738                 talloc_free(async_data);
1739                 return -1;
1740         }
1741
1742         talloc_free(async_data);
1743         return 0;
1744 }
1745
1746 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
1747                                 struct ctdb_vnn_map *vnn_map,
1748                                 TALLOC_CTX *mem_ctx,
1749                                 bool include_self)
1750 {
1751         unsigned int i, j, num_nodes;
1752         uint32_t *nodes;
1753
1754         for (i=num_nodes=0;i<vnn_map->size;i++) {
1755                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
1756                         continue;
1757                 }
1758                 num_nodes++;
1759         }
1760
1761         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
1762         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
1763
1764         for (i=j=0;i<vnn_map->size;i++) {
1765                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
1766                         continue;
1767                 }
1768                 nodes[j++] = vnn_map->map[i];
1769         }
1770
1771         return nodes;
1772 }
1773
1774 /* Get list of nodes not including those with flags specified by mask */
1775 static uint32_t *list_of_nodes(struct ctdb_context *ctdb,
1776                                struct ctdb_node_map_old *node_map,
1777                                TALLOC_CTX *mem_ctx,
1778                                uint32_t mask,
1779                                bool include_self)
1780 {
1781         unsigned int i, j, num_nodes;
1782         uint32_t exclude_pnn;
1783         uint32_t *nodes;
1784
1785         exclude_pnn = include_self ? CTDB_UNKNOWN_PNN : ctdb->pnn;
1786
1787         for (i=num_nodes=0;i<node_map->num;i++) {
1788                 if (node_map->nodes[i].flags & mask) {
1789                         continue;
1790                 }
1791                 if (node_map->nodes[i].pnn == exclude_pnn) {
1792                         continue;
1793                 }
1794                 num_nodes++;
1795         }
1796
1797         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
1798         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
1799
1800         for (i=j=0;i<node_map->num;i++) {
1801                 if (node_map->nodes[i].flags & mask) {
1802                         continue;
1803                 }
1804                 if (node_map->nodes[i].pnn == exclude_pnn) {
1805                         continue;
1806                 }
1807                 nodes[j++] = node_map->nodes[i].pnn;
1808         }
1809
1810         return nodes;
1811 }
1812
1813 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
1814                                 struct ctdb_node_map_old *node_map,
1815                                 TALLOC_CTX *mem_ctx,
1816                                 bool include_self)
1817 {
1818         return list_of_nodes(ctdb,
1819                              node_map,
1820                              mem_ctx,
1821                              NODE_FLAGS_INACTIVE,
1822                              include_self);
1823 }
1824
1825 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
1826                                 struct ctdb_node_map_old *node_map,
1827                                 TALLOC_CTX *mem_ctx,
1828                                 bool include_self)
1829 {
1830         return list_of_nodes(ctdb,
1831                              node_map,
1832                              mem_ctx,
1833                              NODE_FLAGS_DISCONNECTED,
1834                              include_self);
1835 }
1836
1837 /*
1838   get capabilities of a remote node
1839  */
1840 struct ctdb_client_control_state *
1841 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1842 {
1843         return ctdb_control_send(ctdb, destnode, 0,
1844                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
1845                            mem_ctx, &timeout, NULL);
1846 }
1847
1848 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
1849 {
1850         int ret;
1851         int32_t res;
1852         TDB_DATA outdata;
1853
1854         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
1855         if ( (ret != 0) || (res != 0) ) {
1856                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
1857                 return -1;
1858         }
1859
1860         if (capabilities) {
1861                 *capabilities = *((uint32_t *)outdata.dptr);
1862         }
1863
1864         return 0;
1865 }
1866
1867 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
1868 {
1869         struct ctdb_client_control_state *state;
1870         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1871         int ret;
1872
1873         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
1874         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
1875         talloc_free(tmp_ctx);
1876         return ret;
1877 }
1878
1879 static void get_capabilities_callback(struct ctdb_context *ctdb,
1880                                       uint32_t node_pnn, int32_t res,
1881                                       TDB_DATA outdata, void *callback_data)
1882 {
1883         struct ctdb_node_capabilities *caps =
1884                 talloc_get_type(callback_data,
1885                                 struct ctdb_node_capabilities);
1886
1887         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
1888                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
1889                 return;
1890         }
1891
1892         if (node_pnn >= talloc_array_length(caps)) {
1893                 DEBUG(DEBUG_ERR,
1894                       (__location__ " unexpected PNN %u\n", node_pnn));
1895                 return;
1896         }
1897
1898         caps[node_pnn].retrieved = true;
1899         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
1900 }
1901
1902 struct ctdb_node_capabilities *
1903 ctdb_get_capabilities(struct ctdb_context *ctdb,
1904                       TALLOC_CTX *mem_ctx,
1905                       struct timeval timeout,
1906                       struct ctdb_node_map_old *nodemap)
1907 {
1908         uint32_t *nodes;
1909         uint32_t i, res;
1910         struct ctdb_node_capabilities *ret;
1911
1912         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1913
1914         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
1915                            nodemap->num);
1916         CTDB_NO_MEMORY_NULL(ctdb, ret);
1917         /* Prepopulate the expected PNNs */
1918         for (i = 0; i < talloc_array_length(ret); i++) {
1919                 ret[i].retrieved = false;
1920         }
1921
1922         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
1923                                         nodes, 0, timeout,
1924                                         false, tdb_null,
1925                                         get_capabilities_callback, NULL,
1926                                         ret);
1927         if (res != 0) {
1928                 DEBUG(DEBUG_ERR,
1929                       (__location__ " Failed to read node capabilities.\n"));
1930                 TALLOC_FREE(ret);
1931         }
1932
1933         return ret;
1934 }
1935
1936 uint32_t *
1937 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
1938                            uint32_t pnn)
1939 {
1940         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
1941                 return &caps[pnn].capabilities;
1942         }
1943
1944         return NULL;
1945 }
1946
/*
  check whether node pnn has all of the bits in capabilities_required

  returns false if nothing is known about the node
 */
bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
				uint32_t pnn,
				uint32_t capabilities_required)
{
	uint32_t *known = ctdb_get_node_capabilities(caps, pnn);

	if (known == NULL) {
		return false;
	}

	return (*known & capabilities_required) == capabilities_required;
}
1955
1956 /*
1957   recovery daemon ping to main daemon
1958  */
1959 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
1960 {
1961         int ret;
1962         int32_t res;
1963
1964         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
1965                            ctdb, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
1968                 return -1;
1969         }
1970
1971         return 0;
1972 }
1973
1974 /*
1975   tell the main daemon how long it took to lock the reclock file
1976  */
1977 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
1978 {
1979         int ret;
1980         int32_t res;
1981         TDB_DATA data;
1982
1983         data.dptr = (uint8_t *)&latency;
1984         data.dsize = sizeof(latency);
1985
1986         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
1987                            ctdb, NULL, &res, NULL, NULL);
1988         if (ret != 0 || res != 0) {
1989                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
1990                 return -1;
1991         }
1992
1993         return 0;
1994 }
1995
1996 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout,
1997                       uint32_t destnode, struct ctdb_ban_state *bantime)
1998 {
1999         int ret;
2000         TDB_DATA data;
2001         int32_t res;
2002
2003         data.dsize = sizeof(*bantime);
2004         data.dptr  = (uint8_t *)bantime;
2005
2006         ret = ctdb_control(ctdb, destnode, 0,
2007                            CTDB_CONTROL_SET_BAN_STATE, 0, data,
2008                            NULL, NULL, &res, &timeout, NULL);
2009         if (ret != 0 || res != 0) {
2010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
2011                 return -1;
2012         }
2013
2014         return 0;
2015 }
2016
2017 struct ctdb_client_control_state *
2018 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
2019 {
2020         struct ctdb_client_control_state *handle;
2021         struct ctdb_marshall_buffer *m;
2022         struct ctdb_rec_data_old *rec;
2023         TDB_DATA outdata;
2024
2025         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
2026         if (m == NULL) {
2027                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
2028                 return NULL;
2029         }
2030
2031         m->db_id = ctdb_db->db_id;
2032
2033         rec = ctdb_marshall_record(m, 0, key, header, data);
2034         if (rec == NULL) {
2035                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
2036                 talloc_free(m);
2037                 return NULL;
2038         }
2039         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
2040         if (m == NULL) {
2041                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
2042                 talloc_free(m);
2043                 return NULL;
2044         }
2045         m->count++;
2046         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
2047
2048
2049         outdata.dptr = (uint8_t *)m;
2050         outdata.dsize = talloc_get_size(m);
2051
2052         handle = ctdb_control_send(ctdb, destnode, 0,
2053                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
2054                            mem_ctx, &timeout, NULL);
2055         talloc_free(m);
2056         return handle;
2057 }
2058
2059 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
2060 {
2061         int ret;
2062         int32_t res;
2063
2064         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
2065         if ( (ret != 0) || (res != 0) ){
2066                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
2067                 return -1;
2068         }
2069
2070         return 0;
2071 }
2072
2073 int
2074 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
2075 {
2076         struct ctdb_client_control_state *state;
2077
2078         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
2079         return ctdb_ctrl_updaterecord_recv(ctdb, state);
2080 }