419333025cbed1c4f43347ab04a1005cc058bb54
[samba.git] / ctdb / server / ctdb_client.c
1 /*
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/time.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 /*
45   allocate a packet for use in client<->daemon communication
46  */
47 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
48                                             TALLOC_CTX *mem_ctx,
49                                             enum ctdb_operation operation,
50                                             size_t length, size_t slength,
51                                             const char *type)
52 {
53         int size;
54         struct ctdb_req_header *hdr;
55
56         length = MAX(length, slength);
57         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
58
59         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
60         if (hdr == NULL) {
61                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
62                          operation, (unsigned)length));
63                 return NULL;
64         }
65         talloc_set_name_const(hdr, type);
66         hdr->length       = length;
67         hdr->operation    = operation;
68         hdr->ctdb_magic   = CTDB_MAGIC;
69         hdr->ctdb_version = CTDB_PROTOCOL;
70         hdr->srcnode      = ctdb->pnn;
71         if (ctdb->vnn_map) {
72                 hdr->generation = ctdb->vnn_map->generation;
73         }
74
75         return hdr;
76 }
77
78 /*
79   local version of ctdb_call
80 */
81 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
82                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
83                     TDB_DATA *data, bool updatetdb)
84 {
85         struct ctdb_call_info *c;
86         struct ctdb_registered_call *fn;
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88
89         c = talloc_zero(mem_ctx, struct ctdb_call_info);
90         CTDB_NO_MEMORY(ctdb, c);
91
92         c->key = call->key;
93         c->call_data = &call->call_data;
94         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
95         c->record_data.dsize = data->dsize;
96         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
97         c->header = header;
98
99         for (fn=ctdb_db->calls;fn;fn=fn->next) {
100                 if (fn->id == call->call_id) break;
101         }
102         if (fn == NULL) {
103                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (fn->fn(c) != 0) {
109                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
110                 talloc_free(c);
111                 return -1;
112         }
113
114         /* we need to force the record to be written out if this was a remote access */
115         if (c->new_data == NULL) {
116                 c->new_data = &c->record_data;
117         }
118
119         if (c->new_data && updatetdb) {
120                 /* XXX check that we always have the lock here? */
121                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
122                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
123                         talloc_free(c);
124                         return -1;
125                 }
126         }
127
128         if (c->reply_data) {
129                 call->reply_data = *c->reply_data;
130
131                 talloc_steal(call, call->reply_data.dptr);
132                 talloc_set_name_const(call->reply_data.dptr, __location__);
133         } else {
134                 call->reply_data.dptr = NULL;
135                 call->reply_data.dsize = 0;
136         }
137         call->status = c->status;
138
139         talloc_free(c);
140
141         return 0;
142 }
143
144
145 /*
146   queue a packet for sending from client to daemon
147 */
148 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
149 {
150         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
151 }
152
153
154 /*
155   called when a CTDB_REPLY_CALL packet comes in in the client
156
157   This packet comes in response to a CTDB_REQ_CALL request packet. It
158   contains any reply data from the call
159 */
160 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
161 {
162         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
163         struct ctdb_client_call_state *state;
164
165         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
166         if (state == NULL) {
167                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
168                 return;
169         }
170
171         if (hdr->reqid != state->reqid) {
172                 /* we found a record  but it was the wrong one */
173                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
174                 return;
175         }
176
177         state->call->reply_data.dptr = c->data;
178         state->call->reply_data.dsize = c->datalen;
179         state->call->status = c->status;
180
181         talloc_steal(state, c);
182
183         state->state = CTDB_CALL_DONE;
184
185         if (state->async.fn) {
186                 state->async.fn(state);
187         }
188 }
189
190 void ctdb_request_message(struct ctdb_context *ctdb,
191                           struct ctdb_req_header *hdr)
192 {
193         struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
194         TDB_DATA data;
195
196         data.dsize = c->datalen;
197         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
198         if (data.dptr == NULL) {
199                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
200                 return;
201         }
202
203         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
204 }
205
206 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
207
208 /*
209   this is called in the client, when data comes in from the daemon
210  */
211 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
212 {
213         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
214         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
215         TALLOC_CTX *tmp_ctx;
216
217         /* place the packet as a child of a tmp_ctx. We then use
218            talloc_free() below to free it. If any of the calls want
219            to keep it, then they will steal it somewhere else, and the
220            talloc_free() will be a no-op */
221         tmp_ctx = talloc_new(ctdb);
222         talloc_steal(tmp_ctx, hdr);
223
224         if (cnt == 0) {
225                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
226                 exit(1);
227         }
228
229         if (cnt < sizeof(*hdr)) {
230                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
231                 goto done;
232         }
233         if (cnt != hdr->length) {
234                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
235                                (unsigned)hdr->length, (unsigned)cnt);
236                 goto done;
237         }
238
239         if (hdr->ctdb_magic != CTDB_MAGIC) {
240                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
241                 goto done;
242         }
243
244         if (hdr->ctdb_version != CTDB_PROTOCOL) {
245                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
246                 goto done;
247         }
248
249         switch (hdr->operation) {
250         case CTDB_REPLY_CALL:
251                 ctdb_client_reply_call(ctdb, hdr);
252                 break;
253
254         case CTDB_REQ_MESSAGE:
255                 ctdb_request_message(ctdb, hdr);
256                 break;
257
258         case CTDB_REPLY_CONTROL:
259                 ctdb_client_reply_control(ctdb, hdr);
260                 break;
261
262         default:
263                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
264         }
265
266 done:
267         talloc_free(tmp_ctx);
268 }
269
270 /*
271   connect to a unix domain socket
272 */
273 int ctdb_socket_connect(struct ctdb_context *ctdb)
274 {
275         struct sockaddr_un addr;
276         int ret;
277
278         memset(&addr, 0, sizeof(addr));
279         addr.sun_family = AF_UNIX;
280         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
281
282         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
283         if (ctdb->daemon.sd == -1) {
284                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
285                 return -1;
286         }
287
288         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
289                 DEBUG(DEBUG_ERR,
290                       (__location__
291                        "Failed to connect client socket to daemon (%s)\n",
292                        strerror(errno)));
293                 close(ctdb->daemon.sd);
294                 ctdb->daemon.sd = -1;
295                 return -1;
296         }
297
298         ret = set_blocking(ctdb->daemon.sd, false);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR,
301                       (__location__
302                        " failed to set socket non-blocking (%s)\n",
303                        strerror(errno)));
304                 close(ctdb->daemon.sd);
305                 ctdb->daemon.sd = -1;
306                 return -1;
307         }
308
309         set_close_on_exec(ctdb->daemon.sd);
310
311         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
312                                               CTDB_DS_ALIGNMENT,
313                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
314         return 0;
315 }
316
317
318 struct ctdb_record_handle {
319         struct ctdb_db_context *ctdb_db;
320         TDB_DATA key;
321         TDB_DATA *data;
322         struct ctdb_ltdb_header header;
323 };
324
325
326 /*
327   make a recv call to the local ctdb daemon - called from client context
328
329   This is called when the program wants to wait for a ctdb_call to complete and get the
330   results. This call will block unless the call has already completed.
331 */
332 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
333 {
334         if (state == NULL) {
335                 return -1;
336         }
337
338         while (state->state < CTDB_CALL_DONE) {
339                 tevent_loop_once(state->ctdb_db->ctdb->ev);
340         }
341         if (state->state != CTDB_CALL_DONE) {
342                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
343                 talloc_free(state);
344                 return -1;
345         }
346
347         if (state->call->reply_data.dsize) {
348                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
349                                                       state->call->reply_data.dptr,
350                                                       state->call->reply_data.dsize);
351                 call->reply_data.dsize = state->call->reply_data.dsize;
352         } else {
353                 call->reply_data.dptr = NULL;
354                 call->reply_data.dsize = 0;
355         }
356         call->status = state->call->status;
357         talloc_free(state);
358
359         return call->status;
360 }
361
362
363
364
365 /*
366   destroy a ctdb_call in client
367 */
368 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
369 {
370         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
371         return 0;
372 }
373
374 /*
375   construct an event driven local ctdb_call
376
377   this is used so that locally processed ctdb_call requests are processed
378   in an event driven manner
379 */
380 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
381                                                                   struct ctdb_call *call,
382                                                                   struct ctdb_ltdb_header *header,
383                                                                   TDB_DATA *data)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         int ret;
388
389         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
390         CTDB_NO_MEMORY_NULL(ctdb, state);
391         state->call = talloc_zero(state, struct ctdb_call);
392         CTDB_NO_MEMORY_NULL(ctdb, state->call);
393
394         talloc_steal(state, data->dptr);
395
396         state->state   = CTDB_CALL_DONE;
397         *(state->call) = *call;
398         state->ctdb_db = ctdb_db;
399
400         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
401         if (ret != 0) {
402                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
403         }
404
405         return state;
406 }
407
408 /*
409   make a ctdb call to the local daemon - async send. Called from client context.
410
411   This constructs a ctdb_call request and queues it for processing.
412   This call never blocks.
413 */
414 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
415                                               struct ctdb_call *call)
416 {
417         struct ctdb_client_call_state *state;
418         struct ctdb_context *ctdb = ctdb_db->ctdb;
419         struct ctdb_ltdb_header header;
420         TDB_DATA data;
421         int ret;
422         size_t len;
423         struct ctdb_req_call_old *c;
424
425         /* if the domain socket is not yet open, open it */
426         if (ctdb->daemon.sd==-1) {
427                 ctdb_socket_connect(ctdb);
428         }
429
430         ret = ctdb_ltdb_lock(ctdb_db, call->key);
431         if (ret != 0) {
432                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
433                 return NULL;
434         }
435
436         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
437
438         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
439                 ret = -1;
440         }
441
442         if (ret == 0 && header.dmaster == ctdb->pnn) {
443                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
444                 talloc_free(data.dptr);
445                 ctdb_ltdb_unlock(ctdb_db, call->key);
446                 return state;
447         }
448
449         ctdb_ltdb_unlock(ctdb_db, call->key);
450         talloc_free(data.dptr);
451
452         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
453         if (state == NULL) {
454                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
455                 return NULL;
456         }
457         state->call = talloc_zero(state, struct ctdb_call);
458         if (state->call == NULL) {
459                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
460                 return NULL;
461         }
462
463         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
464         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
465         if (c == NULL) {
466                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
467                 return NULL;
468         }
469
470         state->reqid     = reqid_new(ctdb->idr, state);
471         state->ctdb_db = ctdb_db;
472         talloc_set_destructor(state, ctdb_client_call_destructor);
473
474         c->hdr.reqid     = state->reqid;
475         c->flags         = call->flags;
476         c->db_id         = ctdb_db->db_id;
477         c->callid        = call->call_id;
478         c->hopcount      = 0;
479         c->keylen        = call->key.dsize;
480         c->calldatalen   = call->call_data.dsize;
481         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
482         memcpy(&c->data[call->key.dsize],
483                call->call_data.dptr, call->call_data.dsize);
484         *(state->call)              = *call;
485         state->call->call_data.dptr = &c->data[call->key.dsize];
486         state->call->key.dptr       = &c->data[0];
487
488         state->state  = CTDB_CALL_WAIT;
489
490
491         ctdb_client_queue_pkt(ctdb, &c->hdr);
492
493         return state;
494 }
495
496
497 /*
498   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
499 */
500 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
501 {
502         struct ctdb_client_call_state *state;
503
504         state = ctdb_call_send(ctdb_db, call);
505         return ctdb_call_recv(state, call);
506 }
507
508
509 /*
510   tell the daemon what messaging srvid we will use, and register the message
511   handler function in the client
512 */
513 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
514                                     srvid_handler_fn handler,
515                                     void *private_data)
516 {
517         int res;
518         int32_t status;
519
520         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
521                            CTDB_CONTROL_REGISTER_SRVID, 0,
522                            tdb_null, NULL, NULL, &status, NULL, NULL);
523         if (res != 0 || status != 0) {
524                 DEBUG(DEBUG_ERR,
525                       ("Failed to register srvid %llu\n",
526                        (unsigned long long)srvid));
527                 return -1;
528         }
529
530         /* also need to register the handler with our own ctdb structure */
531         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
532 }
533
534 /*
535   tell the daemon we no longer want a srvid
536 */
537 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
538                                        uint64_t srvid, void *private_data)
539 {
540         int res;
541         int32_t status;
542
543         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
544                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
545                            tdb_null, NULL, NULL, &status, NULL, NULL);
546         if (res != 0 || status != 0) {
547                 DEBUG(DEBUG_ERR,
548                       ("Failed to deregister srvid %llu\n",
549                        (unsigned long long)srvid));
550                 return -1;
551         }
552
553         /* also need to register the handler with our own ctdb structure */
554         srvid_deregister(ctdb->srv, srvid, private_data);
555         return 0;
556 }
557
558 /*
559   send a message - from client context
560  */
561 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
562                       uint64_t srvid, TDB_DATA data)
563 {
564         struct ctdb_req_message_old *r;
565         int len, res;
566
567         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
568         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
569                                len, struct ctdb_req_message_old);
570         CTDB_NO_MEMORY(ctdb, r);
571
572         r->hdr.destnode  = pnn;
573         r->srvid         = srvid;
574         r->datalen       = data.dsize;
575         memcpy(&r->data[0], data.dptr, data.dsize);
576
577         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
578         talloc_free(r);
579         return res;
580 }
581
582
583 /*
584    called when a control completes or timesout to invoke the callback
585    function the user provided
586 */
587 static void invoke_control_callback(struct tevent_context *ev,
588                                     struct tevent_timer *te,
589                                     struct timeval t, void *private_data)
590 {
591         struct ctdb_client_control_state *state;
592         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
593         int ret;
594
595         state = talloc_get_type(private_data, struct ctdb_client_control_state);
596         talloc_steal(tmp_ctx, state);
597
598         ret = ctdb_control_recv(state->ctdb, state, state,
599                         NULL,
600                         NULL,
601                         NULL);
602         if (ret != 0) {
603                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
604         }
605
606         talloc_free(tmp_ctx);
607 }
608
609 /*
610   called when a CTDB_REPLY_CONTROL packet comes in in the client
611
612   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
613   contains any reply data from the control
614 */
615 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
616                                       struct ctdb_req_header *hdr)
617 {
618         struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
619         struct ctdb_client_control_state *state;
620
621         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
622         if (state == NULL) {
623                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
624                 return;
625         }
626
627         if (hdr->reqid != state->reqid) {
628                 /* we found a record  but it was the wrong one */
629                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
630                 return;
631         }
632
633         state->outdata.dptr = c->data;
634         state->outdata.dsize = c->datalen;
635         state->status = c->status;
636         if (c->errorlen) {
637                 state->errormsg = talloc_strndup(state,
638                                                  (char *)&c->data[c->datalen],
639                                                  c->errorlen);
640         }
641
642         /* state->outdata now uses resources from c so we don't want c
643            to just dissappear from under us while state is still alive
644         */
645         talloc_steal(state, c);
646
647         state->state = CTDB_CONTROL_DONE;
648
649         /* if we had a callback registered for this control, pull the response
650            and call the callback.
651         */
652         if (state->async.fn) {
653                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
654                                  invoke_control_callback, state);
655         }
656 }
657
658
659 /*
660   destroy a ctdb_control in client
661 */
662 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
663 {
664         reqid_remove(state->ctdb->idr, state->reqid);
665         return 0;
666 }
667
668
669 /* time out handler for ctdb_control */
670 static void control_timeout_func(struct tevent_context *ev,
671                                  struct tevent_timer *te,
672                                  struct timeval t, void *private_data)
673 {
674         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
675
676         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
677                          "dstnode:%u\n", state->reqid, state->c->opcode,
678                          state->c->hdr.destnode));
679
680         state->state = CTDB_CONTROL_TIMEOUT;
681
682         /* if we had a callback registered for this control, pull the response
683            and call the callback.
684         */
685         if (state->async.fn) {
686                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
687                                  invoke_control_callback, state);
688         }
689 }
690
691 /* async version of send control request */
692 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
693                 uint32_t destnode, uint64_t srvid,
694                 uint32_t opcode, uint32_t flags, TDB_DATA data,
695                 TALLOC_CTX *mem_ctx,
696                 struct timeval *timeout,
697                 char **errormsg)
698 {
699         struct ctdb_client_control_state *state;
700         size_t len;
701         struct ctdb_req_control_old *c;
702         int ret;
703
704         if (errormsg) {
705                 *errormsg = NULL;
706         }
707
708         /* if the domain socket is not yet open, open it */
709         if (ctdb->daemon.sd==-1) {
710                 ctdb_socket_connect(ctdb);
711         }
712
713         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
714         CTDB_NO_MEMORY_NULL(ctdb, state);
715
716         state->ctdb       = ctdb;
717         state->reqid      = reqid_new(ctdb->idr, state);
718         state->state      = CTDB_CONTROL_WAIT;
719         state->errormsg   = NULL;
720
721         talloc_set_destructor(state, ctdb_client_control_destructor);
722
723         len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
724         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
725                                len, struct ctdb_req_control_old);
726         state->c            = c;
727         CTDB_NO_MEMORY_NULL(ctdb, c);
728         c->hdr.reqid        = state->reqid;
729         c->hdr.destnode     = destnode;
730         c->opcode           = opcode;
731         c->client_id        = 0;
732         c->flags            = flags;
733         c->srvid            = srvid;
734         c->datalen          = data.dsize;
735         if (data.dsize) {
736                 memcpy(&c->data[0], data.dptr, data.dsize);
737         }
738
739         /* timeout */
740         if (timeout && !timeval_is_zero(timeout)) {
741                 tevent_add_timer(ctdb->ev, state, *timeout,
742                                  control_timeout_func, state);
743         }
744
745         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
746         if (ret != 0) {
747                 talloc_free(state);
748                 return NULL;
749         }
750
751         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
752                 talloc_free(state);
753                 return NULL;
754         }
755
756         return state;
757 }
758
759
760 /* async version of receive control reply */
761 int ctdb_control_recv(struct ctdb_context *ctdb,
762                 struct ctdb_client_control_state *state,
763                 TALLOC_CTX *mem_ctx,
764                 TDB_DATA *outdata, int32_t *status, char **errormsg)
765 {
766         TALLOC_CTX *tmp_ctx;
767
768         if (status != NULL) {
769                 *status = -1;
770         }
771         if (errormsg != NULL) {
772                 *errormsg = NULL;
773         }
774
775         if (state == NULL) {
776                 return -1;
777         }
778
779         /* prevent double free of state */
780         tmp_ctx = talloc_new(ctdb);
781         talloc_steal(tmp_ctx, state);
782
783         /* loop one event at a time until we either timeout or the control
784            completes.
785         */
786         while (state->state == CTDB_CONTROL_WAIT) {
787                 tevent_loop_once(ctdb->ev);
788         }
789
790         if (state->state != CTDB_CONTROL_DONE) {
791                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
792                 if (state->async.fn) {
793                         state->async.fn(state);
794                 }
795                 talloc_free(tmp_ctx);
796                 return -1;
797         }
798
799         if (state->errormsg) {
800                 int s = (state->status == 0 ? -1 : state->status);
801                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
802                 if (errormsg) {
803                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
804                 }
805                 if (state->async.fn) {
806                         state->async.fn(state);
807                 }
808                 talloc_free(tmp_ctx);
809                 return s;
810         }
811
812         if (outdata) {
813                 *outdata = state->outdata;
814                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
815         }
816
817         if (status) {
818                 *status = state->status;
819         }
820
821         if (state->async.fn) {
822                 state->async.fn(state);
823         }
824
825         talloc_free(tmp_ctx);
826         return 0;
827 }
828
829
830
831 /*
832   send a ctdb control message
833   timeout specifies how long we should wait for a reply.
834   if timeout is NULL we wait indefinitely
835  */
836 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
837                  uint32_t opcode, uint32_t flags, TDB_DATA data,
838                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
839                  struct timeval *timeout,
840                  char **errormsg)
841 {
842         struct ctdb_client_control_state *state;
843
844         state = ctdb_control_send(ctdb, destnode, srvid, opcode,
845                         flags, data, mem_ctx,
846                         timeout, errormsg);
847
848         /* FIXME: Error conditions in ctdb_control_send return NULL without
849          * setting errormsg.  So, there is no way to distinguish between sucess
850          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
851         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
852                 if (status != NULL) {
853                         *status = 0;
854                 }
855                 return 0;
856         }
857
858         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
859                         errormsg);
860 }
861
862 /*
863   get vnn map from a remote node
864  */
865 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
866 {
867         int ret;
868         TDB_DATA outdata;
869         int32_t res;
870         struct ctdb_vnn_map_wire *map;
871
872         ret = ctdb_control(ctdb, destnode, 0,
873                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
874                            mem_ctx, &outdata, &res, &timeout, NULL);
875         if (ret != 0 || res != 0) {
876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
877                 return -1;
878         }
879
880         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
881         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
882             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
883                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
884                 return -1;
885         }
886
887         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
888         CTDB_NO_MEMORY(ctdb, *vnnmap);
889         (*vnnmap)->generation = map->generation;
890         (*vnnmap)->size       = map->size;
891         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
892
893         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
894         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
895         talloc_free(outdata.dptr);
896
897         return 0;
898 }
899
900
901 /*
902   get the recovery mode of a remote node
903  */
904 struct ctdb_client_control_state *
905 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
906 {
907         return ctdb_control_send(ctdb, destnode, 0,
908                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
909                            mem_ctx, &timeout, NULL);
910 }
911
912 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
913 {
914         int ret;
915         int32_t res;
916
917         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
918         if (ret != 0) {
919                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
920                 return -1;
921         }
922
923         if (recmode) {
924                 *recmode = (uint32_t)res;
925         }
926
927         return 0;
928 }
929
930 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
931 {
932         struct ctdb_client_control_state *state;
933
934         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
935         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
936 }
937
938
939
940
941 /*
942   set the recovery mode of a remote node
943  */
944 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
945 {
946         int ret;
947         TDB_DATA data;
948         int32_t res;
949
950         data.dsize = sizeof(uint32_t);
951         data.dptr = (unsigned char *)&recmode;
952
953         ret = ctdb_control(ctdb, destnode, 0,
954                            CTDB_CONTROL_SET_RECMODE, 0, data,
955                            NULL, NULL, &res, &timeout, NULL);
956         if (ret != 0 || res != 0) {
957                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
958                 return -1;
959         }
960
961         return 0;
962 }
963
964
965
966 /*
967   get the recovery master of a remote node
968  */
969 struct ctdb_client_control_state *
970 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
971                         struct timeval timeout, uint32_t destnode)
972 {
973         return ctdb_control_send(ctdb, destnode, 0,
974                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
975                            mem_ctx, &timeout, NULL);
976 }
977
978 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
979 {
980         int ret;
981         int32_t res;
982
983         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
984         if (ret != 0) {
985                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
986                 return -1;
987         }
988
989         if (recmaster) {
990                 *recmaster = (uint32_t)res;
991         }
992
993         return 0;
994 }
995
996 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
997 {
998         struct ctdb_client_control_state *state;
999
1000         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1001         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1002 }
1003
1004
1005 /*
1006   set the recovery master of a remote node
1007  */
1008 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1009 {
1010         int ret;
1011         TDB_DATA data;
1012         int32_t res;
1013
1014         ZERO_STRUCT(data);
1015         data.dsize = sizeof(uint32_t);
1016         data.dptr = (unsigned char *)&recmaster;
1017
1018         ret = ctdb_control(ctdb, destnode, 0,
1019                            CTDB_CONTROL_SET_RECMASTER, 0, data,
1020                            NULL, NULL, &res, &timeout, NULL);
1021         if (ret != 0 || res != 0) {
1022                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1023                 return -1;
1024         }
1025
1026         return 0;
1027 }
1028
1029
1030 /*
1031   get a list of databases off a remote node
1032  */
1033 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1034                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1035 {
1036         int ret;
1037         TDB_DATA outdata;
1038         int32_t res;
1039
1040         ret = ctdb_control(ctdb, destnode, 0,
1041                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1042                            mem_ctx, &outdata, &res, &timeout, NULL);
1043         if (ret != 0 || res != 0) {
1044                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1045                 return -1;
1046         }
1047
1048         *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1049         talloc_free(outdata.dptr);
1050
1051         return 0;
1052 }
1053
1054 /*
1055   get a list of nodes (vnn and flags ) from a remote node
1056  */
1057 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1058                 struct timeval timeout, uint32_t destnode,
1059                 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1060 {
1061         int ret;
1062         TDB_DATA outdata;
1063         int32_t res;
1064
1065         ret = ctdb_control(ctdb, destnode, 0,
1066                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1067                            mem_ctx, &outdata, &res, &timeout, NULL);
1068         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1069                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1070                 return -1;
1071         }
1072
1073         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1074         talloc_free(outdata.dptr);
1075         return 0;
1076 }
1077
1078 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb,
1079                            struct timeval timeout,
1080                            uint32_t destnode,
1081                            uint32_t *runstate)
1082 {
1083         TDB_DATA outdata;
1084         int32_t res;
1085         int ret;
1086
1087         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1088                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1089         if (ret != 0 || res != 0) {
1090                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1091                 return ret != 0 ? ret : res;
1092         }
1093
1094         if (outdata.dsize != sizeof(uint32_t)) {
1095                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1096                 talloc_free(outdata.dptr);
1097                 return -1;
1098         }
1099
1100         if (runstate != NULL) {
1101                 *runstate = *(uint32_t *)outdata.dptr;
1102         }
1103         talloc_free(outdata.dptr);
1104
1105         return 0;
1106 }
1107
1108 /*
1109   find the real path to a ltdb
1110  */
1111 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1112                    const char **path)
1113 {
1114         int ret;
1115         int32_t res;
1116         TDB_DATA data;
1117
1118         data.dptr = (uint8_t *)&dbid;
1119         data.dsize = sizeof(dbid);
1120
1121         ret = ctdb_control(ctdb, destnode, 0,
1122                            CTDB_CONTROL_GETDBPATH, 0, data,
1123                            mem_ctx, &data, &res, &timeout, NULL);
1124         if (ret != 0 || res != 0) {
1125                 return -1;
1126         }
1127
1128         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1129         if ((*path) == NULL) {
1130                 return -1;
1131         }
1132
1133         talloc_free(data.dptr);
1134
1135         return 0;
1136 }
1137
1138 /*
1139   find the name of a db
1140  */
1141 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1142                    const char **name)
1143 {
1144         int ret;
1145         int32_t res;
1146         TDB_DATA data;
1147
1148         data.dptr = (uint8_t *)&dbid;
1149         data.dsize = sizeof(dbid);
1150
1151         ret = ctdb_control(ctdb, destnode, 0,
1152                            CTDB_CONTROL_GET_DBNAME, 0, data,
1153                            mem_ctx, &data, &res, &timeout, NULL);
1154         if (ret != 0 || res != 0) {
1155                 return -1;
1156         }
1157
1158         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1159         if ((*name) == NULL) {
1160                 return -1;
1161         }
1162
1163         talloc_free(data.dptr);
1164
1165         return 0;
1166 }
1167
1168 /*
1169   create a database
1170  */
1171 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
1172                        uint32_t destnode, TALLOC_CTX *mem_ctx,
1173                        const char *name, uint8_t db_flags, uint32_t *db_id)
1174 {
1175         int ret;
1176         int32_t res;
1177         TDB_DATA data;
1178         uint32_t opcode;
1179
1180         data.dptr = discard_const(name);
1181         data.dsize = strlen(name)+1;
1182
1183         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
1184                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1185         } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
1186                 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1187         } else {
1188                 opcode = CTDB_CONTROL_DB_ATTACH;
1189         }
1190
1191         ret = ctdb_control(ctdb, destnode, 0, opcode, 0, data,
1192                            mem_ctx, &data, &res, &timeout, NULL);
1193
1194         if (ret != 0 || res != 0) {
1195                 return -1;
1196         }
1197
1198         if (data.dsize != sizeof(uint32_t)) {
1199                 TALLOC_FREE(data.dptr);
1200                 return -1;
1201         }
1202         if (db_id != NULL) {
1203                 *db_id = *(uint32_t *)data.dptr;
1204         }
1205         talloc_free(data.dptr);
1206
1207         return 0;
1208 }
1209
1210 /*
1211   get debug level on a node
1212  */
1213 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1214 {
1215         int ret;
1216         int32_t res;
1217         TDB_DATA data;
1218
1219         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1220                            ctdb, &data, &res, NULL, NULL);
1221         if (ret != 0 || res != 0) {
1222                 return -1;
1223         }
1224         if (data.dsize != sizeof(int32_t)) {
1225                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1226                          (unsigned)data.dsize));
1227                 return -1;
1228         }
1229         *level = *(int32_t *)data.dptr;
1230         talloc_free(data.dptr);
1231         return 0;
1232 }
1233
1234 /*
1235  * Get db open flags
1236  */
1237 int ctdb_ctrl_db_open_flags(struct ctdb_context *ctdb, uint32_t db_id,
1238                             int *tdb_flags)
1239 {
1240         TDB_DATA indata, outdata;
1241         int ret;
1242         int32_t res;
1243
1244         indata.dptr = (uint8_t *)&db_id;
1245         indata.dsize = sizeof(db_id);
1246
1247         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
1248                            CTDB_CONTROL_DB_OPEN_FLAGS, 0, indata,
1249                            ctdb, &outdata, &res, NULL, NULL);
1250         if (ret != 0 || res != 0) {
1251                 D_ERR("ctdb control for db open flags failed\n");
1252                 return  -1;
1253         }
1254
1255         if (outdata.dsize != sizeof(int32_t)) {
1256                 D_ERR(__location__ " expected %zi bytes, received %zi bytes\n",
1257                       sizeof(int32_t), outdata.dsize);
1258                 talloc_free(outdata.dptr);
1259                 return -1;
1260         }
1261
1262         *tdb_flags = *(int32_t *)outdata.dptr;
1263         talloc_free(outdata.dptr);
1264         return 0;
1265 }
1266
1267 /*
1268   attach to a specific database - client call
1269 */
1270 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1271                                     struct timeval timeout,
1272                                     const char *name,
1273                                     uint8_t db_flags)
1274 {
1275         struct ctdb_db_context *ctdb_db;
1276         int ret;
1277         int tdb_flags;
1278
1279         ctdb_db = ctdb_db_handle(ctdb, name);
1280         if (ctdb_db) {
1281                 return ctdb_db;
1282         }
1283
1284         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1285         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1286
1287         ctdb_db->ctdb = ctdb;
1288         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1289         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1290
1291         /* tell ctdb daemon to attach */
1292         ret = ctdb_ctrl_createdb(ctdb, timeout, CTDB_CURRENT_NODE,
1293                                  ctdb_db, name, db_flags, &ctdb_db->db_id);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1296                 talloc_free(ctdb_db);
1297                 return NULL;
1298         }
1299
1300         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1301         if (ret != 0) {
1302                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1303                 talloc_free(ctdb_db);
1304                 return NULL;
1305         }
1306
1307         ret = ctdb_ctrl_db_open_flags(ctdb, ctdb_db->db_id, &tdb_flags);
1308         if (ret != 0) {
1309                 D_ERR("Failed to get tdb_flags for database '%s'\n", name);
1310                 talloc_free(ctdb_db);
1311                 return NULL;
1312         }
1313
1314         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
1315                                       O_RDWR, 0);
1316         if (ctdb_db->ltdb == NULL) {
1317                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1318                 talloc_free(ctdb_db);
1319                 return NULL;
1320         }
1321
1322         ctdb_db->db_flags = db_flags;
1323
1324         DLIST_ADD(ctdb->db_list, ctdb_db);
1325
1326         /* add well known functions */
1327         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1328         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1329         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1330
1331         return ctdb_db;
1332 }
1333
1334 /*
1335   setup a call for a database
1336  */
1337 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1338 {
1339         struct ctdb_registered_call *call;
1340
1341         /* register locally */
1342         call = talloc(ctdb_db, struct ctdb_registered_call);
1343         call->fn = fn;
1344         call->id = id;
1345
1346         DLIST_ADD(ctdb_db->calls, call);
1347         return 0;
1348 }
1349
1350 /* Freeze all databases */
1351 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout,
1352                      uint32_t destnode)
1353 {
1354         int ret;
1355         int32_t res;
1356
1357         ret = ctdb_control(ctdb, destnode, 0,
1358                            CTDB_CONTROL_FREEZE, 0, tdb_null,
1359                            NULL, NULL, &res, &timeout, NULL);
1360         if (ret != 0 || res != 0) {
1361                 DEBUG(DEBUG_ERR, ("ctdb_ctrl_freeze_priority failed\n"));
1362                 return -1;
1363         }
1364
1365         return 0;
1366 }
1367
1368 /*
1369   get pnn of a node, or -1
1370  */
1371 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1372 {
1373         int ret;
1374         int32_t res;
1375
1376         ret = ctdb_control(ctdb, destnode, 0,
1377                            CTDB_CONTROL_GET_PNN, 0, tdb_null,
1378                            NULL, NULL, &res, &timeout, NULL);
1379         if (ret != 0) {
1380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1381                 return -1;
1382         }
1383
1384         return res;
1385 }
1386
1387 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1388                                    struct timeval timeout, uint32_t destnode,
1389                                    TALLOC_CTX *mem_ctx,
1390                                    uint32_t flags,
1391                                    struct ctdb_public_ip_list_old **ips)
1392 {
1393         int ret;
1394         TDB_DATA outdata;
1395         int32_t res;
1396
1397         ret = ctdb_control(ctdb, destnode, 0,
1398                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1399                            mem_ctx, &outdata, &res, &timeout, NULL);
1400         if (ret != 0 || res != 0) {
1401                 DEBUG(DEBUG_ERR,(__location__
1402                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
1403                                  ret, res));
1404                 return -1;
1405         }
1406
1407         *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1408         talloc_free(outdata.dptr);
1409
1410         return 0;
1411 }
1412
1413 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1414                              struct timeval timeout, uint32_t destnode,
1415                              TALLOC_CTX *mem_ctx,
1416                              struct ctdb_public_ip_list_old **ips)
1417 {
1418         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1419                                               destnode, mem_ctx,
1420                                               0, ips);
1421 }
1422
1423 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1424                          struct timeval timeout, uint32_t destnode,
1425                          TALLOC_CTX *mem_ctx,
1426                          struct ctdb_iface_list_old **_ifaces)
1427 {
1428         int ret;
1429         TDB_DATA outdata;
1430         int32_t res;
1431         struct ctdb_iface_list_old *ifaces;
1432         uint32_t len;
1433         uint32_t i;
1434
1435         ret = ctdb_control(ctdb, destnode, 0,
1436                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1437                            mem_ctx, &outdata, &res, &timeout, NULL);
1438         if (ret != 0 || res != 0) {
1439                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1440                                 "failed ret:%d res:%d\n",
1441                                 ret, res));
1442                 return -1;
1443         }
1444
1445         len = offsetof(struct ctdb_iface_list_old, ifaces);
1446         if (len > outdata.dsize) {
1447                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1448                                 "returned invalid data with size %u > %u\n",
1449                                 (unsigned int)outdata.dsize,
1450                                 (unsigned int)len));
1451                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1452                 return -1;
1453         }
1454
1455         ifaces = (struct ctdb_iface_list_old *)outdata.dptr;
1456         len += ifaces->num*sizeof(struct ctdb_iface);
1457
1458         if (len > outdata.dsize) {
1459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1460                                 "returned invalid data with size %u > %u\n",
1461                                 (unsigned int)outdata.dsize,
1462                                 (unsigned int)len));
1463                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1464                 return -1;
1465         }
1466
1467         /* make sure we null terminate the returned strings */
1468         for (i=0; i < ifaces->num; i++) {
1469                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1470         }
1471
1472         *_ifaces = (struct ctdb_iface_list_old *)talloc_memdup(mem_ctx,
1473                                                                   outdata.dptr,
1474                                                                   outdata.dsize);
1475         talloc_free(outdata.dptr);
1476         if (*_ifaces == NULL) {
1477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1478                                 "talloc_memdup size %u failed\n",
1479                                 (unsigned int)outdata.dsize));
1480                 return -1;
1481         }
1482
1483         return 0;
1484 }
1485
1486 /*
1487   set/clear the permanent disabled bit on a remote node
1488  */
1489 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1490                        uint32_t set, uint32_t clear)
1491 {
1492         int ret;
1493         TDB_DATA data;
1494         struct ctdb_node_map_old *nodemap=NULL;
1495         struct ctdb_node_flag_change c;
1496         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1497         uint32_t recmaster;
1498         uint32_t *nodes;
1499
1500
1501         /* find the recovery master */
1502         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1503         if (ret != 0) {
1504                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1505                 talloc_free(tmp_ctx);
1506                 return ret;
1507         }
1508
1509
1510         /* read the node flags from the recmaster */
1511         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1512         if (ret != 0) {
1513                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1514                 talloc_free(tmp_ctx);
1515                 return -1;
1516         }
1517         if (destnode >= nodemap->num) {
1518                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1519                 talloc_free(tmp_ctx);
1520                 return -1;
1521         }
1522
1523         c.pnn       = destnode;
1524         c.old_flags = nodemap->nodes[destnode].flags;
1525         c.new_flags = c.old_flags;
1526         c.new_flags |= set;
1527         c.new_flags &= ~clear;
1528
1529         data.dsize = sizeof(c);
1530         data.dptr = (unsigned char *)&c;
1531
1532         /* send the flags update to all connected nodes */
1533         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1534
1535         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1536                                         nodes, 0,
1537                                         timeout, false, data,
1538                                         NULL, NULL,
1539                                         NULL) != 0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1541
1542                 talloc_free(tmp_ctx);
1543                 return -1;
1544         }
1545
1546         talloc_free(tmp_ctx);
1547         return 0;
1548 }
1549
1550
1551 /*
1552   get all tunables
1553  */
1554 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
1555                                struct timeval timeout,
1556                                uint32_t destnode,
1557                                struct ctdb_tunable_list *tunables)
1558 {
1559         TDB_DATA outdata;
1560         int ret;
1561         int32_t res;
1562
1563         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1564                            &outdata, &res, &timeout, NULL);
1565         if (ret != 0 || res != 0) {
1566                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1567                 return -1;
1568         }
1569
1570         if (outdata.dsize != sizeof(*tunables)) {
1571                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1572                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1573                 return -1;
1574         }
1575
1576         *tunables = *(struct ctdb_tunable_list *)outdata.dptr;
1577         talloc_free(outdata.dptr);
1578         return 0;
1579 }
1580
1581 /*
1582   set some ctdb flags
1583 */
1584 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
1585 {
1586         ctdb->flags |= flags;
1587 }
1588
1589 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
1590 {
1591         return ctdb->daemon.name;
1592 }
1593
1594 /*
1595   return the pnn of this node
1596 */
1597 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
1598 {
1599         return ctdb->pnn;
1600 }
1601
1602 /*
1603   callback for the async helpers used when sending the same control
1604   to multiple nodes in parallell.
1605 */
1606 static void async_callback(struct ctdb_client_control_state *state)
1607 {
1608         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
1609         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
1610         int ret;
1611         TDB_DATA outdata;
1612         int32_t res = -1;
1613         uint32_t destnode = state->c->hdr.destnode;
1614
1615         outdata.dsize = 0;
1616         outdata.dptr = NULL;
1617
1618         /* one more node has responded with recmode data */
1619         data->count--;
1620
1621         /* if we failed to push the db, then return an error and let
1622            the main loop try again.
1623         */
1624         if (state->state != CTDB_CONTROL_DONE) {
1625                 if ( !data->dont_log_errors) {
1626                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
1627                 }
1628                 data->fail_count++;
1629                 if (state->state == CTDB_CONTROL_TIMEOUT) {
1630                         res = -ETIMEDOUT;
1631                 } else {
1632                         res = -1;
1633                 }
1634                 if (data->fail_callback) {
1635                         data->fail_callback(ctdb, destnode, res, outdata,
1636                                         data->callback_data);
1637                 }
1638                 return;
1639         }
1640
1641         state->async.fn = NULL;
1642
1643         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
1644         if ((ret != 0) || (res != 0)) {
1645                 if ( !data->dont_log_errors) {
1646                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
1647                 }
1648                 data->fail_count++;
1649                 if (data->fail_callback) {
1650                         data->fail_callback(ctdb, destnode, res, outdata,
1651                                         data->callback_data);
1652                 }
1653         }
1654         if ((ret == 0) && (data->callback != NULL)) {
1655                 data->callback(ctdb, destnode, res, outdata,
1656                                         data->callback_data);
1657         }
1658 }
1659
1660
1661 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
1662 {
1663         /* set up the callback functions */
1664         state->async.fn = async_callback;
1665         state->async.private_data = data;
1666
1667         /* one more control to wait for to complete */
1668         data->count++;
1669 }
1670
1671
1672 /* wait for up to the maximum number of seconds allowed
1673    or until all nodes we expect a response from has replied
1674 */
1675 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
1676 {
1677         while (data->count > 0) {
1678                 tevent_loop_once(ctdb->ev);
1679         }
1680         if (data->fail_count != 0) {
1681                 if (!data->dont_log_errors) {
1682                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
1683                                  data->fail_count));
1684                 }
1685                 return -1;
1686         }
1687         return 0;
1688 }
1689
1690
1691 /*
1692    perform a simple control on the listed nodes
1693    The control cannot return data
1694  */
1695 int ctdb_client_async_control(struct ctdb_context *ctdb,
1696                                 enum ctdb_controls opcode,
1697                                 uint32_t *nodes,
1698                                 uint64_t srvid,
1699                                 struct timeval timeout,
1700                                 bool dont_log_errors,
1701                                 TDB_DATA data,
1702                                 client_async_callback client_callback,
1703                                 client_async_callback fail_callback,
1704                                 void *callback_data)
1705 {
1706         struct client_async_data *async_data;
1707         struct ctdb_client_control_state *state;
1708         int j, num_nodes;
1709
1710         async_data = talloc_zero(ctdb, struct client_async_data);
1711         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
1712         async_data->dont_log_errors = dont_log_errors;
1713         async_data->callback = client_callback;
1714         async_data->fail_callback = fail_callback;
1715         async_data->callback_data = callback_data;
1716         async_data->opcode        = opcode;
1717
1718         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
1719
1720         /* loop over all nodes and send an async control to each of them */
1721         for (j=0; j<num_nodes; j++) {
1722                 uint32_t pnn = nodes[j];
1723
1724                 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
1725                                           0, data, async_data, &timeout, NULL);
1726                 if (state == NULL) {
1727                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
1728                         talloc_free(async_data);
1729                         return -1;
1730                 }
1731
1732                 ctdb_client_async_add(async_data, state);
1733         }
1734
1735         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
1736                 talloc_free(async_data);
1737                 return -1;
1738         }
1739
1740         talloc_free(async_data);
1741         return 0;
1742 }
1743
1744 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
1745                                 struct ctdb_vnn_map *vnn_map,
1746                                 TALLOC_CTX *mem_ctx,
1747                                 bool include_self)
1748 {
1749         unsigned int i, j, num_nodes;
1750         uint32_t *nodes;
1751
1752         for (i=num_nodes=0;i<vnn_map->size;i++) {
1753                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
1754                         continue;
1755                 }
1756                 num_nodes++;
1757         }
1758
1759         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
1760         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
1761
1762         for (i=j=0;i<vnn_map->size;i++) {
1763                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
1764                         continue;
1765                 }
1766                 nodes[j++] = vnn_map->map[i];
1767         }
1768
1769         return nodes;
1770 }
1771
1772 /* Get list of nodes not including those with flags specified by mask */
1773 static uint32_t *list_of_nodes(struct ctdb_context *ctdb,
1774                                struct ctdb_node_map_old *node_map,
1775                                TALLOC_CTX *mem_ctx,
1776                                uint32_t mask,
1777                                bool include_self)
1778 {
1779         unsigned int i, j, num_nodes;
1780         uint32_t exclude_pnn;
1781         uint32_t *nodes;
1782
1783         exclude_pnn = include_self ? CTDB_UNKNOWN_PNN : ctdb->pnn;
1784
1785         for (i=num_nodes=0;i<node_map->num;i++) {
1786                 if (node_map->nodes[i].flags & mask) {
1787                         continue;
1788                 }
1789                 if (node_map->nodes[i].pnn == exclude_pnn) {
1790                         continue;
1791                 }
1792                 num_nodes++;
1793         }
1794
1795         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
1796         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
1797
1798         for (i=j=0;i<node_map->num;i++) {
1799                 if (node_map->nodes[i].flags & mask) {
1800                         continue;
1801                 }
1802                 if (node_map->nodes[i].pnn == exclude_pnn) {
1803                         continue;
1804                 }
1805                 nodes[j++] = node_map->nodes[i].pnn;
1806         }
1807
1808         return nodes;
1809 }
1810
1811 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
1812                                 struct ctdb_node_map_old *node_map,
1813                                 TALLOC_CTX *mem_ctx,
1814                                 bool include_self)
1815 {
1816         return list_of_nodes(ctdb,
1817                              node_map,
1818                              mem_ctx,
1819                              NODE_FLAGS_INACTIVE,
1820                              include_self);
1821 }
1822
1823 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
1824                                 struct ctdb_node_map_old *node_map,
1825                                 TALLOC_CTX *mem_ctx,
1826                                 bool include_self)
1827 {
1828         return list_of_nodes(ctdb,
1829                              node_map,
1830                              mem_ctx,
1831                              NODE_FLAGS_DISCONNECTED,
1832                              include_self);
1833 }
1834
1835 /*
1836   get capabilities of a remote node
1837  */
1838 struct ctdb_client_control_state *
1839 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1840 {
1841         return ctdb_control_send(ctdb, destnode, 0,
1842                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
1843                            mem_ctx, &timeout, NULL);
1844 }
1845
1846 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
1847 {
1848         int ret;
1849         int32_t res;
1850         TDB_DATA outdata;
1851
1852         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
1853         if ( (ret != 0) || (res != 0) ) {
1854                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
1855                 return -1;
1856         }
1857
1858         if (capabilities) {
1859                 *capabilities = *((uint32_t *)outdata.dptr);
1860         }
1861
1862         return 0;
1863 }
1864
1865 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
1866 {
1867         struct ctdb_client_control_state *state;
1868         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1869         int ret;
1870
1871         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
1872         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
1873         talloc_free(tmp_ctx);
1874         return ret;
1875 }
1876
1877 static void get_capabilities_callback(struct ctdb_context *ctdb,
1878                                       uint32_t node_pnn, int32_t res,
1879                                       TDB_DATA outdata, void *callback_data)
1880 {
1881         struct ctdb_node_capabilities *caps =
1882                 talloc_get_type(callback_data,
1883                                 struct ctdb_node_capabilities);
1884
1885         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
1886                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
1887                 return;
1888         }
1889
1890         if (node_pnn >= talloc_array_length(caps)) {
1891                 DEBUG(DEBUG_ERR,
1892                       (__location__ " unexpected PNN %u\n", node_pnn));
1893                 return;
1894         }
1895
1896         caps[node_pnn].retrieved = true;
1897         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
1898 }
1899
1900 struct ctdb_node_capabilities *
1901 ctdb_get_capabilities(struct ctdb_context *ctdb,
1902                       TALLOC_CTX *mem_ctx,
1903                       struct timeval timeout,
1904                       struct ctdb_node_map_old *nodemap)
1905 {
1906         uint32_t *nodes;
1907         uint32_t i, res;
1908         struct ctdb_node_capabilities *ret;
1909
1910         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1911
1912         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
1913                            nodemap->num);
1914         CTDB_NO_MEMORY_NULL(ctdb, ret);
1915         /* Prepopulate the expected PNNs */
1916         for (i = 0; i < talloc_array_length(ret); i++) {
1917                 ret[i].retrieved = false;
1918         }
1919
1920         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
1921                                         nodes, 0, timeout,
1922                                         false, tdb_null,
1923                                         get_capabilities_callback, NULL,
1924                                         ret);
1925         if (res != 0) {
1926                 DEBUG(DEBUG_ERR,
1927                       (__location__ " Failed to read node capabilities.\n"));
1928                 TALLOC_FREE(ret);
1929         }
1930
1931         return ret;
1932 }
1933
1934 uint32_t *
1935 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
1936                            uint32_t pnn)
1937 {
1938         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
1939                 return &caps[pnn].capabilities;
1940         }
1941
1942         return NULL;
1943 }
1944
1945 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
1946                                 uint32_t pnn,
1947                                 uint32_t capabilities_required)
1948 {
1949         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
1950         return (capp != NULL) &&
1951                 ((*capp & capabilities_required) == capabilities_required);
1952 }
1953
1954 /*
1955   recovery daemon ping to main daemon
1956  */
1957 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
1958 {
1959         int ret;
1960         int32_t res;
1961
1962         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
1963                            ctdb, NULL, &res, NULL, NULL);
1964         if (ret != 0 || res != 0) {
1965                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
1966                 return -1;
1967         }
1968
1969         return 0;
1970 }
1971
1972 /*
1973   tell the main daemon how long it took to lock the reclock file
1974  */
1975 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
1976 {
1977         int ret;
1978         int32_t res;
1979         TDB_DATA data;
1980
1981         data.dptr = (uint8_t *)&latency;
1982         data.dsize = sizeof(latency);
1983
1984         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
1985                            ctdb, NULL, &res, NULL, NULL);
1986         if (ret != 0 || res != 0) {
1987                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
1988                 return -1;
1989         }
1990
1991         return 0;
1992 }
1993
1994 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout,
1995                       uint32_t destnode, struct ctdb_ban_state *bantime)
1996 {
1997         int ret;
1998         TDB_DATA data;
1999         int32_t res;
2000
2001         data.dsize = sizeof(*bantime);
2002         data.dptr  = (uint8_t *)bantime;
2003
2004         ret = ctdb_control(ctdb, destnode, 0,
2005                            CTDB_CONTROL_SET_BAN_STATE, 0, data,
2006                            NULL, NULL, &res, &timeout, NULL);
2007         if (ret != 0 || res != 0) {
2008                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
2009                 return -1;
2010         }
2011
2012         return 0;
2013 }
2014
2015 struct ctdb_client_control_state *
2016 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
2017 {
2018         struct ctdb_client_control_state *handle;
2019         struct ctdb_marshall_buffer *m;
2020         struct ctdb_rec_data_old *rec;
2021         TDB_DATA outdata;
2022
2023         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
2024         if (m == NULL) {
2025                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
2026                 return NULL;
2027         }
2028
2029         m->db_id = ctdb_db->db_id;
2030
2031         rec = ctdb_marshall_record(m, 0, key, header, data);
2032         if (rec == NULL) {
2033                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
2034                 talloc_free(m);
2035                 return NULL;
2036         }
2037         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
2038         if (m == NULL) {
2039                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
2040                 talloc_free(m);
2041                 return NULL;
2042         }
2043         m->count++;
2044         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
2045
2046
2047         outdata.dptr = (uint8_t *)m;
2048         outdata.dsize = talloc_get_size(m);
2049
2050         handle = ctdb_control_send(ctdb, destnode, 0,
2051                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
2052                            mem_ctx, &timeout, NULL);
2053         talloc_free(m);
2054         return handle;
2055 }
2056
2057 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
2058 {
2059         int ret;
2060         int32_t res;
2061
2062         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
2063         if ( (ret != 0) || (res != 0) ){
2064                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
2065                 return -1;
2066         }
2067
2068         return 0;
2069 }
2070
2071 int
2072 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
2073 {
2074         struct ctdb_client_control_state *state;
2075
2076         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
2077         return ctdb_ctrl_updaterecord_recv(ctdb, state);
2078 }