On RHEL, "service nfs stop;service nfs start" and "service nfs restart"
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         if (ctdb->daemon.queue == NULL) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup queue to daemon\n"));
287                 return -1;
288         }
289
290         return 0;
291 }
292
293
294 struct ctdb_record_handle {
295         struct ctdb_db_context *ctdb_db;
296         TDB_DATA key;
297         TDB_DATA *data;
298         struct ctdb_ltdb_header header;
299 };
300
301
302 /*
303   make a recv call to the local ctdb daemon - called from client context
304
305   This is called when the program wants to wait for a ctdb_call to complete and get the 
306   results. This call will block unless the call has already completed.
307 */
308 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
309 {
310         if (state == NULL) {
311                 return -1;
312         }
313
314         while (state->state < CTDB_CALL_DONE) {
315                 event_loop_once(state->ctdb_db->ctdb->ev);
316         }
317         if (state->state != CTDB_CALL_DONE) {
318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
319                 talloc_free(state);
320                 return -1;
321         }
322
323         if (state->call->reply_data.dsize) {
324                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
325                                                       state->call->reply_data.dptr,
326                                                       state->call->reply_data.dsize);
327                 call->reply_data.dsize = state->call->reply_data.dsize;
328         } else {
329                 call->reply_data.dptr = NULL;
330                 call->reply_data.dsize = 0;
331         }
332         call->status = state->call->status;
333         talloc_free(state);
334
335         return 0;
336 }
337
338
339
340
341 /*
342   destroy a ctdb_call in client
343 */
344 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
345 {
346         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
347         return 0;
348 }
349
350 /*
351   construct an event driven local ctdb_call
352
353   this is used so that locally processed ctdb_call requests are processed
354   in an event driven manner
355 */
356 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
357                                                                   struct ctdb_call *call,
358                                                                   struct ctdb_ltdb_header *header,
359                                                                   TDB_DATA *data)
360 {
361         struct ctdb_client_call_state *state;
362         struct ctdb_context *ctdb = ctdb_db->ctdb;
363         int ret;
364
365         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
366         CTDB_NO_MEMORY_NULL(ctdb, state);
367         state->call = talloc_zero(state, struct ctdb_call);
368         CTDB_NO_MEMORY_NULL(ctdb, state->call);
369
370         talloc_steal(state, data->dptr);
371
372         state->state   = CTDB_CALL_DONE;
373         *(state->call) = *call;
374         state->ctdb_db = ctdb_db;
375
376         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
377
378         return state;
379 }
380
381 /*
382   make a ctdb call to the local daemon - async send. Called from client context.
383
384   This constructs a ctdb_call request and queues it for processing. 
385   This call never blocks.
386 */
387 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
388                                               struct ctdb_call *call)
389 {
390         struct ctdb_client_call_state *state;
391         struct ctdb_context *ctdb = ctdb_db->ctdb;
392         struct ctdb_ltdb_header header;
393         TDB_DATA data;
394         int ret;
395         size_t len;
396         struct ctdb_req_call *c;
397
398         /* if the domain socket is not yet open, open it */
399         if (ctdb->daemon.sd==-1) {
400                 ctdb_socket_connect(ctdb);
401         }
402
403         ret = ctdb_ltdb_lock(ctdb_db, call->key);
404         if (ret != 0) {
405                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
406                 return NULL;
407         }
408
409         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
410
411         if (ret == 0 && header.dmaster == ctdb->pnn) {
412                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
413                 talloc_free(data.dptr);
414                 ctdb_ltdb_unlock(ctdb_db, call->key);
415                 return state;
416         }
417
418         ctdb_ltdb_unlock(ctdb_db, call->key);
419         talloc_free(data.dptr);
420
421         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
422         if (state == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
424                 return NULL;
425         }
426         state->call = talloc_zero(state, struct ctdb_call);
427         if (state->call == NULL) {
428                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
429                 return NULL;
430         }
431
432         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
433         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
434         if (c == NULL) {
435                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
436                 return NULL;
437         }
438
439         state->reqid     = ctdb_reqid_new(ctdb, state);
440         state->ctdb_db = ctdb_db;
441         talloc_set_destructor(state, ctdb_client_call_destructor);
442
443         c->hdr.reqid     = state->reqid;
444         c->flags         = call->flags;
445         c->db_id         = ctdb_db->db_id;
446         c->callid        = call->call_id;
447         c->hopcount      = 0;
448         c->keylen        = call->key.dsize;
449         c->calldatalen   = call->call_data.dsize;
450         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
451         memcpy(&c->data[call->key.dsize], 
452                call->call_data.dptr, call->call_data.dsize);
453         *(state->call)              = *call;
454         state->call->call_data.dptr = &c->data[call->key.dsize];
455         state->call->key.dptr       = &c->data[0];
456
457         state->state  = CTDB_CALL_WAIT;
458
459
460         ctdb_client_queue_pkt(ctdb, &c->hdr);
461
462         return state;
463 }
464
465
466 /*
467   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
468 */
469 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
470 {
471         struct ctdb_client_call_state *state;
472
473         state = ctdb_call_send(ctdb_db, call);
474         return ctdb_call_recv(state, call);
475 }
476
477
478 /*
479   tell the daemon what messaging srvid we will use, and register the message
480   handler function in the client
481 */
482 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
483                              ctdb_message_fn_t handler,
484                              void *private_data)
485                                     
486 {
487         int res;
488         int32_t status;
489         
490         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
491                            tdb_null, NULL, NULL, &status, NULL, NULL);
492         if (res != 0 || status != 0) {
493                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
494                 return -1;
495         }
496
497         /* also need to register the handler with our own ctdb structure */
498         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
499 }
500
501 /*
502   tell the daemon we no longer want a srvid
503 */
504 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
505 {
506         int res;
507         int32_t status;
508         
509         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
510                            tdb_null, NULL, NULL, &status, NULL, NULL);
511         if (res != 0 || status != 0) {
512                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
513                 return -1;
514         }
515
516         /* also need to register the handler with our own ctdb structure */
517         ctdb_deregister_message_handler(ctdb, srvid, private_data);
518         return 0;
519 }
520
521
522 /*
523   send a message - from client context
524  */
525 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
526                       uint64_t srvid, TDB_DATA data)
527 {
528         struct ctdb_req_message *r;
529         int len, res;
530
531         len = offsetof(struct ctdb_req_message, data) + data.dsize;
532         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
533                                len, struct ctdb_req_message);
534         CTDB_NO_MEMORY(ctdb, r);
535
536         r->hdr.destnode  = pnn;
537         r->srvid         = srvid;
538         r->datalen       = data.dsize;
539         memcpy(&r->data[0], data.dptr, data.dsize);
540         
541         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
542         if (res != 0) {
543                 return res;
544         }
545
546         talloc_free(r);
547         return 0;
548 }
549
550
551 /*
552   cancel a ctdb_fetch_lock operation, releasing the lock
553  */
554 static int fetch_lock_destructor(struct ctdb_record_handle *h)
555 {
556         ctdb_ltdb_unlock(h->ctdb_db, h->key);
557         return 0;
558 }
559
560 /*
561   force the migration of a record to this node
562  */
563 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
564 {
565         struct ctdb_call call;
566         ZERO_STRUCT(call);
567         call.call_id = CTDB_NULL_FUNC;
568         call.key = key;
569         call.flags = CTDB_IMMEDIATE_MIGRATION;
570         return ctdb_call(ctdb_db, &call);
571 }
572
573 /*
574   get a lock on a record, and return the records data. Blocks until it gets the lock
575  */
576 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
577                                            TDB_DATA key, TDB_DATA *data)
578 {
579         int ret;
580         struct ctdb_record_handle *h;
581
582         /*
583           procedure is as follows:
584
585           1) get the chain lock. 
586           2) check if we are dmaster
587           3) if we are the dmaster then return handle 
588           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
589              reply from ctdbd
590           5) when we get the reply, goto (1)
591          */
592
593         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
594         if (h == NULL) {
595                 return NULL;
596         }
597
598         h->ctdb_db = ctdb_db;
599         h->key     = key;
600         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
601         if (h->key.dptr == NULL) {
602                 talloc_free(h);
603                 return NULL;
604         }
605         h->data    = data;
606
607         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
608                  (const char *)key.dptr));
609
610 again:
611         /* step 1 - get the chain lock */
612         ret = ctdb_ltdb_lock(ctdb_db, key);
613         if (ret != 0) {
614                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
615                 talloc_free(h);
616                 return NULL;
617         }
618
619         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
620
621         talloc_set_destructor(h, fetch_lock_destructor);
622
623         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
624
625         /* when torturing, ensure we test the remote path */
626         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
627             random() % 5 == 0) {
628                 h->header.dmaster = (uint32_t)-1;
629         }
630
631
632         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
633
634         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
635                 ctdb_ltdb_unlock(ctdb_db, key);
636                 ret = ctdb_client_force_migration(ctdb_db, key);
637                 if (ret != 0) {
638                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
639                         talloc_free(h);
640                         return NULL;
641                 }
642                 goto again;
643         }
644
645         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
646         return h;
647 }
648
649 /*
650   store some data to the record that was locked with ctdb_fetch_lock()
651 */
652 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
653 {
654         if (h->ctdb_db->persistent) {
655                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
656                 return -1;
657         }
658
659         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
660 }
661
662 /*
663   non-locking fetch of a record
664  */
665 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
666                TDB_DATA key, TDB_DATA *data)
667 {
668         struct ctdb_call call;
669         int ret;
670
671         call.call_id = CTDB_FETCH_FUNC;
672         call.call_data.dptr = NULL;
673         call.call_data.dsize = 0;
674
675         ret = ctdb_call(ctdb_db, &call);
676
677         if (ret == 0) {
678                 *data = call.reply_data;
679                 talloc_steal(mem_ctx, data->dptr);
680         }
681
682         return ret;
683 }
684
685
686
687 /*
688    called when a control completes or timesout to invoke the callback
689    function the user provided
690 */
691 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
692         struct timeval t, void *private_data)
693 {
694         struct ctdb_client_control_state *state;
695         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
696         int ret;
697
698         state = talloc_get_type(private_data, struct ctdb_client_control_state);
699         talloc_steal(tmp_ctx, state);
700
701         ret = ctdb_control_recv(state->ctdb, state, state,
702                         NULL, 
703                         NULL, 
704                         NULL);
705
706         talloc_free(tmp_ctx);
707 }
708
709 /*
710   called when a CTDB_REPLY_CONTROL packet comes in in the client
711
712   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
713   contains any reply data from the control
714 */
715 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
716                                       struct ctdb_req_header *hdr)
717 {
718         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
719         struct ctdb_client_control_state *state;
720
721         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
722         if (state == NULL) {
723                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
724                 return;
725         }
726
727         if (hdr->reqid != state->reqid) {
728                 /* we found a record  but it was the wrong one */
729                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
730                 return;
731         }
732
733         state->outdata.dptr = c->data;
734         state->outdata.dsize = c->datalen;
735         state->status = c->status;
736         if (c->errorlen) {
737                 state->errormsg = talloc_strndup(state, 
738                                                  (char *)&c->data[c->datalen], 
739                                                  c->errorlen);
740         }
741
742         /* state->outdata now uses resources from c so we dont want c
743            to just dissappear from under us while state is still alive
744         */
745         talloc_steal(state, c);
746
747         state->state = CTDB_CONTROL_DONE;
748
749         /* if we had a callback registered for this control, pull the response
750            and call the callback.
751         */
752         if (state->async.fn) {
753                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
754         }
755 }
756
757
758 /*
759   destroy a ctdb_control in client
760 */
761 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
762 {
763         ctdb_reqid_remove(state->ctdb, state->reqid);
764         return 0;
765 }
766
767
768 /* time out handler for ctdb_control */
769 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
770         struct timeval t, void *private_data)
771 {
772         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
773
774         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
775                          "dstnode:%u\n", state->reqid, state->c->opcode,
776                          state->c->hdr.destnode));
777
778         state->state = CTDB_CONTROL_TIMEOUT;
779
780         /* if we had a callback registered for this control, pull the response
781            and call the callback.
782         */
783         if (state->async.fn) {
784                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
785         }
786 }
787
788 /* async version of send control request */
789 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
790                 uint32_t destnode, uint64_t srvid, 
791                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
792                 TALLOC_CTX *mem_ctx,
793                 struct timeval *timeout,
794                 char **errormsg)
795 {
796         struct ctdb_client_control_state *state;
797         size_t len;
798         struct ctdb_req_control *c;
799         int ret;
800
801         if (errormsg) {
802                 *errormsg = NULL;
803         }
804
805         /* if the domain socket is not yet open, open it */
806         if (ctdb->daemon.sd==-1) {
807                 ctdb_socket_connect(ctdb);
808         }
809
810         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
811         CTDB_NO_MEMORY_NULL(ctdb, state);
812
813         state->ctdb       = ctdb;
814         state->reqid      = ctdb_reqid_new(ctdb, state);
815         state->state      = CTDB_CONTROL_WAIT;
816         state->errormsg   = NULL;
817
818         talloc_set_destructor(state, ctdb_control_destructor);
819
820         len = offsetof(struct ctdb_req_control, data) + data.dsize;
821         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
822                                len, struct ctdb_req_control);
823         state->c            = c;        
824         CTDB_NO_MEMORY_NULL(ctdb, c);
825         c->hdr.reqid        = state->reqid;
826         c->hdr.destnode     = destnode;
827         c->opcode           = opcode;
828         c->client_id        = 0;
829         c->flags            = flags;
830         c->srvid            = srvid;
831         c->datalen          = data.dsize;
832         if (data.dsize) {
833                 memcpy(&c->data[0], data.dptr, data.dsize);
834         }
835
836         /* timeout */
837         if (timeout && !timeval_is_zero(timeout)) {
838                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
839         }
840
841         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
842         if (ret != 0) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to queue packet to ctdb daemon\n"));
844                 talloc_free(state);
845                 return NULL;
846         }
847
848         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
849                 talloc_free(state);
850                 return NULL;
851         }
852
853         return state;
854 }
855
856
857 /* async version of receive control reply */
858 int ctdb_control_recv(struct ctdb_context *ctdb, 
859                 struct ctdb_client_control_state *state, 
860                 TALLOC_CTX *mem_ctx,
861                 TDB_DATA *outdata, int32_t *status, char **errormsg)
862 {
863         TALLOC_CTX *tmp_ctx;
864
865         if (status != NULL) {
866                 *status = -1;
867         }
868         if (errormsg != NULL) {
869                 *errormsg = NULL;
870         }
871
872         if (state == NULL) {
873                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv called with state==NULL\n"));
874                 return -1;
875         }
876
877         /* prevent double free of state */
878         tmp_ctx = talloc_new(ctdb);
879         talloc_steal(tmp_ctx, state);
880
881         /* loop one event at a time until we either timeout or the control
882            completes.
883         */
884         while (state->state == CTDB_CONTROL_WAIT) {
885                 event_loop_once(ctdb->ev);
886         }
887
888         if (state->state != CTDB_CONTROL_DONE) {
889                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed with state:%d\n", state->state));
890                 if (state->async.fn) {
891                         state->async.fn(state);
892                 }
893                 talloc_free(tmp_ctx);
894                 return -2;
895         }
896
897         if (state->errormsg) {
898                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
899                 if (errormsg) {
900                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
901                 }
902                 if (state->async.fn) {
903                         state->async.fn(state);
904                 }
905                 talloc_free(tmp_ctx);
906                 return -3;
907         }
908
909         if (outdata) {
910                 *outdata = state->outdata;
911                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
912         }
913
914         if (status) {
915                 *status = state->status;
916         }
917
918         if (state->async.fn) {
919                 state->async.fn(state);
920         }
921
922         talloc_free(tmp_ctx);
923         return 0;
924 }
925
926
927
928 /*
929   send a ctdb control message
930   timeout specifies how long we should wait for a reply.
931   if timeout is NULL we wait indefinitely
932  */
933 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
934                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
935                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
936                  struct timeval *timeout,
937                  char **errormsg)
938 {
939         struct ctdb_client_control_state *state;
940
941         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
942                         flags, data, mem_ctx,
943                         timeout, errormsg);
944         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
945                         errormsg);
946 }
947
948
949
950
951 /*
952   a process exists call. Returns 0 if process exists, -1 otherwise
953  */
954 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
955 {
956         int ret;
957         TDB_DATA data;
958         int32_t status;
959
960         data.dptr = (uint8_t*)&pid;
961         data.dsize = sizeof(pid);
962
963         ret = ctdb_control(ctdb, destnode, 0, 
964                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
965                            NULL, NULL, &status, NULL, NULL);
966         if (ret != 0) {
967                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
968                 return -1;
969         }
970
971         return status;
972 }
973
974 /*
975   get remote statistics
976  */
977 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
978 {
979         int ret;
980         TDB_DATA data;
981         int32_t res;
982
983         ret = ctdb_control(ctdb, destnode, 0, 
984                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
985                            ctdb, &data, &res, NULL, NULL);
986         if (ret != 0 || res != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
988                 return -1;
989         }
990
991         if (data.dsize != sizeof(struct ctdb_statistics)) {
992                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
993                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
994                       return -1;
995         }
996
997         *status = *(struct ctdb_statistics *)data.dptr;
998         talloc_free(data.dptr);
999                         
1000         return 0;
1001 }
1002
1003 /*
1004   shutdown a remote ctdb node
1005  */
1006 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1007 {
1008         struct ctdb_client_control_state *state;
1009
1010         state = ctdb_control_send(ctdb, destnode, 0, 
1011                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1012                            NULL, &timeout, NULL);
1013         if (state == NULL) {
1014                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1015                 return -1;
1016         }
1017
1018         return 0;
1019 }
1020
1021 /*
1022   get vnn map from a remote node
1023  */
1024 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1025 {
1026         int ret;
1027         TDB_DATA outdata;
1028         int32_t res;
1029         struct ctdb_vnn_map_wire *map;
1030
1031         ret = ctdb_control(ctdb, destnode, 0, 
1032                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1033                            mem_ctx, &outdata, &res, &timeout, NULL);
1034         if (ret != 0 || res != 0) {
1035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1036                 return -1;
1037         }
1038         
1039         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1040         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1041             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1042                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1043                 return -1;
1044         }
1045
1046         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1047         CTDB_NO_MEMORY(ctdb, *vnnmap);
1048         (*vnnmap)->generation = map->generation;
1049         (*vnnmap)->size       = map->size;
1050         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1051
1052         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1053         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1054         talloc_free(outdata.dptr);
1055                     
1056         return 0;
1057 }
1058
1059
1060 /*
1061   get the recovery mode of a remote node
1062  */
1063 struct ctdb_client_control_state *
1064 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1065 {
1066         return ctdb_control_send(ctdb, destnode, 0, 
1067                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1068                            mem_ctx, &timeout, NULL);
1069 }
1070
1071 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1072 {
1073         int ret;
1074         int32_t res;
1075
1076         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1077         if (ret != 0) {
1078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1079                 return -1;
1080         }
1081
1082         if (recmode) {
1083                 *recmode = (uint32_t)res;
1084         }
1085
1086         return 0;
1087 }
1088
1089 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1090 {
1091         struct ctdb_client_control_state *state;
1092
1093         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1094         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1095 }
1096
1097
1098
1099
1100 /*
1101   set the recovery mode of a remote node
1102  */
1103 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1104 {
1105         int ret;
1106         TDB_DATA data;
1107         int32_t res;
1108
1109         data.dsize = sizeof(uint32_t);
1110         data.dptr = (unsigned char *)&recmode;
1111
1112         ret = ctdb_control(ctdb, destnode, 0, 
1113                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1114                            NULL, NULL, &res, &timeout, NULL);
1115         if (ret != 0 || res != 0) {
1116                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1117                 return -1;
1118         }
1119
1120         return 0;
1121 }
1122
1123
1124
1125 /*
1126   get the recovery master of a remote node
1127  */
1128 struct ctdb_client_control_state *
1129 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1130                         struct timeval timeout, uint32_t destnode)
1131 {
1132         struct ctdb_client_control_state *state;
1133
1134         state = ctdb_control_send(ctdb, destnode, 0, 
1135                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1136                            mem_ctx, &timeout, NULL);
1137         if (state == NULL) {
1138                 DEBUG(DEBUG_ERR,(__location__ " Failed to send getrecmaster control to node %u\n", destnode));
1139         }
1140
1141         return state;
1142 }
1143
1144 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1145 {
1146         int ret;
1147         int32_t res;
1148
1149         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1150         if (ret != 0) {
1151                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed with error:%d\n", ret));
1152                 return -1;
1153         }
1154
1155         if (recmaster) {
1156                 *recmaster = (uint32_t)res;
1157         }
1158
1159         return 0;
1160 }
1161
1162 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1163 {
1164         struct ctdb_client_control_state *state;
1165
1166         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1167         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1168 }
1169
1170
1171 /*
1172   set the recovery master of a remote node
1173  */
1174 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1175 {
1176         int ret;
1177         TDB_DATA data;
1178         int32_t res;
1179
1180         ZERO_STRUCT(data);
1181         data.dsize = sizeof(uint32_t);
1182         data.dptr = (unsigned char *)&recmaster;
1183
1184         ret = ctdb_control(ctdb, destnode, 0, 
1185                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1186                            NULL, NULL, &res, &timeout, NULL);
1187         if (ret != 0 || res != 0) {
1188                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1189                 return -1;
1190         }
1191
1192         return 0;
1193 }
1194
1195
1196 /*
1197   get a list of databases off a remote node
1198  */
1199 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1200                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1201 {
1202         int ret;
1203         TDB_DATA outdata;
1204         int32_t res;
1205
1206         ret = ctdb_control(ctdb, destnode, 0, 
1207                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1208                            mem_ctx, &outdata, &res, &timeout, NULL);
1209         if (ret != 0 || res != 0) {
1210                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1211                 return -1;
1212         }
1213
1214         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1215         talloc_free(outdata.dptr);
1216                     
1217         return 0;
1218 }
1219
1220 /*
1221   get a list of nodes (vnn and flags ) from a remote node
1222  */
1223 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1224                 struct timeval timeout, uint32_t destnode, 
1225                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1226 {
1227         int ret;
1228         TDB_DATA outdata;
1229         int32_t res;
1230
1231         ret = ctdb_control(ctdb, destnode, 0, 
1232                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1233                            mem_ctx, &outdata, &res, &timeout, NULL);
1234         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1235                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1236                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1237         }
1238         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1239                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1240                 return -1;
1241         }
1242
1243         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1244         talloc_free(outdata.dptr);
1245                     
1246         return 0;
1247 }
1248
1249 /*
1250   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1251  */
1252 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1253                 struct timeval timeout, uint32_t destnode, 
1254                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1255 {
1256         int ret, i, len;
1257         TDB_DATA outdata;
1258         struct ctdb_node_mapv4 *nodemapv4;
1259         int32_t res;
1260
1261         ret = ctdb_control(ctdb, destnode, 0, 
1262                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1263                            mem_ctx, &outdata, &res, &timeout, NULL);
1264         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1266                 return -1;
1267         }
1268
1269         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1270
1271         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1272         (*nodemap) = talloc_zero_size(mem_ctx, len);
1273         CTDB_NO_MEMORY(ctdb, (*nodemap));
1274
1275         (*nodemap)->num = nodemapv4->num;
1276         for (i=0; i<nodemapv4->num; i++) {
1277                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1278                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1279                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1280                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1281         }
1282                 
1283         talloc_free(outdata.dptr);
1284                     
1285         return 0;
1286 }
1287
1288 /*
1289   drop the transport, reload the nodes file and restart the transport
1290  */
1291 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1292                     struct timeval timeout, uint32_t destnode)
1293 {
1294         int ret;
1295         int32_t res;
1296
1297         ret = ctdb_control(ctdb, destnode, 0, 
1298                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1299                            NULL, NULL, &res, &timeout, NULL);
1300         if (ret != 0 || res != 0) {
1301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1302                 return -1;
1303         }
1304
1305         return 0;
1306 }
1307
1308
1309 /*
1310   set vnn map on a node
1311  */
1312 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1313                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1314 {
1315         int ret;
1316         TDB_DATA data;
1317         int32_t res;
1318         struct ctdb_vnn_map_wire *map;
1319         size_t len;
1320
1321         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1322         map = talloc_size(mem_ctx, len);
1323         CTDB_NO_MEMORY(ctdb, map);
1324
1325         map->generation = vnnmap->generation;
1326         map->size = vnnmap->size;
1327         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1328         
1329         data.dsize = len;
1330         data.dptr  = (uint8_t *)map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1334                            NULL, NULL, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1337                 return -1;
1338         }
1339
1340         talloc_free(map);
1341
1342         return 0;
1343 }
1344
1345
1346 /*
1347   async send for pull database
1348  */
1349 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1350         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1351         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1352 {
1353         TDB_DATA indata;
1354         struct ctdb_control_pulldb *pull;
1355         struct ctdb_client_control_state *state;
1356
1357         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1358         CTDB_NO_MEMORY_NULL(ctdb, pull);
1359
1360         pull->db_id   = dbid;
1361         pull->lmaster = lmaster;
1362
1363         indata.dsize = sizeof(struct ctdb_control_pulldb);
1364         indata.dptr  = (unsigned char *)pull;
1365
1366         state = ctdb_control_send(ctdb, destnode, 0, 
1367                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1368                                   mem_ctx, &timeout, NULL);
1369         talloc_free(pull);
1370
1371         return state;
1372 }
1373
1374 /*
1375   async recv for pull database
1376  */
1377 int ctdb_ctrl_pulldb_recv(
1378         struct ctdb_context *ctdb, 
1379         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1380         TDB_DATA *outdata)
1381 {
1382         int ret;
1383         int32_t res;
1384
1385         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1386         if ( (ret != 0) || (res != 0) ){
1387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1388                 return -1;
1389         }
1390
1391         return 0;
1392 }
1393
1394 /*
1395   pull all keys and records for a specific database on a node
1396  */
1397 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1398                 uint32_t dbid, uint32_t lmaster, 
1399                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1400                 TDB_DATA *outdata)
1401 {
1402         struct ctdb_client_control_state *state;
1403
1404         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1405                                       timeout);
1406         
1407         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1408 }
1409
1410
1411 /*
1412   change dmaster for all keys in the database to the new value
1413  */
1414 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1415                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1416 {
1417         int ret;
1418         TDB_DATA indata;
1419         int32_t res;
1420
1421         indata.dsize = 2*sizeof(uint32_t);
1422         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1423
1424         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1425         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1426
1427         ret = ctdb_control(ctdb, destnode, 0, 
1428                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1429                            NULL, NULL, &res, &timeout, NULL);
1430         if (ret != 0 || res != 0) {
1431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1432                 return -1;
1433         }
1434
1435         return 0;
1436 }
1437
1438 /*
1439   ping a node, return number of clients connected
1440  */
1441 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1442 {
1443         int ret;
1444         int32_t res;
1445
1446         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1447                            tdb_null, NULL, NULL, &res, NULL, NULL);
1448         if (ret != 0) {
1449                 return -1;
1450         }
1451         return res;
1452 }
1453
1454 /*
1455   find the real path to a ltdb 
1456  */
1457 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1458                    const char **path)
1459 {
1460         int ret;
1461         int32_t res;
1462         TDB_DATA data;
1463
1464         data.dptr = (uint8_t *)&dbid;
1465         data.dsize = sizeof(dbid);
1466
1467         ret = ctdb_control(ctdb, destnode, 0, 
1468                            CTDB_CONTROL_GETDBPATH, 0, data, 
1469                            mem_ctx, &data, &res, &timeout, NULL);
1470         if (ret != 0 || res != 0) {
1471                 return -1;
1472         }
1473
1474         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1475         if ((*path) == NULL) {
1476                 return -1;
1477         }
1478
1479         talloc_free(data.dptr);
1480
1481         return 0;
1482 }
1483
1484 /*
1485   find the name of a db 
1486  */
1487 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1488                    const char **name)
1489 {
1490         int ret;
1491         int32_t res;
1492         TDB_DATA data;
1493
1494         data.dptr = (uint8_t *)&dbid;
1495         data.dsize = sizeof(dbid);
1496
1497         ret = ctdb_control(ctdb, destnode, 0, 
1498                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1499                            mem_ctx, &data, &res, &timeout, NULL);
1500         if (ret != 0 || res != 0) {
1501                 return -1;
1502         }
1503
1504         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1505         if ((*name) == NULL) {
1506                 return -1;
1507         }
1508
1509         talloc_free(data.dptr);
1510
1511         return 0;
1512 }
1513
1514 /*
1515   get the health status of a db
1516  */
1517 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1518                           struct timeval timeout,
1519                           uint32_t destnode,
1520                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1521                           const char **reason)
1522 {
1523         int ret;
1524         int32_t res;
1525         TDB_DATA data;
1526
1527         data.dptr = (uint8_t *)&dbid;
1528         data.dsize = sizeof(dbid);
1529
1530         ret = ctdb_control(ctdb, destnode, 0,
1531                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1532                            mem_ctx, &data, &res, &timeout, NULL);
1533         if (ret != 0 || res != 0) {
1534                 return -1;
1535         }
1536
1537         if (data.dsize == 0) {
1538                 (*reason) = NULL;
1539                 return 0;
1540         }
1541
1542         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1543         if ((*reason) == NULL) {
1544                 return -1;
1545         }
1546
1547         talloc_free(data.dptr);
1548
1549         return 0;
1550 }
1551
1552 /*
1553   create a database
1554  */
1555 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1556                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1557 {
1558         int ret;
1559         int32_t res;
1560         TDB_DATA data;
1561
1562         data.dptr = discard_const(name);
1563         data.dsize = strlen(name)+1;
1564
1565         ret = ctdb_control(ctdb, destnode, 0, 
1566                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1567                            0, data, 
1568                            mem_ctx, &data, &res, &timeout, NULL);
1569
1570         if (ret != 0 || res != 0) {
1571                 return -1;
1572         }
1573
1574         return 0;
1575 }
1576
1577 /*
1578   get debug level on a node
1579  */
1580 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1581 {
1582         int ret;
1583         int32_t res;
1584         TDB_DATA data;
1585
1586         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1587                            ctdb, &data, &res, NULL, NULL);
1588         if (ret != 0 || res != 0) {
1589                 return -1;
1590         }
1591         if (data.dsize != sizeof(int32_t)) {
1592                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1593                          (unsigned)data.dsize));
1594                 return -1;
1595         }
1596         *level = *(int32_t *)data.dptr;
1597         talloc_free(data.dptr);
1598         return 0;
1599 }
1600
1601 /*
1602   set debug level on a node
1603  */
1604 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1605 {
1606         int ret;
1607         int32_t res;
1608         TDB_DATA data;
1609
1610         data.dptr = (uint8_t *)&level;
1611         data.dsize = sizeof(level);
1612
1613         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1614                            NULL, NULL, &res, NULL, NULL);
1615         if (ret != 0 || res != 0) {
1616                 return -1;
1617         }
1618         return 0;
1619 }
1620
1621
1622 /*
1623   get a list of connected nodes
1624  */
1625 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1626                                 struct timeval timeout,
1627                                 TALLOC_CTX *mem_ctx,
1628                                 uint32_t *num_nodes)
1629 {
1630         struct ctdb_node_map *map=NULL;
1631         int ret, i;
1632         uint32_t *nodes;
1633
1634         *num_nodes = 0;
1635
1636         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1637         if (ret != 0) {
1638                 return NULL;
1639         }
1640
1641         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1642         if (nodes == NULL) {
1643                 return NULL;
1644         }
1645
1646         for (i=0;i<map->num;i++) {
1647                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1648                         nodes[*num_nodes] = map->nodes[i].pnn;
1649                         (*num_nodes)++;
1650                 }
1651         }
1652
1653         return nodes;
1654 }
1655
1656
1657 /*
1658   reset remote status
1659  */
1660 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1661 {
1662         int ret;
1663         int32_t res;
1664
1665         ret = ctdb_control(ctdb, destnode, 0, 
1666                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1667                            NULL, NULL, &res, NULL, NULL);
1668         if (ret != 0 || res != 0) {
1669                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1670                 return -1;
1671         }
1672         return 0;
1673 }
1674
1675 /*
1676   this is the dummy null procedure that all databases support
1677 */
1678 static int ctdb_null_func(struct ctdb_call_info *call)
1679 {
1680         return 0;
1681 }
1682
1683 /*
1684   this is a plain fetch procedure that all databases support
1685 */
1686 static int ctdb_fetch_func(struct ctdb_call_info *call)
1687 {
1688         call->reply_data = &call->record_data;
1689         return 0;
1690 }
1691
1692 /*
1693   attach to a specific database - client call
1694 */
1695 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1696 {
1697         struct ctdb_db_context *ctdb_db;
1698         TDB_DATA data;
1699         int ret;
1700         int32_t res;
1701
1702         ctdb_db = ctdb_db_handle(ctdb, name);
1703         if (ctdb_db) {
1704                 return ctdb_db;
1705         }
1706
1707         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1708         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1709
1710         ctdb_db->ctdb = ctdb;
1711         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1712         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1713
1714         data.dptr = discard_const(name);
1715         data.dsize = strlen(name)+1;
1716
1717         /* tell ctdb daemon to attach */
1718         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1719                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1720                            0, data, ctdb_db, &data, &res, NULL, NULL);
1721         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1722                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1723                 talloc_free(ctdb_db);
1724                 return NULL;
1725         }
1726         
1727         ctdb_db->db_id = *(uint32_t *)data.dptr;
1728         talloc_free(data.dptr);
1729
1730         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1731         if (ret != 0) {
1732                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1733                 talloc_free(ctdb_db);
1734                 return NULL;
1735         }
1736
1737         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1738         if (ctdb->valgrinding) {
1739                 tdb_flags |= TDB_NOMMAP;
1740         }
1741         tdb_flags |= TDB_DISALLOW_NESTING;
1742
1743         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1744         if (ctdb_db->ltdb == NULL) {
1745                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1746                 talloc_free(ctdb_db);
1747                 return NULL;
1748         }
1749
1750         ctdb_db->persistent = persistent;
1751
1752         DLIST_ADD(ctdb->db_list, ctdb_db);
1753
1754         /* add well known functions */
1755         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1756         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1757
1758         return ctdb_db;
1759 }
1760
1761
1762 /*
1763   setup a call for a database
1764  */
1765 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1766 {
1767         struct ctdb_registered_call *call;
1768
1769 #if 0
1770         TDB_DATA data;
1771         int32_t status;
1772         struct ctdb_control_set_call c;
1773         int ret;
1774
1775         /* this is no longer valid with the separate daemon architecture */
1776         c.db_id = ctdb_db->db_id;
1777         c.fn    = fn;
1778         c.id    = id;
1779
1780         data.dptr = (uint8_t *)&c;
1781         data.dsize = sizeof(c);
1782
1783         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1784                            data, NULL, NULL, &status, NULL, NULL);
1785         if (ret != 0 || status != 0) {
1786                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1787                 return -1;
1788         }
1789 #endif
1790
1791         /* also register locally */
1792         call = talloc(ctdb_db, struct ctdb_registered_call);
1793         call->fn = fn;
1794         call->id = id;
1795
1796         DLIST_ADD(ctdb_db->calls, call);        
1797         return 0;
1798 }
1799
1800
1801 struct traverse_state {
1802         bool done;
1803         uint32_t count;
1804         ctdb_traverse_func fn;
1805         void *private_data;
1806 };
1807
1808 /*
1809   called on each key during a ctdb_traverse
1810  */
1811 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1812 {
1813         struct traverse_state *state = (struct traverse_state *)p;
1814         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1815         TDB_DATA key;
1816
1817         if (data.dsize < sizeof(uint32_t) ||
1818             d->length != data.dsize) {
1819                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1820                 state->done = True;
1821                 return;
1822         }
1823
1824         key.dsize = d->keylen;
1825         key.dptr  = &d->data[0];
1826         data.dsize = d->datalen;
1827         data.dptr = &d->data[d->keylen];
1828
1829         if (key.dsize == 0 && data.dsize == 0) {
1830                 /* end of traverse */
1831                 state->done = True;
1832                 return;
1833         }
1834
1835         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1836                 /* empty records are deleted records in ctdb */
1837                 return;
1838         }
1839
1840         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1841                 state->done = True;
1842         }
1843
1844         state->count++;
1845 }
1846
1847
1848 /*
1849   start a cluster wide traverse, calling the supplied fn on each record
1850   return the number of records traversed, or -1 on error
1851  */
1852 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1853 {
1854         TDB_DATA data;
1855         struct ctdb_traverse_start t;
1856         int32_t status;
1857         int ret;
1858         uint64_t srvid = (getpid() | 0xFLL<<60);
1859         struct traverse_state state;
1860
1861         state.done = False;
1862         state.count = 0;
1863         state.private_data = private_data;
1864         state.fn = fn;
1865
1866         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1867         if (ret != 0) {
1868                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1869                 return -1;
1870         }
1871
1872         t.db_id = ctdb_db->db_id;
1873         t.srvid = srvid;
1874         t.reqid = 0;
1875
1876         data.dptr = (uint8_t *)&t;
1877         data.dsize = sizeof(t);
1878
1879         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1880                            data, NULL, NULL, &status, NULL, NULL);
1881         if (ret != 0 || status != 0) {
1882                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1883                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1884                 return -1;
1885         }
1886
1887         while (!state.done) {
1888                 event_loop_once(ctdb_db->ctdb->ev);
1889         }
1890
1891         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1892         if (ret != 0) {
1893                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1894                 return -1;
1895         }
1896
1897         return state.count;
1898 }
1899
1900 #define ISASCII(x) ((x>31)&&(x<128))
1901 /*
1902   called on each key during a catdb
1903  */
1904 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1905 {
1906         int i;
1907         FILE *f = (FILE *)p;
1908         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1909
1910         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1911         for (i=0;i<key.dsize;i++) {
1912                 if (ISASCII(key.dptr[i])) {
1913                         fprintf(f, "%c", key.dptr[i]);
1914                 } else {
1915                         fprintf(f, "\\%02X", key.dptr[i]);
1916                 }
1917         }
1918         fprintf(f, "\"\n");
1919
1920         fprintf(f, "dmaster: %u\n", h->dmaster);
1921         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1922
1923         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1924         for (i=sizeof(*h);i<data.dsize;i++) {
1925                 if (ISASCII(data.dptr[i])) {
1926                         fprintf(f, "%c", data.dptr[i]);
1927                 } else {
1928                         fprintf(f, "\\%02X", data.dptr[i]);
1929                 }
1930         }
1931         fprintf(f, "\"\n");
1932
1933         fprintf(f, "\n");
1934
1935         return 0;
1936 }
1937
1938 /*
1939   convenience function to list all keys to stdout
1940  */
1941 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1942 {
1943         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1944 }
1945
1946 /*
1947   get the pid of a ctdb daemon
1948  */
1949 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1950 {
1951         int ret;
1952         int32_t res;
1953
1954         ret = ctdb_control(ctdb, destnode, 0, 
1955                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1956                            NULL, NULL, &res, &timeout, NULL);
1957         if (ret != 0) {
1958                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1959                 return -1;
1960         }
1961
1962         *pid = res;
1963
1964         return 0;
1965 }
1966
1967
1968 /*
1969   async freeze send control
1970  */
1971 struct ctdb_client_control_state *
1972 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1973 {
1974         return ctdb_control_send(ctdb, destnode, priority, 
1975                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1976                            mem_ctx, &timeout, NULL);
1977 }
1978
1979 /* 
1980    async freeze recv control
1981 */
1982 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1983 {
1984         int ret;
1985         int32_t res;
1986
1987         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1988         if ( (ret != 0) || (res != 0) ){
1989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1990                 return -1;
1991         }
1992
1993         return 0;
1994 }
1995
1996 /*
1997   freeze databases of a certain priority
1998  */
1999 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2000 {
2001         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2002         struct ctdb_client_control_state *state;
2003         int ret;
2004
2005         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2006         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2007         talloc_free(tmp_ctx);
2008
2009         return ret;
2010 }
2011
2012 /* Freeze all databases */
2013 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2014 {
2015         int i;
2016
2017         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2018                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2019                         return -1;
2020                 }
2021         }
2022         return 0;
2023 }
2024
2025 /*
2026   thaw databases of a certain priority
2027  */
2028 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2029 {
2030         int ret;
2031         int32_t res;
2032
2033         ret = ctdb_control(ctdb, destnode, priority, 
2034                            CTDB_CONTROL_THAW, 0, tdb_null, 
2035                            NULL, NULL, &res, &timeout, NULL);
2036         if (ret != 0 || res != 0) {
2037                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2038                 return -1;
2039         }
2040
2041         return 0;
2042 }
2043
2044 /* thaw all databases */
2045 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2046 {
2047         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2048 }
2049
2050 /*
2051   get pnn of a node, or -1
2052  */
2053 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2054 {
2055         int ret;
2056         int32_t res;
2057
2058         ret = ctdb_control(ctdb, destnode, 0, 
2059                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2060                            NULL, NULL, &res, &timeout, NULL);
2061         if (ret != 0) {
2062                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2063                 return -1;
2064         }
2065
2066         return res;
2067 }
2068
2069 /*
2070   get the monitoring mode of a remote node
2071  */
2072 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2073 {
2074         int ret;
2075         int32_t res;
2076
2077         ret = ctdb_control(ctdb, destnode, 0, 
2078                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2079                            NULL, NULL, &res, &timeout, NULL);
2080         if (ret != 0) {
2081                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2082                 return -1;
2083         }
2084
2085         *monmode = res;
2086
2087         return 0;
2088 }
2089
2090
2091 /*
2092  set the monitoring mode of a remote node to active
2093  */
2094 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2095 {
2096         int ret;
2097         
2098
2099         ret = ctdb_control(ctdb, destnode, 0, 
2100                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2101                            NULL, NULL,NULL, &timeout, NULL);
2102         if (ret != 0) {
2103                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2104                 return -1;
2105         }
2106
2107         
2108
2109         return 0;
2110 }
2111
2112 /*
2113   set the monitoring mode of a remote node to disable
2114  */
2115 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2116 {
2117         int ret;
2118         
2119
2120         ret = ctdb_control(ctdb, destnode, 0, 
2121                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2122                            NULL, NULL, NULL, &timeout, NULL);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2125                 return -1;
2126         }
2127
2128         
2129
2130         return 0;
2131 }
2132
2133
2134
2135 /* 
2136   sent to a node to make it take over an ip address
2137 */
2138 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2139                           uint32_t destnode, struct ctdb_public_ip *ip)
2140 {
2141         TDB_DATA data;
2142         struct ctdb_public_ipv4 ipv4;
2143         int ret;
2144         int32_t res;
2145
2146         if (ip->addr.sa.sa_family == AF_INET) {
2147                 ipv4.pnn = ip->pnn;
2148                 ipv4.sin = ip->addr.ip;
2149
2150                 data.dsize = sizeof(ipv4);
2151                 data.dptr  = (uint8_t *)&ipv4;
2152
2153                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2154                            NULL, &res, &timeout, NULL);
2155         } else {
2156                 data.dsize = sizeof(*ip);
2157                 data.dptr  = (uint8_t *)ip;
2158
2159                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2160                            NULL, &res, &timeout, NULL);
2161         }
2162
2163         if (ret != 0 || res != 0) {
2164                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2165                 return -1;
2166         }
2167
2168         return 0;       
2169 }
2170
2171
2172 /* 
2173   sent to a node to make it release an ip address
2174 */
2175 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2176                          uint32_t destnode, struct ctdb_public_ip *ip)
2177 {
2178         TDB_DATA data;
2179         struct ctdb_public_ipv4 ipv4;
2180         int ret;
2181         int32_t res;
2182
2183         if (ip->addr.sa.sa_family == AF_INET) {
2184                 ipv4.pnn = ip->pnn;
2185                 ipv4.sin = ip->addr.ip;
2186
2187                 data.dsize = sizeof(ipv4);
2188                 data.dptr  = (uint8_t *)&ipv4;
2189
2190                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2191                                    NULL, &res, &timeout, NULL);
2192         } else {
2193                 data.dsize = sizeof(*ip);
2194                 data.dptr  = (uint8_t *)ip;
2195
2196                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2197                                    NULL, &res, &timeout, NULL);
2198         }
2199
2200         if (ret != 0 || res != 0) {
2201                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2202                 return -1;
2203         }
2204
2205         return 0;       
2206 }
2207
2208
2209 /*
2210   get a tunable
2211  */
2212 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2213                           struct timeval timeout, 
2214                           uint32_t destnode,
2215                           const char *name, uint32_t *value)
2216 {
2217         struct ctdb_control_get_tunable *t;
2218         TDB_DATA data, outdata;
2219         int32_t res;
2220         int ret;
2221
2222         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2223         data.dptr  = talloc_size(ctdb, data.dsize);
2224         CTDB_NO_MEMORY(ctdb, data.dptr);
2225
2226         t = (struct ctdb_control_get_tunable *)data.dptr;
2227         t->length = strlen(name)+1;
2228         memcpy(t->name, name, t->length);
2229
2230         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2231                            &outdata, &res, &timeout, NULL);
2232         talloc_free(data.dptr);
2233         if (ret != 0 || res != 0) {
2234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2235                 return -1;
2236         }
2237
2238         if (outdata.dsize != sizeof(uint32_t)) {
2239                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2240                 talloc_free(outdata.dptr);
2241                 return -1;
2242         }
2243         
2244         *value = *(uint32_t *)outdata.dptr;
2245         talloc_free(outdata.dptr);
2246
2247         return 0;
2248 }
2249
2250 /*
2251   set a tunable
2252  */
2253 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2254                           struct timeval timeout, 
2255                           uint32_t destnode,
2256                           const char *name, uint32_t value)
2257 {
2258         struct ctdb_control_set_tunable *t;
2259         TDB_DATA data;
2260         int32_t res;
2261         int ret;
2262
2263         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2264         data.dptr  = talloc_size(ctdb, data.dsize);
2265         CTDB_NO_MEMORY(ctdb, data.dptr);
2266
2267         t = (struct ctdb_control_set_tunable *)data.dptr;
2268         t->length = strlen(name)+1;
2269         memcpy(t->name, name, t->length);
2270         t->value = value;
2271
2272         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2273                            NULL, &res, &timeout, NULL);
2274         talloc_free(data.dptr);
2275         if (ret != 0 || res != 0) {
2276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2277                 return -1;
2278         }
2279
2280         return 0;
2281 }
2282
2283 /*
2284   list tunables
2285  */
2286 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2287                             struct timeval timeout, 
2288                             uint32_t destnode,
2289                             TALLOC_CTX *mem_ctx,
2290                             const char ***list, uint32_t *count)
2291 {
2292         TDB_DATA outdata;
2293         int32_t res;
2294         int ret;
2295         struct ctdb_control_list_tunable *t;
2296         char *p, *s, *ptr;
2297
2298         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2299                            mem_ctx, &outdata, &res, &timeout, NULL);
2300         if (ret != 0 || res != 0) {
2301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2302                 return -1;
2303         }
2304
2305         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2306         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2307             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2308                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2309                 talloc_free(outdata.dptr);
2310                 return -1;              
2311         }
2312         
2313         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2314         CTDB_NO_MEMORY(ctdb, p);
2315
2316         talloc_free(outdata.dptr);
2317         
2318         (*list) = NULL;
2319         (*count) = 0;
2320
2321         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2322                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2323                 CTDB_NO_MEMORY(ctdb, *list);
2324                 (*list)[*count] = talloc_strdup(*list, s);
2325                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2326                 (*count)++;
2327         }
2328
2329         talloc_free(p);
2330
2331         return 0;
2332 }
2333
2334
2335 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2336                         struct timeval timeout, uint32_t destnode, 
2337                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2338 {
2339         int ret;
2340         TDB_DATA outdata;
2341         int32_t res;
2342
2343         ret = ctdb_control(ctdb, destnode, 0, 
2344                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2345                            mem_ctx, &outdata, &res, &timeout, NULL);
2346         if (ret == 0 && res == -1) {
2347                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2348                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2349         }
2350         if (ret != 0 || res != 0) {
2351           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2352                 return -1;
2353         }
2354
2355         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2356         talloc_free(outdata.dptr);
2357                     
2358         return 0;
2359 }
2360
2361 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2362                         struct timeval timeout, uint32_t destnode, 
2363                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2364 {
2365         int ret, i, len;
2366         TDB_DATA outdata;
2367         int32_t res;
2368         struct ctdb_all_public_ipsv4 *ipsv4;
2369
2370         ret = ctdb_control(ctdb, destnode, 0, 
2371                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2372                            mem_ctx, &outdata, &res, &timeout, NULL);
2373         if (ret != 0 || res != 0) {
2374                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2375                 return -1;
2376         }
2377
2378         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2379         len = offsetof(struct ctdb_all_public_ips, ips) +
2380                 ipsv4->num*sizeof(struct ctdb_public_ip);
2381         *ips = talloc_zero_size(mem_ctx, len);
2382         CTDB_NO_MEMORY(ctdb, *ips);
2383         (*ips)->num = ipsv4->num;
2384         for (i=0; i<ipsv4->num; i++) {
2385                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2386                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2387         }
2388
2389         talloc_free(outdata.dptr);
2390                     
2391         return 0;
2392 }
2393
2394 /*
2395   set/clear the permanent disabled bit on a remote node
2396  */
2397 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2398                        uint32_t set, uint32_t clear)
2399 {
2400         int ret;
2401         TDB_DATA data;
2402         struct ctdb_node_map *nodemap=NULL;
2403         struct ctdb_node_flag_change c;
2404         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2405         uint32_t recmaster;
2406         uint32_t *nodes;
2407
2408
2409         /* find the recovery master */
2410         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2411         if (ret != 0) {
2412                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2413                 talloc_free(tmp_ctx);
2414                 return ret;
2415         }
2416
2417
2418         /* read the node flags from the recmaster */
2419         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2420         if (ret != 0) {
2421                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2422                 talloc_free(tmp_ctx);
2423                 return -1;
2424         }
2425         if (destnode >= nodemap->num) {
2426                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2427                 talloc_free(tmp_ctx);
2428                 return -1;
2429         }
2430
2431         c.pnn       = destnode;
2432         c.old_flags = nodemap->nodes[destnode].flags;
2433         c.new_flags = c.old_flags;
2434         c.new_flags |= set;
2435         c.new_flags &= ~clear;
2436
2437         data.dsize = sizeof(c);
2438         data.dptr = (unsigned char *)&c;
2439
2440         /* send the flags update to all connected nodes */
2441         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2442
2443         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2444                                         nodes, 0,
2445                                         timeout, false, data,
2446                                         NULL, NULL,
2447                                         NULL) != 0) {
2448                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2449
2450                 talloc_free(tmp_ctx);
2451                 return -1;
2452         }
2453
2454         talloc_free(tmp_ctx);
2455         return 0;
2456 }
2457
2458
2459 /*
2460   get all tunables
2461  */
2462 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2463                                struct timeval timeout, 
2464                                uint32_t destnode,
2465                                struct ctdb_tunable *tunables)
2466 {
2467         TDB_DATA outdata;
2468         int ret;
2469         int32_t res;
2470
2471         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2472                            &outdata, &res, &timeout, NULL);
2473         if (ret != 0 || res != 0) {
2474                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2475                 return -1;
2476         }
2477
2478         if (outdata.dsize != sizeof(*tunables)) {
2479                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2480                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2481                 return -1;              
2482         }
2483
2484         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2485         talloc_free(outdata.dptr);
2486         return 0;
2487 }
2488
2489 /*
2490   add a public address to a node
2491  */
2492 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2493                       struct timeval timeout, 
2494                       uint32_t destnode,
2495                       struct ctdb_control_ip_iface *pub)
2496 {
2497         TDB_DATA data;
2498         int32_t res;
2499         int ret;
2500
2501         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2502         data.dptr  = (unsigned char *)pub;
2503
2504         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2505                            NULL, &res, &timeout, NULL);
2506         if (ret != 0 || res != 0) {
2507                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2508                 return -1;
2509         }
2510
2511         return 0;
2512 }
2513
2514 /*
2515   delete a public address from a node
2516  */
2517 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2518                       struct timeval timeout, 
2519                       uint32_t destnode,
2520                       struct ctdb_control_ip_iface *pub)
2521 {
2522         TDB_DATA data;
2523         int32_t res;
2524         int ret;
2525
2526         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2527         data.dptr  = (unsigned char *)pub;
2528
2529         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2530                            NULL, &res, &timeout, NULL);
2531         if (ret != 0 || res != 0) {
2532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2533                 return -1;
2534         }
2535
2536         return 0;
2537 }
2538
2539 /*
2540   kill a tcp connection
2541  */
2542 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2543                       struct timeval timeout, 
2544                       uint32_t destnode,
2545                       struct ctdb_control_killtcp *killtcp)
2546 {
2547         TDB_DATA data;
2548         int32_t res;
2549         int ret;
2550
2551         data.dsize = sizeof(struct ctdb_control_killtcp);
2552         data.dptr  = (unsigned char *)killtcp;
2553
2554         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2555                            NULL, &res, &timeout, NULL);
2556         if (ret != 0 || res != 0) {
2557                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2558                 return -1;
2559         }
2560
2561         return 0;
2562 }
2563
2564 /*
2565   send a gratious arp
2566  */
2567 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2568                       struct timeval timeout, 
2569                       uint32_t destnode,
2570                       ctdb_sock_addr *addr,
2571                       const char *ifname)
2572 {
2573         TDB_DATA data;
2574         int32_t res;
2575         int ret, len;
2576         struct ctdb_control_gratious_arp *gratious_arp;
2577         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2578
2579
2580         len = strlen(ifname)+1;
2581         gratious_arp = talloc_size(tmp_ctx, 
2582                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2583         CTDB_NO_MEMORY(ctdb, gratious_arp);
2584
2585         gratious_arp->addr = *addr;
2586         gratious_arp->len = len;
2587         memcpy(&gratious_arp->iface[0], ifname, len);
2588
2589
2590         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2591         data.dptr  = (unsigned char *)gratious_arp;
2592
2593         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2594                            NULL, &res, &timeout, NULL);
2595         if (ret != 0 || res != 0) {
2596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2597                 talloc_free(tmp_ctx);
2598                 return -1;
2599         }
2600
2601         talloc_free(tmp_ctx);
2602         return 0;
2603 }
2604
2605 /*
2606   get a list of all tcp tickles that a node knows about for a particular vnn
2607  */
2608 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2609                               struct timeval timeout, uint32_t destnode, 
2610                               TALLOC_CTX *mem_ctx, 
2611                               ctdb_sock_addr *addr,
2612                               struct ctdb_control_tcp_tickle_list **list)
2613 {
2614         int ret;
2615         TDB_DATA data, outdata;
2616         int32_t status;
2617
2618         data.dptr = (uint8_t*)addr;
2619         data.dsize = sizeof(ctdb_sock_addr);
2620
2621         ret = ctdb_control(ctdb, destnode, 0, 
2622                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2623                            mem_ctx, &outdata, &status, NULL, NULL);
2624         if (ret != 0 || status != 0) {
2625                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2626                 return -1;
2627         }
2628
2629         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2630
2631         return status;
2632 }
2633
2634 /*
2635   register a server id
2636  */
2637 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2638                       struct timeval timeout, 
2639                       struct ctdb_server_id *id)
2640 {
2641         TDB_DATA data;
2642         int32_t res;
2643         int ret;
2644
2645         data.dsize = sizeof(struct ctdb_server_id);
2646         data.dptr  = (unsigned char *)id;
2647
2648         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2649                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2650                         0, data, NULL,
2651                         NULL, &res, &timeout, NULL);
2652         if (ret != 0 || res != 0) {
2653                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2654                 return -1;
2655         }
2656
2657         return 0;
2658 }
2659
2660 /*
2661   unregister a server id
2662  */
2663 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2664                       struct timeval timeout, 
2665                       struct ctdb_server_id *id)
2666 {
2667         TDB_DATA data;
2668         int32_t res;
2669         int ret;
2670
2671         data.dsize = sizeof(struct ctdb_server_id);
2672         data.dptr  = (unsigned char *)id;
2673
2674         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2675                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2676                         0, data, NULL,
2677                         NULL, &res, &timeout, NULL);
2678         if (ret != 0 || res != 0) {
2679                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2680                 return -1;
2681         }
2682
2683         return 0;
2684 }
2685
2686
2687 /*
2688   check if a server id exists
2689
2690   if a server id does exist, return *status == 1, otherwise *status == 0
2691  */
2692 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2693                       struct timeval timeout, 
2694                       uint32_t destnode,
2695                       struct ctdb_server_id *id,
2696                       uint32_t *status)
2697 {
2698         TDB_DATA data;
2699         int32_t res;
2700         int ret;
2701
2702         data.dsize = sizeof(struct ctdb_server_id);
2703         data.dptr  = (unsigned char *)id;
2704
2705         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2706                         0, data, NULL,
2707                         NULL, &res, &timeout, NULL);
2708         if (ret != 0) {
2709                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2710                 return -1;
2711         }
2712
2713         if (res) {
2714                 *status = 1;
2715         } else {
2716                 *status = 0;
2717         }
2718
2719         return 0;
2720 }
2721
2722 /*
2723    get the list of server ids that are registered on a node
2724 */
2725 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2726                 TALLOC_CTX *mem_ctx,
2727                 struct timeval timeout, uint32_t destnode, 
2728                 struct ctdb_server_id_list **svid_list)
2729 {
2730         int ret;
2731         TDB_DATA outdata;
2732         int32_t res;
2733
2734         ret = ctdb_control(ctdb, destnode, 0, 
2735                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2736                            mem_ctx, &outdata, &res, &timeout, NULL);
2737         if (ret != 0 || res != 0) {
2738                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2739                 return -1;
2740         }
2741
2742         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2743                     
2744         return 0;
2745 }
2746
2747 /*
2748   initialise the ctdb daemon for client applications
2749
2750   NOTE: In current code the daemon does not fork. This is for testing purposes only
2751   and to simplify the code.
2752 */
2753 struct ctdb_context *ctdb_init(struct event_context *ev)
2754 {
2755         int ret;
2756         struct ctdb_context *ctdb;
2757
2758         ctdb = talloc_zero(ev, struct ctdb_context);
2759         if (ctdb == NULL) {
2760                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2761                 return NULL;
2762         }
2763         ctdb->ev  = ev;
2764         ctdb->idr = idr_init(ctdb);
2765         /* Wrap early to exercise code. */
2766         ctdb->lastid = INT_MAX-200;
2767         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2768
2769         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2770         if (ret != 0) {
2771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2772                 talloc_free(ctdb);
2773                 return NULL;
2774         }
2775
2776         ctdb->statistics.statistics_start_time = timeval_current();
2777
2778         return ctdb;
2779 }
2780
2781
2782 /*
2783   set some ctdb flags
2784 */
2785 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2786 {
2787         ctdb->flags |= flags;
2788 }
2789
2790 /*
2791   setup the local socket name
2792 */
2793 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2794 {
2795         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2796         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2797
2798         return 0;
2799 }
2800
2801 /*
2802   return the pnn of this node
2803 */
2804 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2805 {
2806         return ctdb->pnn;
2807 }
2808
2809
2810 /*
2811   get the uptime of a remote node
2812  */
2813 struct ctdb_client_control_state *
2814 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2815 {
2816         return ctdb_control_send(ctdb, destnode, 0, 
2817                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2818                            mem_ctx, &timeout, NULL);
2819 }
2820
2821 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2822 {
2823         int ret;
2824         int32_t res;
2825         TDB_DATA outdata;
2826
2827         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2828         if (ret != 0 || res != 0) {
2829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2830                 return -1;
2831         }
2832
2833         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2834
2835         return 0;
2836 }
2837
2838 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2839 {
2840         struct ctdb_client_control_state *state;
2841
2842         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2843         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2844 }
2845
2846 /*
2847   send a control to execute the "recovered" event script on a node
2848  */
2849 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2850 {
2851         int ret;
2852         int32_t status;
2853
2854         ret = ctdb_control(ctdb, destnode, 0, 
2855                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2856                            NULL, NULL, &status, &timeout, NULL);
2857         if (ret != 0 || status != 0) {
2858                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2859                 return -1;
2860         }
2861
2862         return 0;
2863 }
2864
2865 /* 
2866   callback for the async helpers used when sending the same control
2867   to multiple nodes in parallell.
2868 */
2869 static void async_callback(struct ctdb_client_control_state *state)
2870 {
2871         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2872         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2873         int ret;
2874         TDB_DATA outdata;
2875         int32_t res;
2876         uint32_t destnode = state->c->hdr.destnode;
2877
2878         /* one more node has responded with recmode data */
2879         data->count--;
2880
2881         /* if we failed to push the db, then return an error and let
2882            the main loop try again.
2883         */
2884         if (state->state != CTDB_CONTROL_DONE) {
2885                 if ( !data->dont_log_errors) {
2886                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2887                 }
2888                 data->fail_count++;
2889                 if (data->fail_callback) {
2890                         data->fail_callback(ctdb, destnode, res, outdata,
2891                                         data->callback_data);
2892                 }
2893                 return;
2894         }
2895         
2896         state->async.fn = NULL;
2897
2898         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2899         if ((ret != 0) || (res != 0)) {
2900                 if ( !data->dont_log_errors) {
2901                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2902                 }
2903                 data->fail_count++;
2904                 if (data->fail_callback) {
2905                         data->fail_callback(ctdb, destnode, res, outdata,
2906                                         data->callback_data);
2907                 }
2908         }
2909         if ((ret == 0) && (data->callback != NULL)) {
2910                 data->callback(ctdb, destnode, res, outdata,
2911                                         data->callback_data);
2912         }
2913 }
2914
2915
2916 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2917 {
2918         /* set up the callback functions */
2919         state->async.fn = async_callback;
2920         state->async.private_data = data;
2921         
2922         /* one more control to wait for to complete */
2923         data->count++;
2924 }
2925
2926
2927 /* wait for up to the maximum number of seconds allowed
2928    or until all nodes we expect a response from has replied
2929 */
2930 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2931 {
2932         while (data->count > 0) {
2933                 event_loop_once(ctdb->ev);
2934         }
2935         if (data->fail_count != 0) {
2936                 if (!data->dont_log_errors) {
2937                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2938                                  data->fail_count));
2939                 }
2940                 return -1;
2941         }
2942         return 0;
2943 }
2944
2945
2946 /* 
2947    perform a simple control on the listed nodes
2948    The control cannot return data
2949  */
2950 int ctdb_client_async_control(struct ctdb_context *ctdb,
2951                                 enum ctdb_controls opcode,
2952                                 uint32_t *nodes,
2953                                 uint64_t srvid,
2954                                 struct timeval timeout,
2955                                 bool dont_log_errors,
2956                                 TDB_DATA data,
2957                                 client_async_callback client_callback,
2958                                 client_async_callback fail_callback,
2959                                 void *callback_data)
2960 {
2961         struct client_async_data *async_data;
2962         struct ctdb_client_control_state *state;
2963         int j, num_nodes;
2964
2965         async_data = talloc_zero(ctdb, struct client_async_data);
2966         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2967         async_data->dont_log_errors = dont_log_errors;
2968         async_data->callback = client_callback;
2969         async_data->fail_callback = fail_callback;
2970         async_data->callback_data = callback_data;
2971         async_data->opcode        = opcode;
2972
2973         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2974
2975         /* loop over all nodes and send an async control to each of them */
2976         for (j=0; j<num_nodes; j++) {
2977                 uint32_t pnn = nodes[j];
2978
2979                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2980                                           0, data, async_data, &timeout, NULL);
2981                 if (state == NULL) {
2982                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2983                         talloc_free(async_data);
2984                         return -1;
2985                 }
2986                 
2987                 ctdb_client_async_add(async_data, state);
2988         }
2989
2990         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2991                 talloc_free(async_data);
2992                 return -1;
2993         }
2994
2995         talloc_free(async_data);
2996         return 0;
2997 }
2998
2999 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3000                                 struct ctdb_vnn_map *vnn_map,
3001                                 TALLOC_CTX *mem_ctx,
3002                                 bool include_self)
3003 {
3004         int i, j, num_nodes;
3005         uint32_t *nodes;
3006
3007         for (i=num_nodes=0;i<vnn_map->size;i++) {
3008                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3009                         continue;
3010                 }
3011                 num_nodes++;
3012         } 
3013
3014         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3015         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3016
3017         for (i=j=0;i<vnn_map->size;i++) {
3018                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3019                         continue;
3020                 }
3021                 nodes[j++] = vnn_map->map[i];
3022         } 
3023
3024         return nodes;
3025 }
3026
3027 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3028                                 struct ctdb_node_map *node_map,
3029                                 TALLOC_CTX *mem_ctx,
3030                                 bool include_self)
3031 {
3032         int i, j, num_nodes;
3033         uint32_t *nodes;
3034
3035         for (i=num_nodes=0;i<node_map->num;i++) {
3036                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3037                         continue;
3038                 }
3039                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3040                         continue;
3041                 }
3042                 num_nodes++;
3043         } 
3044
3045         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3046         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3047
3048         for (i=j=0;i<node_map->num;i++) {
3049                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3050                         continue;
3051                 }
3052                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3053                         continue;
3054                 }
3055                 nodes[j++] = node_map->nodes[i].pnn;
3056         } 
3057
3058         return nodes;
3059 }
3060
3061 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3062                                 struct ctdb_node_map *node_map,
3063                                 TALLOC_CTX *mem_ctx,
3064                                 uint32_t pnn)
3065 {
3066         int i, j, num_nodes;
3067         uint32_t *nodes;
3068
3069         for (i=num_nodes=0;i<node_map->num;i++) {
3070                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3071                         continue;
3072                 }
3073                 if (node_map->nodes[i].pnn == pnn) {
3074                         continue;
3075                 }
3076                 num_nodes++;
3077         } 
3078
3079         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3080         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3081
3082         for (i=j=0;i<node_map->num;i++) {
3083                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3084                         continue;
3085                 }
3086                 if (node_map->nodes[i].pnn == pnn) {
3087                         continue;
3088                 }
3089                 nodes[j++] = node_map->nodes[i].pnn;
3090         } 
3091
3092         return nodes;
3093 }
3094
3095 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3096                                 struct ctdb_node_map *node_map,
3097                                 TALLOC_CTX *mem_ctx,
3098                                 bool include_self)
3099 {
3100         int i, j, num_nodes;
3101         uint32_t *nodes;
3102
3103         for (i=num_nodes=0;i<node_map->num;i++) {
3104                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3105                         continue;
3106                 }
3107                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3108                         continue;
3109                 }
3110                 num_nodes++;
3111         } 
3112
3113         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3114         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3115
3116         for (i=j=0;i<node_map->num;i++) {
3117                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3118                         continue;
3119                 }
3120                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3121                         continue;
3122                 }
3123                 nodes[j++] = node_map->nodes[i].pnn;
3124         } 
3125
3126         return nodes;
3127 }
3128
3129 /* 
3130   this is used to test if a pnn lock exists and if it exists will return
3131   the number of connections that pnn has reported or -1 if that recovery
3132   daemon is not running.
3133 */
3134 int
3135 ctdb_read_pnn_lock(int fd, int32_t pnn)
3136 {
3137         struct flock lock;
3138         char c;
3139
3140         lock.l_type = F_WRLCK;
3141         lock.l_whence = SEEK_SET;
3142         lock.l_start = pnn;
3143         lock.l_len = 1;
3144         lock.l_pid = 0;
3145
3146         if (fcntl(fd, F_GETLK, &lock) != 0) {
3147                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3148                 return -1;
3149         }
3150
3151         if (lock.l_type == F_UNLCK) {
3152                 return -1;
3153         }
3154
3155         if (pread(fd, &c, 1, pnn) == -1) {
3156                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3157                 return -1;
3158         }
3159
3160         return c;
3161 }
3162
3163 /*
3164   get capabilities of a remote node
3165  */
3166 struct ctdb_client_control_state *
3167 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3168 {
3169         return ctdb_control_send(ctdb, destnode, 0, 
3170                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3171                            mem_ctx, &timeout, NULL);
3172 }
3173
3174 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3175 {
3176         int ret;
3177         int32_t res;
3178         TDB_DATA outdata;
3179
3180         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3181         if ( (ret != 0) || (res != 0) ) {
3182                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3183                 return -1;
3184         }
3185
3186         if (capabilities) {
3187                 *capabilities = *((uint32_t *)outdata.dptr);
3188         }
3189
3190         return 0;
3191 }
3192
3193 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3194 {
3195         struct ctdb_client_control_state *state;
3196         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3197         int ret;
3198
3199         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3200         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3201         talloc_free(tmp_ctx);
3202         return ret;
3203 }
3204
3205 /**
3206  * check whether a transaction is active on a given db on a given node
3207  */
3208 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3209                                      uint32_t destnode,
3210                                      uint32_t db_id)
3211 {
3212         int32_t status;
3213         int ret;
3214         TDB_DATA indata;
3215
3216         indata.dptr = (uint8_t *)&db_id;
3217         indata.dsize = sizeof(db_id);
3218
3219         ret = ctdb_control(ctdb, destnode, 0,
3220                            CTDB_CONTROL_TRANS2_ACTIVE,
3221                            0, indata, NULL, NULL, &status,
3222                            NULL, NULL);
3223
3224         if (ret != 0) {
3225                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3226                 return -1;
3227         }
3228
3229         return status;
3230 }
3231
3232
3233 struct ctdb_transaction_handle {
3234         struct ctdb_db_context *ctdb_db;
3235         bool in_replay;
3236         /*
3237          * we store the reads and writes done under a transaction:
3238          * - one list stores both reads and writes (m_all),
3239          * - the other just writes (m_write)
3240          */
3241         struct ctdb_marshall_buffer *m_all;
3242         struct ctdb_marshall_buffer *m_write;
3243 };
3244
3245 /* start a transaction on a database */
3246 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3247 {
3248         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3249         return 0;
3250 }
3251
3252 /* start a transaction on a database */
3253 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3254 {
3255         struct ctdb_record_handle *rh;
3256         TDB_DATA key;
3257         TDB_DATA data;
3258         struct ctdb_ltdb_header header;
3259         TALLOC_CTX *tmp_ctx;
3260         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3261         int ret;
3262         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3263         pid_t pid;
3264         int32_t status;
3265
3266         key.dptr = discard_const(keyname);
3267         key.dsize = strlen(keyname);
3268
3269         if (!ctdb_db->persistent) {
3270                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3271                 return -1;
3272         }
3273
3274 again:
3275         tmp_ctx = talloc_new(h);
3276
3277         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3278         if (rh == NULL) {
3279                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3280                 talloc_free(tmp_ctx);
3281                 return -1;
3282         }
3283
3284         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3285                                               CTDB_CURRENT_NODE,
3286                                               ctdb_db->db_id);
3287         if (status == 1) {
3288                 unsigned long int usec = (1000 + random()) % 100000;
3289                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3290                                     "on db_id[0x%08x]. waiting for %lu "
3291                                     "microseconds\n",
3292                                     ctdb_db->db_id, usec));
3293                 talloc_free(tmp_ctx);
3294                 usleep(usec);
3295                 goto again;
3296         }
3297
3298         /*
3299          * store the pid in the database:
3300          * it is not enough that the node is dmaster...
3301          */
3302         pid = getpid();
3303         data.dptr = (unsigned char *)&pid;
3304         data.dsize = sizeof(pid_t);
3305         rh->header.rsn++;
3306         rh->header.dmaster = ctdb_db->ctdb->pnn;
3307         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3308         if (ret != 0) {
3309                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3310                                   "transaction record\n"));
3311                 talloc_free(tmp_ctx);
3312                 return -1;
3313         }
3314
3315         talloc_free(rh);
3316
3317         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3318         if (ret != 0) {
3319                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3320                 talloc_free(tmp_ctx);
3321                 return -1;
3322         }
3323
3324         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3325         if (ret != 0) {
3326                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3327                                  "lock record inside transaction\n"));
3328                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3329                 talloc_free(tmp_ctx);
3330                 goto again;
3331         }
3332
3333         if (header.dmaster != ctdb_db->ctdb->pnn) {
3334                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3335                                    "transaction lock record\n"));
3336                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3337                 talloc_free(tmp_ctx);
3338                 goto again;
3339         }
3340
3341         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3342                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3343                                     "the transaction lock record\n"));
3344                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3345                 talloc_free(tmp_ctx);
3346                 goto again;
3347         }
3348
3349         talloc_free(tmp_ctx);
3350
3351         return 0;
3352 }
3353
3354
3355 /* start a transaction on a database */
3356 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3357                                                        TALLOC_CTX *mem_ctx)
3358 {
3359         struct ctdb_transaction_handle *h;
3360         int ret;
3361
3362         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3363         if (h == NULL) {
3364                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3365                 return NULL;
3366         }
3367
3368         h->ctdb_db = ctdb_db;
3369
3370         ret = ctdb_transaction_fetch_start(h);
3371         if (ret != 0) {
3372                 talloc_free(h);
3373                 return NULL;
3374         }
3375
3376         talloc_set_destructor(h, ctdb_transaction_destructor);
3377
3378         return h;
3379 }
3380
3381
3382
3383 /*
3384   fetch a record inside a transaction
3385  */
3386 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3387                            TALLOC_CTX *mem_ctx, 
3388                            TDB_DATA key, TDB_DATA *data)
3389 {
3390         struct ctdb_ltdb_header header;
3391         int ret;
3392
3393         ZERO_STRUCT(header);
3394
3395         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3396         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3397                 /* record doesn't exist yet */
3398                 *data = tdb_null;
3399                 ret = 0;
3400         }
3401         
3402         if (ret != 0) {
3403                 return ret;
3404         }
3405
3406         if (!h->in_replay) {
3407                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3408                 if (h->m_all == NULL) {
3409                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3410                         return -1;
3411                 }
3412         }
3413
3414         return 0;
3415 }
3416
3417 /*
3418   stores a record inside a transaction
3419  */
3420 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3421                            TDB_DATA key, TDB_DATA data)
3422 {
3423         TALLOC_CTX *tmp_ctx = talloc_new(h);
3424         struct ctdb_ltdb_header header;
3425         TDB_DATA olddata;
3426         int ret;
3427
3428         ZERO_STRUCT(header);
3429
3430         /* we need the header so we can update the RSN */
3431         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3432         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3433                 /* the record doesn't exist - create one with us as dmaster.
3434                    This is only safe because we are in a transaction and this
3435                    is a persistent database */
3436                 ZERO_STRUCT(header);
3437         } else if (ret != 0) {
3438                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3439                 talloc_free(tmp_ctx);
3440                 return ret;
3441         }
3442
3443         if (data.dsize == olddata.dsize &&
3444             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3445                 /* save writing the same data */
3446                 talloc_free(tmp_ctx);
3447                 return 0;
3448         }
3449
3450         header.dmaster = h->ctdb_db->ctdb->pnn;
3451         header.rsn++;
3452
3453         if (!h->in_replay) {
3454                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3455                 if (h->m_all == NULL) {
3456                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3457                         talloc_free(tmp_ctx);
3458                         return -1;
3459                 }
3460         }               
3461
3462         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3463         if (h->m_write == NULL) {
3464                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3465                 talloc_free(tmp_ctx);
3466                 return -1;
3467         }
3468         
3469         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3470
3471         talloc_free(tmp_ctx);
3472         
3473         return ret;
3474 }
3475
3476 /*
3477   replay a transaction
3478  */
3479 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3480 {
3481         int ret, i;
3482         struct ctdb_rec_data *rec = NULL;
3483
3484         h->in_replay = true;
3485         talloc_free(h->m_write);
3486         h->m_write = NULL;
3487
3488         ret = ctdb_transaction_fetch_start(h);
3489         if (ret != 0) {
3490                 return ret;
3491         }
3492
3493         for (i=0;i<h->m_all->count;i++) {
3494                 TDB_DATA key, data;
3495
3496                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3497                 if (rec == NULL) {
3498                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3499                         goto failed;
3500                 }
3501
3502                 if (rec->reqid == 0) {
3503                         /* its a store */
3504                         if (ctdb_transaction_store(h, key, data) != 0) {
3505                                 goto failed;
3506                         }
3507                 } else {
3508                         TDB_DATA data2;
3509                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3510
3511                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3512                                 talloc_free(tmp_ctx);
3513                                 goto failed;
3514                         }
3515                         if (data2.dsize != data.dsize ||
3516                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3517                                 /* the record has changed on us - we have to give up */
3518                                 talloc_free(tmp_ctx);
3519                                 goto failed;
3520                         }
3521                         talloc_free(tmp_ctx);
3522                 }
3523         }
3524         
3525         return 0;
3526
3527 failed:
3528         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3529         return -1;
3530 }
3531
3532
3533 /*
3534   commit a transaction
3535  */
3536 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3537 {
3538         int ret, retries=0;
3539         int32_t status;
3540         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3541         struct timeval timeout;
3542         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3543
3544         talloc_set_destructor(h, NULL);
3545
3546         /* our commit strategy is quite complex.
3547
3548            - we first try to commit the changes to all other nodes
3549
3550            - if that works, then we commit locally and we are done
3551
3552            - if a commit on another node fails, then we need to cancel
3553              the transaction, then restart the transaction (thus
3554              opening a window of time for a pending recovery to
3555              complete), then replay the transaction, checking all the
3556              reads and writes (checking that reads give the same data,
3557              and writes succeed). Then we retry the transaction to the
3558              other nodes
3559         */
3560
3561 again:
3562         if (h->m_write == NULL) {
3563                 /* no changes were made */
3564                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3565                 talloc_free(h);
3566                 return 0;
3567         }
3568
3569         /* tell ctdbd to commit to the other nodes */
3570         timeout = timeval_current_ofs(1, 0);
3571         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3572                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3573                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3574                            &timeout, NULL);
3575         if (ret != 0 || status != 0) {
3576                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3577                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3578                                      ", retrying after 1 second...\n",
3579                                      (retries==0)?"":"retry "));
3580                 sleep(1);
3581
3582                 if (ret != 0) {
3583                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3584                 } else {
3585                         /* work out what error code we will give if we 
3586                            have to fail the operation */
3587                         switch ((enum ctdb_trans2_commit_error)status) {
3588                         case CTDB_TRANS2_COMMIT_SUCCESS:
3589                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3590                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3591                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3592                                 break;
3593                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3594                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3595                                 break;
3596                         }
3597                 }
3598
3599                 if (++retries == 100) {
3600                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3601                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3602                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3603                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3604                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3605                         talloc_free(h);
3606                         return -1;
3607                 }               
3608
3609                 if (ctdb_replay_transaction(h) != 0) {
3610                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3611                                           "transaction on db 0x%08x, "
3612                                           "failure control =%u\n",
3613                                           h->ctdb_db->db_id,
3614                                           (unsigned)failure_control));
3615                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3616                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3617                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3618                         talloc_free(h);
3619                         return -1;
3620                 }
3621                 goto again;
3622         } else {
3623                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3624         }
3625
3626         /* do the real commit locally */
3627         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3628         if (ret != 0) {
3629                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3630                                   "on db id 0x%08x locally, "
3631                                   "failure_control=%u\n",
3632                                   h->ctdb_db->db_id,
3633                                   (unsigned)failure_control));
3634                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3635                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3636                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3637                 talloc_free(h);
3638                 return ret;
3639         }
3640
3641         /* tell ctdbd that we are finished with our local commit */
3642         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3643                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3644                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3645         talloc_free(h);
3646         return 0;
3647 }
3648
3649 /*
3650   recovery daemon ping to main daemon
3651  */
3652 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3653 {
3654         int ret;
3655         int32_t res;
3656
3657         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3658                            ctdb, NULL, &res, NULL, NULL);
3659         if (ret != 0 || res != 0) {
3660                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3661                 return -1;
3662         }
3663
3664         return 0;
3665 }
3666
3667 /* when forking the main daemon and the child process needs to connect back
3668  * to the daemon as a client process, this function can be used to change
3669  * the ctdb context from daemon into client mode
3670  */
3671 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3672 {
3673         int ret;
3674         va_list ap;
3675
3676         /* Add extra information so we can identify this in the logs */
3677         va_start(ap, fmt);
3678         debug_extra = talloc_append_string(NULL, talloc_vasprintf(NULL, fmt, ap), ":");
3679         va_end(ap);
3680
3681         /* shutdown the transport */
3682         if (ctdb->methods) {
3683                 ctdb->methods->shutdown(ctdb);
3684         }
3685
3686         /* get a new event context */
3687         talloc_free(ctdb->ev);
3688         ctdb->ev = event_context_init(ctdb);
3689
3690         close(ctdb->daemon.sd);
3691         ctdb->daemon.sd = -1;
3692
3693         /* initialise ctdb */
3694         ret = ctdb_socket_connect(ctdb);
3695         if (ret != 0) {
3696                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3697                 return -1;
3698         }
3699
3700          return 0;
3701 }
3702
3703 /*
3704   get the status of running the monitor eventscripts: NULL means never run.
3705  */
3706 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3707                 struct timeval timeout, uint32_t destnode, 
3708                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3709                 struct ctdb_scripts_wire **script_status)
3710 {
3711         int ret;
3712         TDB_DATA outdata, indata;
3713         int32_t res;
3714         uint32_t uinttype = type;
3715
3716         indata.dptr = (uint8_t *)&uinttype;
3717         indata.dsize = sizeof(uinttype);
3718
3719         ret = ctdb_control(ctdb, destnode, 0, 
3720                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3721                            mem_ctx, &outdata, &res, &timeout, NULL);
3722         if (ret != 0 || res != 0) {
3723                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3724                 return -1;
3725         }
3726
3727         if (outdata.dsize == 0) {
3728                 *script_status = NULL;
3729         } else {
3730                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3731                 talloc_free(outdata.dptr);
3732         }
3733                     
3734         return 0;
3735 }
3736
3737 /*
3738   tell the main daemon how long it took to lock the reclock file
3739  */
3740 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3741 {
3742         int ret;
3743         int32_t res;
3744         TDB_DATA data;
3745
3746         data.dptr = (uint8_t *)&latency;
3747         data.dsize = sizeof(latency);
3748
3749         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3750                            ctdb, NULL, &res, NULL, NULL);
3751         if (ret != 0 || res != 0) {
3752                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3753                 return -1;
3754         }
3755
3756         return 0;
3757 }
3758
3759 /*
3760   get the name of the reclock file
3761  */
3762 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3763                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3764                          const char **name)
3765 {
3766         int ret;
3767         int32_t res;
3768         TDB_DATA data;
3769
3770         ret = ctdb_control(ctdb, destnode, 0, 
3771                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3772                            mem_ctx, &data, &res, &timeout, NULL);
3773         if (ret != 0 || res != 0) {
3774                 return -1;
3775         }
3776
3777         if (data.dsize == 0) {
3778                 *name = NULL;
3779         } else {
3780                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3781         }
3782         talloc_free(data.dptr);
3783
3784         return 0;
3785 }
3786
3787 /*
3788   set the reclock filename for a node
3789  */
3790 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3791 {
3792         int ret;
3793         TDB_DATA data;
3794         int32_t res;
3795
3796         if (reclock == NULL) {
3797                 data.dsize = 0;
3798                 data.dptr  = NULL;
3799         } else {
3800                 data.dsize = strlen(reclock) + 1;
3801                 data.dptr  = discard_const(reclock);
3802         }
3803
3804         ret = ctdb_control(ctdb, destnode, 0, 
3805                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3806                            NULL, NULL, &res, &timeout, NULL);
3807         if (ret != 0 || res != 0) {
3808                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3809                 return -1;
3810         }
3811
3812         return 0;
3813 }
3814
3815 /*
3816   stop a node
3817  */
3818 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3819 {
3820         int ret;
3821         int32_t res;
3822
3823         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3824                            ctdb, NULL, &res, &timeout, NULL);
3825         if (ret != 0 || res != 0) {
3826                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3827                 return -1;
3828         }
3829
3830         return 0;
3831 }
3832
3833 /*
3834   continue a node
3835  */
3836 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3837 {
3838         int ret;
3839
3840         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3841                            ctdb, NULL, NULL, &timeout, NULL);
3842         if (ret != 0) {
3843                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3844                 return -1;
3845         }
3846
3847         return 0;
3848 }
3849
3850 /*
3851   set the natgw state for a node
3852  */
3853 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3854 {
3855         int ret;
3856         TDB_DATA data;
3857         int32_t res;
3858
3859         data.dsize = sizeof(natgwstate);
3860         data.dptr  = (uint8_t *)&natgwstate;
3861
3862         ret = ctdb_control(ctdb, destnode, 0, 
3863                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3864                            NULL, NULL, &res, &timeout, NULL);
3865         if (ret != 0 || res != 0) {
3866                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3867                 return -1;
3868         }
3869
3870         return 0;
3871 }
3872
3873 /*
3874   set the lmaster role for a node
3875  */
3876 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3877 {
3878         int ret;
3879         TDB_DATA data;
3880         int32_t res;
3881
3882         data.dsize = sizeof(lmasterrole);
3883         data.dptr  = (uint8_t *)&lmasterrole;
3884
3885         ret = ctdb_control(ctdb, destnode, 0, 
3886                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3887                            NULL, NULL, &res, &timeout, NULL);
3888         if (ret != 0 || res != 0) {
3889                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3890                 return -1;
3891         }
3892
3893         return 0;
3894 }
3895
3896 /*
3897   set the recmaster role for a node
3898  */
3899 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3900 {
3901         int ret;
3902         TDB_DATA data;
3903         int32_t res;
3904
3905         data.dsize = sizeof(recmasterrole);
3906         data.dptr  = (uint8_t *)&recmasterrole;
3907
3908         ret = ctdb_control(ctdb, destnode, 0, 
3909                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3910                            NULL, NULL, &res, &timeout, NULL);
3911         if (ret != 0 || res != 0) {
3912                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3913                 return -1;
3914         }
3915
3916         return 0;
3917 }
3918
3919 /* enable an eventscript
3920  */
3921 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3922 {
3923         int ret;
3924         TDB_DATA data;
3925         int32_t res;
3926
3927         data.dsize = strlen(script) + 1;
3928         data.dptr  = discard_const(script);
3929
3930         ret = ctdb_control(ctdb, destnode, 0, 
3931                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3932                            NULL, NULL, &res, &timeout, NULL);
3933         if (ret != 0 || res != 0) {
3934                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3935                 return -1;
3936         }
3937
3938         return 0;
3939 }
3940
3941 /* disable an eventscript
3942  */
3943 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3944 {
3945         int ret;
3946         TDB_DATA data;
3947         int32_t res;
3948
3949         data.dsize = strlen(script) + 1;
3950         data.dptr  = discard_const(script);
3951
3952         ret = ctdb_control(ctdb, destnode, 0, 
3953                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3954                            NULL, NULL, &res, &timeout, NULL);
3955         if (ret != 0 || res != 0) {
3956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3957                 return -1;
3958         }
3959
3960         return 0;
3961 }
3962
3963
3964 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3965 {
3966         int ret;
3967         TDB_DATA data;
3968         int32_t res;
3969
3970         data.dsize = sizeof(*bantime);
3971         data.dptr  = (uint8_t *)bantime;
3972
3973         ret = ctdb_control(ctdb, destnode, 0, 
3974                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3975                            NULL, NULL, &res, &timeout, NULL);
3976         if (ret != 0 || res != 0) {
3977                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3978                 return -1;
3979         }
3980
3981         return 0;
3982 }
3983
3984
3985 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3986 {
3987         int ret;
3988         TDB_DATA outdata;
3989         int32_t res;
3990         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3991
3992         ret = ctdb_control(ctdb, destnode, 0, 
3993                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3994                            tmp_ctx, &outdata, &res, &timeout, NULL);
3995         if (ret != 0 || res != 0) {
3996                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3997                 talloc_free(tmp_ctx);
3998                 return -1;
3999         }
4000
4001         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4002         talloc_free(tmp_ctx);
4003
4004         return 0;
4005 }
4006
4007
4008 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4009 {
4010         int ret;
4011         int32_t res;
4012         TDB_DATA data;
4013         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4014
4015         data.dptr = (uint8_t*)db_prio;
4016         data.dsize = sizeof(*db_prio);
4017
4018         ret = ctdb_control(ctdb, destnode, 0, 
4019                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4020                            tmp_ctx, NULL, &res, &timeout, NULL);
4021         if (ret != 0 || res != 0) {
4022                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4023                 talloc_free(tmp_ctx);
4024                 return -1;
4025         }
4026
4027         talloc_free(tmp_ctx);
4028
4029         return 0;
4030 }
4031
4032 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4033 {
4034         int ret;
4035         int32_t res;
4036         TDB_DATA data;
4037         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4038
4039         data.dptr = (uint8_t*)&db_id;
4040         data.dsize = sizeof(db_id);
4041
4042         ret = ctdb_control(ctdb, destnode, 0, 
4043                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4044                            tmp_ctx, NULL, &res, &timeout, NULL);
4045         if (ret != 0 || res < 0) {
4046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4047                 talloc_free(tmp_ctx);
4048                 return -1;
4049         }
4050
4051         if (priority) {
4052                 *priority = res;
4053         }
4054
4055         talloc_free(tmp_ctx);
4056
4057         return 0;
4058 }