client: Add ctdb_client_check_message_handlers() function
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   num*sizeof(uint8_t), outdata.dsize));
538                 talloc_free(outdata.dptr);
539                 return -1;
540         }
541
542         for (i=0; i<num; i++) {
543                 result[i] = outdata.dptr[i];
544         }
545
546         talloc_free(outdata.dptr);
547         return 0;
548 }
549
550 /*
551   send a message - from client context
552  */
553 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
554                       uint64_t srvid, TDB_DATA data)
555 {
556         struct ctdb_req_message *r;
557         int len, res;
558
559         len = offsetof(struct ctdb_req_message, data) + data.dsize;
560         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
561                                len, struct ctdb_req_message);
562         CTDB_NO_MEMORY(ctdb, r);
563
564         r->hdr.destnode  = pnn;
565         r->srvid         = srvid;
566         r->datalen       = data.dsize;
567         memcpy(&r->data[0], data.dptr, data.dsize);
568         
569         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
570         talloc_free(r);
571         return res;
572 }
573
574
575 /*
576   cancel a ctdb_fetch_lock operation, releasing the lock
577  */
578 static int fetch_lock_destructor(struct ctdb_record_handle *h)
579 {
580         ctdb_ltdb_unlock(h->ctdb_db, h->key);
581         return 0;
582 }
583
584 /*
585   force the migration of a record to this node
586  */
587 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
588 {
589         struct ctdb_call call;
590         ZERO_STRUCT(call);
591         call.call_id = CTDB_NULL_FUNC;
592         call.key = key;
593         call.flags = CTDB_IMMEDIATE_MIGRATION;
594         return ctdb_call(ctdb_db, &call);
595 }
596
597 /*
598   try to fetch a readonly copy of a record
599  */
600 static int
601 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
602 {
603         int ret;
604
605         struct ctdb_call call;
606         ZERO_STRUCT(call);
607
608         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
609         call.call_data.dptr = NULL;
610         call.call_data.dsize = 0;
611         call.key = key;
612         call.flags = CTDB_WANT_READONLY;
613         ret = ctdb_call(ctdb_db, &call);
614
615         if (ret != 0) {
616                 return -1;
617         }
618         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
619                 return -1;
620         }
621
622         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
623         if (*hdr == NULL) {
624                 talloc_free(call.reply_data.dptr);
625                 return -1;
626         }
627
628         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
629         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
630         if (data->dptr == NULL) {
631                 talloc_free(call.reply_data.dptr);
632                 talloc_free(hdr);
633                 return -1;
634         }
635
636         return 0;
637 }
638
639 /*
640   get a lock on a record, and return the records data. Blocks until it gets the lock
641  */
642 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
643                                            TDB_DATA key, TDB_DATA *data)
644 {
645         int ret;
646         struct ctdb_record_handle *h;
647
648         /*
649           procedure is as follows:
650
651           1) get the chain lock. 
652           2) check if we are dmaster
653           3) if we are the dmaster then return handle 
654           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
655              reply from ctdbd
656           5) when we get the reply, goto (1)
657          */
658
659         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
660         if (h == NULL) {
661                 return NULL;
662         }
663
664         h->ctdb_db = ctdb_db;
665         h->key     = key;
666         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
667         if (h->key.dptr == NULL) {
668                 talloc_free(h);
669                 return NULL;
670         }
671         h->data    = data;
672
673         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
674                  (const char *)key.dptr));
675
676 again:
677         /* step 1 - get the chain lock */
678         ret = ctdb_ltdb_lock(ctdb_db, key);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
681                 talloc_free(h);
682                 return NULL;
683         }
684
685         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
686
687         talloc_set_destructor(h, fetch_lock_destructor);
688
689         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
690
691         /* when torturing, ensure we test the remote path */
692         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
693             random() % 5 == 0) {
694                 h->header.dmaster = (uint32_t)-1;
695         }
696
697
698         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
699
700         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
701                 ctdb_ltdb_unlock(ctdb_db, key);
702                 ret = ctdb_client_force_migration(ctdb_db, key);
703                 if (ret != 0) {
704                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
705                         talloc_free(h);
706                         return NULL;
707                 }
708                 goto again;
709         }
710
711         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
712         return h;
713 }
714
715 /*
716   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
717  */
718 struct ctdb_record_handle *
719 ctdb_fetch_readonly_lock(
720         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
721         TDB_DATA key, TDB_DATA *data,
722         int read_only)
723 {
724         int ret;
725         struct ctdb_record_handle *h;
726         struct ctdb_ltdb_header *roheader = NULL;
727
728         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
729         if (h == NULL) {
730                 return NULL;
731         }
732
733         h->ctdb_db = ctdb_db;
734         h->key     = key;
735         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
736         if (h->key.dptr == NULL) {
737                 talloc_free(h);
738                 return NULL;
739         }
740         h->data    = data;
741
742         data->dptr = NULL;
743         data->dsize = 0;
744
745
746 again:
747         talloc_free(roheader);
748         roheader = NULL;
749
750         talloc_free(data->dptr);
751         data->dptr = NULL;
752         data->dsize = 0;
753
754         /* Lock the record/chain */
755         ret = ctdb_ltdb_lock(ctdb_db, key);
756         if (ret != 0) {
757                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
758                 talloc_free(h);
759                 return NULL;
760         }
761
762         talloc_set_destructor(h, fetch_lock_destructor);
763
764         /* Check if record exists yet in the TDB */
765         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
766         if (ret != 0) {
767                 ctdb_ltdb_unlock(ctdb_db, key);
768                 ret = ctdb_client_force_migration(ctdb_db, key);
769                 if (ret != 0) {
770                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
771                         talloc_free(h);
772                         return NULL;
773                 }
774                 goto again;
775         }
776
777         /* if this is a request for read/write and we have delegations
778            we have to revoke all delegations first
779         */
780         if ((read_only == 0) 
781         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
782         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
783                 ctdb_ltdb_unlock(ctdb_db, key);
784                 ret = ctdb_client_force_migration(ctdb_db, key);
785                 if (ret != 0) {
786                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
787                         talloc_free(h);
788                         return NULL;
789                 }
790                 goto again;
791         }
792
793         /* if we are dmaster, just return the handle */
794         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
795                 return h;
796         }
797
798         if (read_only != 0) {
799                 TDB_DATA rodata = {NULL, 0};
800
801                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
802                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
803                         return h;
804                 }
805
806                 ctdb_ltdb_unlock(ctdb_db, key);
807                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
808                 if (ret != 0) {
809                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
810                         ret = ctdb_client_force_migration(ctdb_db, key);
811                         if (ret != 0) {
812                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
813                                 talloc_free(h);
814                                 return NULL;
815                         }
816
817                         goto again;
818                 }
819
820                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
821                         ret = ctdb_client_force_migration(ctdb_db, key);
822                         if (ret != 0) {
823                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
824                                 talloc_free(h);
825                                 return NULL;
826                         }
827
828                         goto again;
829                 }
830
831                 ret = ctdb_ltdb_lock(ctdb_db, key);
832                 if (ret != 0) {
833                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
834                         talloc_free(h);
835                         return NULL;
836                 }
837
838                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
839                 if (ret != 0) {
840                         ctdb_ltdb_unlock(ctdb_db, key);
841
842                         ret = ctdb_client_force_migration(ctdb_db, key);
843                         if (ret != 0) {
844                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
845                                 talloc_free(h);
846                                 return NULL;
847                         }
848
849                         goto again;
850                 }
851
852                 return h;
853         }
854
855         /* we are not dmaster and this was not a request for a readonly lock
856          * so unlock the record, migrate it and try again
857          */
858         ctdb_ltdb_unlock(ctdb_db, key);
859         ret = ctdb_client_force_migration(ctdb_db, key);
860         if (ret != 0) {
861                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
862                 talloc_free(h);
863                 return NULL;
864         }
865         goto again;
866 }
867
868 /*
869   store some data to the record that was locked with ctdb_fetch_lock()
870 */
871 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
872 {
873         if (h->ctdb_db->persistent) {
874                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
875                 return -1;
876         }
877
878         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
879 }
880
881 /*
882   non-locking fetch of a record
883  */
884 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
885                TDB_DATA key, TDB_DATA *data)
886 {
887         struct ctdb_call call;
888         int ret;
889
890         call.call_id = CTDB_FETCH_FUNC;
891         call.call_data.dptr = NULL;
892         call.call_data.dsize = 0;
893         call.key = key;
894
895         ret = ctdb_call(ctdb_db, &call);
896
897         if (ret == 0) {
898                 *data = call.reply_data;
899                 talloc_steal(mem_ctx, data->dptr);
900         }
901
902         return ret;
903 }
904
905
906
907 /*
908    called when a control completes or timesout to invoke the callback
909    function the user provided
910 */
911 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
912         struct timeval t, void *private_data)
913 {
914         struct ctdb_client_control_state *state;
915         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
916         int ret;
917
918         state = talloc_get_type(private_data, struct ctdb_client_control_state);
919         talloc_steal(tmp_ctx, state);
920
921         ret = ctdb_control_recv(state->ctdb, state, state,
922                         NULL, 
923                         NULL, 
924                         NULL);
925         if (ret != 0) {
926                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
927         }
928
929         talloc_free(tmp_ctx);
930 }
931
932 /*
933   called when a CTDB_REPLY_CONTROL packet comes in in the client
934
935   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
936   contains any reply data from the control
937 */
938 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
939                                       struct ctdb_req_header *hdr)
940 {
941         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
942         struct ctdb_client_control_state *state;
943
944         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
945         if (state == NULL) {
946                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
947                 return;
948         }
949
950         if (hdr->reqid != state->reqid) {
951                 /* we found a record  but it was the wrong one */
952                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
953                 return;
954         }
955
956         state->outdata.dptr = c->data;
957         state->outdata.dsize = c->datalen;
958         state->status = c->status;
959         if (c->errorlen) {
960                 state->errormsg = talloc_strndup(state, 
961                                                  (char *)&c->data[c->datalen], 
962                                                  c->errorlen);
963         }
964
965         /* state->outdata now uses resources from c so we dont want c
966            to just dissappear from under us while state is still alive
967         */
968         talloc_steal(state, c);
969
970         state->state = CTDB_CONTROL_DONE;
971
972         /* if we had a callback registered for this control, pull the response
973            and call the callback.
974         */
975         if (state->async.fn) {
976                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
977         }
978 }
979
980
981 /*
982   destroy a ctdb_control in client
983 */
984 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
985 {
986         ctdb_reqid_remove(state->ctdb, state->reqid);
987         return 0;
988 }
989
990
991 /* time out handler for ctdb_control */
992 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
993         struct timeval t, void *private_data)
994 {
995         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
996
997         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
998                          "dstnode:%u\n", state->reqid, state->c->opcode,
999                          state->c->hdr.destnode));
1000
1001         state->state = CTDB_CONTROL_TIMEOUT;
1002
1003         /* if we had a callback registered for this control, pull the response
1004            and call the callback.
1005         */
1006         if (state->async.fn) {
1007                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1008         }
1009 }
1010
1011 /* async version of send control request */
1012 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1013                 uint32_t destnode, uint64_t srvid, 
1014                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1015                 TALLOC_CTX *mem_ctx,
1016                 struct timeval *timeout,
1017                 char **errormsg)
1018 {
1019         struct ctdb_client_control_state *state;
1020         size_t len;
1021         struct ctdb_req_control *c;
1022         int ret;
1023
1024         if (errormsg) {
1025                 *errormsg = NULL;
1026         }
1027
1028         /* if the domain socket is not yet open, open it */
1029         if (ctdb->daemon.sd==-1) {
1030                 ctdb_socket_connect(ctdb);
1031         }
1032
1033         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1034         CTDB_NO_MEMORY_NULL(ctdb, state);
1035
1036         state->ctdb       = ctdb;
1037         state->reqid      = ctdb_reqid_new(ctdb, state);
1038         state->state      = CTDB_CONTROL_WAIT;
1039         state->errormsg   = NULL;
1040
1041         talloc_set_destructor(state, ctdb_client_control_destructor);
1042
1043         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1044         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1045                                len, struct ctdb_req_control);
1046         state->c            = c;        
1047         CTDB_NO_MEMORY_NULL(ctdb, c);
1048         c->hdr.reqid        = state->reqid;
1049         c->hdr.destnode     = destnode;
1050         c->opcode           = opcode;
1051         c->client_id        = 0;
1052         c->flags            = flags;
1053         c->srvid            = srvid;
1054         c->datalen          = data.dsize;
1055         if (data.dsize) {
1056                 memcpy(&c->data[0], data.dptr, data.dsize);
1057         }
1058
1059         /* timeout */
1060         if (timeout && !timeval_is_zero(timeout)) {
1061                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1062         }
1063
1064         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1065         if (ret != 0) {
1066                 talloc_free(state);
1067                 return NULL;
1068         }
1069
1070         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1071                 talloc_free(state);
1072                 return NULL;
1073         }
1074
1075         return state;
1076 }
1077
1078
1079 /* async version of receive control reply */
1080 int ctdb_control_recv(struct ctdb_context *ctdb, 
1081                 struct ctdb_client_control_state *state, 
1082                 TALLOC_CTX *mem_ctx,
1083                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1084 {
1085         TALLOC_CTX *tmp_ctx;
1086
1087         if (status != NULL) {
1088                 *status = -1;
1089         }
1090         if (errormsg != NULL) {
1091                 *errormsg = NULL;
1092         }
1093
1094         if (state == NULL) {
1095                 return -1;
1096         }
1097
1098         /* prevent double free of state */
1099         tmp_ctx = talloc_new(ctdb);
1100         talloc_steal(tmp_ctx, state);
1101
1102         /* loop one event at a time until we either timeout or the control
1103            completes.
1104         */
1105         while (state->state == CTDB_CONTROL_WAIT) {
1106                 event_loop_once(ctdb->ev);
1107         }
1108
1109         if (state->state != CTDB_CONTROL_DONE) {
1110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1111                 if (state->async.fn) {
1112                         state->async.fn(state);
1113                 }
1114                 talloc_free(tmp_ctx);
1115                 return -1;
1116         }
1117
1118         if (state->errormsg) {
1119                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1120                 if (errormsg) {
1121                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1122                 }
1123                 if (state->async.fn) {
1124                         state->async.fn(state);
1125                 }
1126                 talloc_free(tmp_ctx);
1127                 return -1;
1128         }
1129
1130         if (outdata) {
1131                 *outdata = state->outdata;
1132                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1133         }
1134
1135         if (status) {
1136                 *status = state->status;
1137         }
1138
1139         if (state->async.fn) {
1140                 state->async.fn(state);
1141         }
1142
1143         talloc_free(tmp_ctx);
1144         return 0;
1145 }
1146
1147
1148
1149 /*
1150   send a ctdb control message
1151   timeout specifies how long we should wait for a reply.
1152   if timeout is NULL we wait indefinitely
1153  */
1154 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1155                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1156                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1157                  struct timeval *timeout,
1158                  char **errormsg)
1159 {
1160         struct ctdb_client_control_state *state;
1161
1162         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1163                         flags, data, mem_ctx,
1164                         timeout, errormsg);
1165
1166         /* FIXME: Error conditions in ctdb_control_send return NULL without
1167          * setting errormsg.  So, there is no way to distinguish between sucess
1168          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1169         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1170                 if (status != NULL) {
1171                         *status = 0;
1172                 }
1173                 return 0;
1174         }
1175
1176         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1177                         errormsg);
1178 }
1179
1180
1181
1182
1183 /*
1184   a process exists call. Returns 0 if process exists, -1 otherwise
1185  */
1186 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1187 {
1188         int ret;
1189         TDB_DATA data;
1190         int32_t status;
1191
1192         data.dptr = (uint8_t*)&pid;
1193         data.dsize = sizeof(pid);
1194
1195         ret = ctdb_control(ctdb, destnode, 0, 
1196                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1197                            NULL, NULL, &status, NULL, NULL);
1198         if (ret != 0) {
1199                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1200                 return -1;
1201         }
1202
1203         return status;
1204 }
1205
1206 /*
1207   get remote statistics
1208  */
1209 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1210 {
1211         int ret;
1212         TDB_DATA data;
1213         int32_t res;
1214
1215         ret = ctdb_control(ctdb, destnode, 0, 
1216                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1217                            ctdb, &data, &res, NULL, NULL);
1218         if (ret != 0 || res != 0) {
1219                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1220                 return -1;
1221         }
1222
1223         if (data.dsize != sizeof(struct ctdb_statistics)) {
1224                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1225                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1226                       return -1;
1227         }
1228
1229         *status = *(struct ctdb_statistics *)data.dptr;
1230         talloc_free(data.dptr);
1231                         
1232         return 0;
1233 }
1234
1235 /*
1236   shutdown a remote ctdb node
1237  */
1238 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1239 {
1240         struct ctdb_client_control_state *state;
1241
1242         state = ctdb_control_send(ctdb, destnode, 0, 
1243                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1244                            NULL, &timeout, NULL);
1245         if (state == NULL) {
1246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1247                 return -1;
1248         }
1249
1250         return 0;
1251 }
1252
1253 /*
1254   get vnn map from a remote node
1255  */
1256 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1257 {
1258         int ret;
1259         TDB_DATA outdata;
1260         int32_t res;
1261         struct ctdb_vnn_map_wire *map;
1262
1263         ret = ctdb_control(ctdb, destnode, 0, 
1264                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1265                            mem_ctx, &outdata, &res, &timeout, NULL);
1266         if (ret != 0 || res != 0) {
1267                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1268                 return -1;
1269         }
1270         
1271         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1272         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1273             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1274                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1275                 return -1;
1276         }
1277
1278         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1279         CTDB_NO_MEMORY(ctdb, *vnnmap);
1280         (*vnnmap)->generation = map->generation;
1281         (*vnnmap)->size       = map->size;
1282         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1283
1284         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1285         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1286         talloc_free(outdata.dptr);
1287                     
1288         return 0;
1289 }
1290
1291
1292 /*
1293   get the recovery mode of a remote node
1294  */
1295 struct ctdb_client_control_state *
1296 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1297 {
1298         return ctdb_control_send(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1300                            mem_ctx, &timeout, NULL);
1301 }
1302
1303 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1304 {
1305         int ret;
1306         int32_t res;
1307
1308         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1309         if (ret != 0) {
1310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1311                 return -1;
1312         }
1313
1314         if (recmode) {
1315                 *recmode = (uint32_t)res;
1316         }
1317
1318         return 0;
1319 }
1320
1321 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1322 {
1323         struct ctdb_client_control_state *state;
1324
1325         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1326         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1327 }
1328
1329
1330
1331
1332 /*
1333   set the recovery mode of a remote node
1334  */
1335 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1336 {
1337         int ret;
1338         TDB_DATA data;
1339         int32_t res;
1340
1341         data.dsize = sizeof(uint32_t);
1342         data.dptr = (unsigned char *)&recmode;
1343
1344         ret = ctdb_control(ctdb, destnode, 0, 
1345                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1346                            NULL, NULL, &res, &timeout, NULL);
1347         if (ret != 0 || res != 0) {
1348                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1349                 return -1;
1350         }
1351
1352         return 0;
1353 }
1354
1355
1356
1357 /*
1358   get the recovery master of a remote node
1359  */
1360 struct ctdb_client_control_state *
1361 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1362                         struct timeval timeout, uint32_t destnode)
1363 {
1364         return ctdb_control_send(ctdb, destnode, 0, 
1365                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1366                            mem_ctx, &timeout, NULL);
1367 }
1368
1369 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1370 {
1371         int ret;
1372         int32_t res;
1373
1374         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1375         if (ret != 0) {
1376                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1377                 return -1;
1378         }
1379
1380         if (recmaster) {
1381                 *recmaster = (uint32_t)res;
1382         }
1383
1384         return 0;
1385 }
1386
1387 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1388 {
1389         struct ctdb_client_control_state *state;
1390
1391         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1392         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1393 }
1394
1395
1396 /*
1397   set the recovery master of a remote node
1398  */
1399 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1400 {
1401         int ret;
1402         TDB_DATA data;
1403         int32_t res;
1404
1405         ZERO_STRUCT(data);
1406         data.dsize = sizeof(uint32_t);
1407         data.dptr = (unsigned char *)&recmaster;
1408
1409         ret = ctdb_control(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1411                            NULL, NULL, &res, &timeout, NULL);
1412         if (ret != 0 || res != 0) {
1413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1414                 return -1;
1415         }
1416
1417         return 0;
1418 }
1419
1420
1421 /*
1422   get a list of databases off a remote node
1423  */
1424 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1425                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1426 {
1427         int ret;
1428         TDB_DATA outdata;
1429         int32_t res;
1430
1431         ret = ctdb_control(ctdb, destnode, 0, 
1432                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1433                            mem_ctx, &outdata, &res, &timeout, NULL);
1434         if (ret != 0 || res != 0) {
1435                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1436                 return -1;
1437         }
1438
1439         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1440         talloc_free(outdata.dptr);
1441                     
1442         return 0;
1443 }
1444
1445 /*
1446   get a list of nodes (vnn and flags ) from a remote node
1447  */
1448 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1449                 struct timeval timeout, uint32_t destnode, 
1450                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1451 {
1452         int ret;
1453         TDB_DATA outdata;
1454         int32_t res;
1455
1456         ret = ctdb_control(ctdb, destnode, 0, 
1457                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1458                            mem_ctx, &outdata, &res, &timeout, NULL);
1459         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1460                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1461                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1462         }
1463         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1465                 return -1;
1466         }
1467
1468         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1469         talloc_free(outdata.dptr);
1470                     
1471         return 0;
1472 }
1473
1474 /*
1475   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1476  */
1477 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1478                 struct timeval timeout, uint32_t destnode, 
1479                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1480 {
1481         int ret, i, len;
1482         TDB_DATA outdata;
1483         struct ctdb_node_mapv4 *nodemapv4;
1484         int32_t res;
1485
1486         ret = ctdb_control(ctdb, destnode, 0, 
1487                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1488                            mem_ctx, &outdata, &res, &timeout, NULL);
1489         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1490                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1491                 return -1;
1492         }
1493
1494         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1495
1496         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1497         (*nodemap) = talloc_zero_size(mem_ctx, len);
1498         CTDB_NO_MEMORY(ctdb, (*nodemap));
1499
1500         (*nodemap)->num = nodemapv4->num;
1501         for (i=0; i<nodemapv4->num; i++) {
1502                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1503                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1504                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1505                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1506         }
1507                 
1508         talloc_free(outdata.dptr);
1509                     
1510         return 0;
1511 }
1512
1513 /*
1514   drop the transport, reload the nodes file and restart the transport
1515  */
1516 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1517                     struct timeval timeout, uint32_t destnode)
1518 {
1519         int ret;
1520         int32_t res;
1521
1522         ret = ctdb_control(ctdb, destnode, 0, 
1523                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1524                            NULL, NULL, &res, &timeout, NULL);
1525         if (ret != 0 || res != 0) {
1526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1527                 return -1;
1528         }
1529
1530         return 0;
1531 }
1532
1533
1534 /*
1535   set vnn map on a node
1536  */
1537 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1538                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1539 {
1540         int ret;
1541         TDB_DATA data;
1542         int32_t res;
1543         struct ctdb_vnn_map_wire *map;
1544         size_t len;
1545
1546         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1547         map = talloc_size(mem_ctx, len);
1548         CTDB_NO_MEMORY(ctdb, map);
1549
1550         map->generation = vnnmap->generation;
1551         map->size = vnnmap->size;
1552         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1553         
1554         data.dsize = len;
1555         data.dptr  = (uint8_t *)map;
1556
1557         ret = ctdb_control(ctdb, destnode, 0, 
1558                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1559                            NULL, NULL, &res, &timeout, NULL);
1560         if (ret != 0 || res != 0) {
1561                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1562                 return -1;
1563         }
1564
1565         talloc_free(map);
1566
1567         return 0;
1568 }
1569
1570
1571 /*
1572   async send for pull database
1573  */
1574 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1575         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1576         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1577 {
1578         TDB_DATA indata;
1579         struct ctdb_control_pulldb *pull;
1580         struct ctdb_client_control_state *state;
1581
1582         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1583         CTDB_NO_MEMORY_NULL(ctdb, pull);
1584
1585         pull->db_id   = dbid;
1586         pull->lmaster = lmaster;
1587
1588         indata.dsize = sizeof(struct ctdb_control_pulldb);
1589         indata.dptr  = (unsigned char *)pull;
1590
1591         state = ctdb_control_send(ctdb, destnode, 0, 
1592                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1593                                   mem_ctx, &timeout, NULL);
1594         talloc_free(pull);
1595
1596         return state;
1597 }
1598
1599 /*
1600   async recv for pull database
1601  */
1602 int ctdb_ctrl_pulldb_recv(
1603         struct ctdb_context *ctdb, 
1604         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1605         TDB_DATA *outdata)
1606 {
1607         int ret;
1608         int32_t res;
1609
1610         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1611         if ( (ret != 0) || (res != 0) ){
1612                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1613                 return -1;
1614         }
1615
1616         return 0;
1617 }
1618
1619 /*
1620   pull all keys and records for a specific database on a node
1621  */
1622 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1623                 uint32_t dbid, uint32_t lmaster, 
1624                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1625                 TDB_DATA *outdata)
1626 {
1627         struct ctdb_client_control_state *state;
1628
1629         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1630                                       timeout);
1631         
1632         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1633 }
1634
1635
1636 /*
1637   change dmaster for all keys in the database to the new value
1638  */
1639 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1640                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1641 {
1642         int ret;
1643         TDB_DATA indata;
1644         int32_t res;
1645
1646         indata.dsize = 2*sizeof(uint32_t);
1647         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1648
1649         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1650         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1651
1652         ret = ctdb_control(ctdb, destnode, 0, 
1653                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1654                            NULL, NULL, &res, &timeout, NULL);
1655         if (ret != 0 || res != 0) {
1656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1657                 return -1;
1658         }
1659
1660         return 0;
1661 }
1662
1663 /*
1664   ping a node, return number of clients connected
1665  */
1666 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1667 {
1668         int ret;
1669         int32_t res;
1670
1671         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1672                            tdb_null, NULL, NULL, &res, NULL, NULL);
1673         if (ret != 0) {
1674                 return -1;
1675         }
1676         return res;
1677 }
1678
1679 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1680                            struct timeval timeout, 
1681                            uint32_t destnode,
1682                            uint32_t *runstate)
1683 {
1684         TDB_DATA outdata;
1685         int32_t res;
1686         int ret;
1687
1688         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1689                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1690         if (ret != 0 || res != 0) {
1691                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1692                 return ret != 0 ? ret : res;
1693         }
1694
1695         if (outdata.dsize != sizeof(uint32_t)) {
1696                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1697                 talloc_free(outdata.dptr);
1698                 return -1;
1699         }
1700
1701         if (runstate != NULL) {
1702                 *runstate = *(uint32_t *)outdata.dptr;
1703         }
1704         talloc_free(outdata.dptr);
1705
1706         return 0;
1707 }
1708
1709 /*
1710   find the real path to a ltdb 
1711  */
1712 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1713                    const char **path)
1714 {
1715         int ret;
1716         int32_t res;
1717         TDB_DATA data;
1718
1719         data.dptr = (uint8_t *)&dbid;
1720         data.dsize = sizeof(dbid);
1721
1722         ret = ctdb_control(ctdb, destnode, 0, 
1723                            CTDB_CONTROL_GETDBPATH, 0, data, 
1724                            mem_ctx, &data, &res, &timeout, NULL);
1725         if (ret != 0 || res != 0) {
1726                 return -1;
1727         }
1728
1729         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1730         if ((*path) == NULL) {
1731                 return -1;
1732         }
1733
1734         talloc_free(data.dptr);
1735
1736         return 0;
1737 }
1738
1739 /*
1740   find the name of a db 
1741  */
1742 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1743                    const char **name)
1744 {
1745         int ret;
1746         int32_t res;
1747         TDB_DATA data;
1748
1749         data.dptr = (uint8_t *)&dbid;
1750         data.dsize = sizeof(dbid);
1751
1752         ret = ctdb_control(ctdb, destnode, 0, 
1753                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1754                            mem_ctx, &data, &res, &timeout, NULL);
1755         if (ret != 0 || res != 0) {
1756                 return -1;
1757         }
1758
1759         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1760         if ((*name) == NULL) {
1761                 return -1;
1762         }
1763
1764         talloc_free(data.dptr);
1765
1766         return 0;
1767 }
1768
1769 /*
1770   get the health status of a db
1771  */
1772 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1773                           struct timeval timeout,
1774                           uint32_t destnode,
1775                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1776                           const char **reason)
1777 {
1778         int ret;
1779         int32_t res;
1780         TDB_DATA data;
1781
1782         data.dptr = (uint8_t *)&dbid;
1783         data.dsize = sizeof(dbid);
1784
1785         ret = ctdb_control(ctdb, destnode, 0,
1786                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1787                            mem_ctx, &data, &res, &timeout, NULL);
1788         if (ret != 0 || res != 0) {
1789                 return -1;
1790         }
1791
1792         if (data.dsize == 0) {
1793                 (*reason) = NULL;
1794                 return 0;
1795         }
1796
1797         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1798         if ((*reason) == NULL) {
1799                 return -1;
1800         }
1801
1802         talloc_free(data.dptr);
1803
1804         return 0;
1805 }
1806
1807 /*
1808   create a database
1809  */
1810 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1811                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1812 {
1813         int ret;
1814         int32_t res;
1815         TDB_DATA data;
1816         uint64_t tdb_flags = 0;
1817
1818         data.dptr = discard_const(name);
1819         data.dsize = strlen(name)+1;
1820
1821         /* Make sure that volatile databases use jenkins hash */
1822         if (!persistent) {
1823                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1824         }
1825
1826         ret = ctdb_control(ctdb, destnode, tdb_flags,
1827                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1828                            0, data, 
1829                            mem_ctx, &data, &res, &timeout, NULL);
1830
1831         if (ret != 0 || res != 0) {
1832                 return -1;
1833         }
1834
1835         return 0;
1836 }
1837
1838 /*
1839   get debug level on a node
1840  */
1841 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1842 {
1843         int ret;
1844         int32_t res;
1845         TDB_DATA data;
1846
1847         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1848                            ctdb, &data, &res, NULL, NULL);
1849         if (ret != 0 || res != 0) {
1850                 return -1;
1851         }
1852         if (data.dsize != sizeof(int32_t)) {
1853                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1854                          (unsigned)data.dsize));
1855                 return -1;
1856         }
1857         *level = *(int32_t *)data.dptr;
1858         talloc_free(data.dptr);
1859         return 0;
1860 }
1861
1862 /*
1863   set debug level on a node
1864  */
1865 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1866 {
1867         int ret;
1868         int32_t res;
1869         TDB_DATA data;
1870
1871         data.dptr = (uint8_t *)&level;
1872         data.dsize = sizeof(level);
1873
1874         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1875                            NULL, NULL, &res, NULL, NULL);
1876         if (ret != 0 || res != 0) {
1877                 return -1;
1878         }
1879         return 0;
1880 }
1881
1882
1883 /*
1884   get a list of connected nodes
1885  */
1886 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1887                                 struct timeval timeout,
1888                                 TALLOC_CTX *mem_ctx,
1889                                 uint32_t *num_nodes)
1890 {
1891         struct ctdb_node_map *map=NULL;
1892         int ret, i;
1893         uint32_t *nodes;
1894
1895         *num_nodes = 0;
1896
1897         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1898         if (ret != 0) {
1899                 return NULL;
1900         }
1901
1902         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1903         if (nodes == NULL) {
1904                 return NULL;
1905         }
1906
1907         for (i=0;i<map->num;i++) {
1908                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1909                         nodes[*num_nodes] = map->nodes[i].pnn;
1910                         (*num_nodes)++;
1911                 }
1912         }
1913
1914         return nodes;
1915 }
1916
1917
1918 /*
1919   reset remote status
1920  */
1921 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1922 {
1923         int ret;
1924         int32_t res;
1925
1926         ret = ctdb_control(ctdb, destnode, 0, 
1927                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1928                            NULL, NULL, &res, NULL, NULL);
1929         if (ret != 0 || res != 0) {
1930                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1931                 return -1;
1932         }
1933         return 0;
1934 }
1935
1936 /*
1937   attach to a specific database - client call
1938 */
1939 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1940                                     struct timeval timeout,
1941                                     const char *name,
1942                                     bool persistent,
1943                                     uint32_t tdb_flags)
1944 {
1945         struct ctdb_db_context *ctdb_db;
1946         TDB_DATA data;
1947         int ret;
1948         int32_t res;
1949
1950         ctdb_db = ctdb_db_handle(ctdb, name);
1951         if (ctdb_db) {
1952                 return ctdb_db;
1953         }
1954
1955         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1956         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1957
1958         ctdb_db->ctdb = ctdb;
1959         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1960         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1961
1962         data.dptr = discard_const(name);
1963         data.dsize = strlen(name)+1;
1964
1965         /* CTDB has switched to using jenkins hash for volatile databases.
1966          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
1967          * always set it.
1968          */
1969         if (!persistent) {
1970                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
1971         }
1972
1973         /* tell ctdb daemon to attach */
1974         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1975                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1976                            0, data, ctdb_db, &data, &res, NULL, NULL);
1977         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1978                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1979                 talloc_free(ctdb_db);
1980                 return NULL;
1981         }
1982         
1983         ctdb_db->db_id = *(uint32_t *)data.dptr;
1984         talloc_free(data.dptr);
1985
1986         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1987         if (ret != 0) {
1988                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1989                 talloc_free(ctdb_db);
1990                 return NULL;
1991         }
1992
1993         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1994         if (ctdb->valgrinding) {
1995                 tdb_flags |= TDB_NOMMAP;
1996         }
1997         tdb_flags |= TDB_DISALLOW_NESTING;
1998
1999         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2000         if (ctdb_db->ltdb == NULL) {
2001                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2002                 talloc_free(ctdb_db);
2003                 return NULL;
2004         }
2005
2006         ctdb_db->persistent = persistent;
2007
2008         DLIST_ADD(ctdb->db_list, ctdb_db);
2009
2010         /* add well known functions */
2011         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2012         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2013         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2014
2015         return ctdb_db;
2016 }
2017
2018
2019 /*
2020   setup a call for a database
2021  */
2022 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2023 {
2024         struct ctdb_registered_call *call;
2025
2026 #if 0
2027         TDB_DATA data;
2028         int32_t status;
2029         struct ctdb_control_set_call c;
2030         int ret;
2031
2032         /* this is no longer valid with the separate daemon architecture */
2033         c.db_id = ctdb_db->db_id;
2034         c.fn    = fn;
2035         c.id    = id;
2036
2037         data.dptr = (uint8_t *)&c;
2038         data.dsize = sizeof(c);
2039
2040         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2041                            data, NULL, NULL, &status, NULL, NULL);
2042         if (ret != 0 || status != 0) {
2043                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2044                 return -1;
2045         }
2046 #endif
2047
2048         /* also register locally */
2049         call = talloc(ctdb_db, struct ctdb_registered_call);
2050         call->fn = fn;
2051         call->id = id;
2052
2053         DLIST_ADD(ctdb_db->calls, call);        
2054         return 0;
2055 }
2056
2057
2058 struct traverse_state {
2059         bool done;
2060         uint32_t count;
2061         ctdb_traverse_func fn;
2062         void *private_data;
2063         bool listemptyrecords;
2064 };
2065
2066 /*
2067   called on each key during a ctdb_traverse
2068  */
2069 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2070 {
2071         struct traverse_state *state = (struct traverse_state *)p;
2072         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2073         TDB_DATA key;
2074
2075         if (data.dsize < sizeof(uint32_t) ||
2076             d->length != data.dsize) {
2077                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2078                 state->done = true;
2079                 return;
2080         }
2081
2082         key.dsize = d->keylen;
2083         key.dptr  = &d->data[0];
2084         data.dsize = d->datalen;
2085         data.dptr = &d->data[d->keylen];
2086
2087         if (key.dsize == 0 && data.dsize == 0) {
2088                 /* end of traverse */
2089                 state->done = true;
2090                 return;
2091         }
2092
2093         if (!state->listemptyrecords &&
2094             data.dsize == sizeof(struct ctdb_ltdb_header))
2095         {
2096                 /* empty records are deleted records in ctdb */
2097                 return;
2098         }
2099
2100         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2101                 state->done = true;
2102         }
2103
2104         state->count++;
2105 }
2106
2107 /**
2108  * start a cluster wide traverse, calling the supplied fn on each record
2109  * return the number of records traversed, or -1 on error
2110  *
2111  * Extendet variant with a flag to signal whether empty records should
2112  * be listed.
2113  */
2114 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2115                              ctdb_traverse_func fn,
2116                              bool withemptyrecords,
2117                              void *private_data)
2118 {
2119         TDB_DATA data;
2120         struct ctdb_traverse_start_ext t;
2121         int32_t status;
2122         int ret;
2123         uint64_t srvid = (getpid() | 0xFLL<<60);
2124         struct traverse_state state;
2125
2126         state.done = false;
2127         state.count = 0;
2128         state.private_data = private_data;
2129         state.fn = fn;
2130         state.listemptyrecords = withemptyrecords;
2131
2132         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2135                 return -1;
2136         }
2137
2138         t.db_id = ctdb_db->db_id;
2139         t.srvid = srvid;
2140         t.reqid = 0;
2141         t.withemptyrecords = withemptyrecords;
2142
2143         data.dptr = (uint8_t *)&t;
2144         data.dsize = sizeof(t);
2145
2146         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2147                            data, NULL, NULL, &status, NULL, NULL);
2148         if (ret != 0 || status != 0) {
2149                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2150                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2151                 return -1;
2152         }
2153
2154         while (!state.done) {
2155                 event_loop_once(ctdb_db->ctdb->ev);
2156         }
2157
2158         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2161                 return -1;
2162         }
2163
2164         return state.count;
2165 }
2166
2167 /**
2168  * start a cluster wide traverse, calling the supplied fn on each record
2169  * return the number of records traversed, or -1 on error
2170  *
2171  * Standard version which does not list the empty records:
2172  * These are considered deleted.
2173  */
2174 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2175 {
2176         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2177 }
2178
2179 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2180 /*
2181   called on each key during a catdb
2182  */
2183 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2184 {
2185         int i;
2186         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2187         FILE *f = c->f;
2188         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2189
2190         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2191         for (i=0;i<key.dsize;i++) {
2192                 if (ISASCII(key.dptr[i])) {
2193                         fprintf(f, "%c", key.dptr[i]);
2194                 } else {
2195                         fprintf(f, "\\%02X", key.dptr[i]);
2196                 }
2197         }
2198         fprintf(f, "\"\n");
2199
2200         fprintf(f, "dmaster: %u\n", h->dmaster);
2201         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2202
2203         if (c->printlmaster && ctdb->vnn_map != NULL) {
2204                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2205         }
2206
2207         if (c->printhash) {
2208                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2209         }
2210
2211         if (c->printrecordflags) {
2212                 fprintf(f, "flags: 0x%08x", h->flags);
2213                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2214                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2215                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2216                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2217                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2218                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2219                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2220                 fprintf(f, "\n");
2221         }
2222
2223         if (c->printdatasize) {
2224                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2225         } else {
2226                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2227                 for (i=sizeof(*h);i<data.dsize;i++) {
2228                         if (ISASCII(data.dptr[i])) {
2229                                 fprintf(f, "%c", data.dptr[i]);
2230                         } else {
2231                                 fprintf(f, "\\%02X", data.dptr[i]);
2232                         }
2233                 }
2234                 fprintf(f, "\"\n");
2235         }
2236
2237         fprintf(f, "\n");
2238
2239         return 0;
2240 }
2241
2242 /*
2243   convenience function to list all keys to stdout
2244  */
2245 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2246                  struct ctdb_dump_db_context *ctx)
2247 {
2248         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2249                                  ctx->printemptyrecords, ctx);
2250 }
2251
2252 /*
2253   get the pid of a ctdb daemon
2254  */
2255 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2256 {
2257         int ret;
2258         int32_t res;
2259
2260         ret = ctdb_control(ctdb, destnode, 0, 
2261                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2262                            NULL, NULL, &res, &timeout, NULL);
2263         if (ret != 0) {
2264                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2265                 return -1;
2266         }
2267
2268         *pid = res;
2269
2270         return 0;
2271 }
2272
2273
2274 /*
2275   async freeze send control
2276  */
2277 struct ctdb_client_control_state *
2278 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2279 {
2280         return ctdb_control_send(ctdb, destnode, priority, 
2281                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2282                            mem_ctx, &timeout, NULL);
2283 }
2284
2285 /* 
2286    async freeze recv control
2287 */
2288 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2289 {
2290         int ret;
2291         int32_t res;
2292
2293         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2294         if ( (ret != 0) || (res != 0) ){
2295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2296                 return -1;
2297         }
2298
2299         return 0;
2300 }
2301
2302 /*
2303   freeze databases of a certain priority
2304  */
2305 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2306 {
2307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2308         struct ctdb_client_control_state *state;
2309         int ret;
2310
2311         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2312         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2313         talloc_free(tmp_ctx);
2314
2315         return ret;
2316 }
2317
2318 /* Freeze all databases */
2319 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2320 {
2321         int i;
2322
2323         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2324                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2325                         return -1;
2326                 }
2327         }
2328         return 0;
2329 }
2330
2331 /*
2332   thaw databases of a certain priority
2333  */
2334 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2335 {
2336         int ret;
2337         int32_t res;
2338
2339         ret = ctdb_control(ctdb, destnode, priority, 
2340                            CTDB_CONTROL_THAW, 0, tdb_null, 
2341                            NULL, NULL, &res, &timeout, NULL);
2342         if (ret != 0 || res != 0) {
2343                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2344                 return -1;
2345         }
2346
2347         return 0;
2348 }
2349
2350 /* thaw all databases */
2351 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2352 {
2353         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2354 }
2355
2356 /*
2357   get pnn of a node, or -1
2358  */
2359 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2360 {
2361         int ret;
2362         int32_t res;
2363
2364         ret = ctdb_control(ctdb, destnode, 0, 
2365                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2366                            NULL, NULL, &res, &timeout, NULL);
2367         if (ret != 0) {
2368                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2369                 return -1;
2370         }
2371
2372         return res;
2373 }
2374
2375 /*
2376   get the monitoring mode of a remote node
2377  */
2378 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control(ctdb, destnode, 0, 
2384                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2385                            NULL, NULL, &res, &timeout, NULL);
2386         if (ret != 0) {
2387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2388                 return -1;
2389         }
2390
2391         *monmode = res;
2392
2393         return 0;
2394 }
2395
2396
2397 /*
2398  set the monitoring mode of a remote node to active
2399  */
2400 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2401 {
2402         int ret;
2403         
2404
2405         ret = ctdb_control(ctdb, destnode, 0, 
2406                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2407                            NULL, NULL,NULL, &timeout, NULL);
2408         if (ret != 0) {
2409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2410                 return -1;
2411         }
2412
2413         
2414
2415         return 0;
2416 }
2417
2418 /*
2419   set the monitoring mode of a remote node to disable
2420  */
2421 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2422 {
2423         int ret;
2424         
2425
2426         ret = ctdb_control(ctdb, destnode, 0, 
2427                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2428                            NULL, NULL, NULL, &timeout, NULL);
2429         if (ret != 0) {
2430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2431                 return -1;
2432         }
2433
2434         
2435
2436         return 0;
2437 }
2438
2439
2440
2441 /* 
2442   sent to a node to make it take over an ip address
2443 */
2444 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2445                           uint32_t destnode, struct ctdb_public_ip *ip)
2446 {
2447         TDB_DATA data;
2448         struct ctdb_public_ipv4 ipv4;
2449         int ret;
2450         int32_t res;
2451
2452         if (ip->addr.sa.sa_family == AF_INET) {
2453                 ipv4.pnn = ip->pnn;
2454                 ipv4.sin = ip->addr.ip;
2455
2456                 data.dsize = sizeof(ipv4);
2457                 data.dptr  = (uint8_t *)&ipv4;
2458
2459                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2460                            NULL, &res, &timeout, NULL);
2461         } else {
2462                 data.dsize = sizeof(*ip);
2463                 data.dptr  = (uint8_t *)ip;
2464
2465                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2466                            NULL, &res, &timeout, NULL);
2467         }
2468
2469         if (ret != 0 || res != 0) {
2470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2471                 return -1;
2472         }
2473
2474         return 0;       
2475 }
2476
2477
2478 /* 
2479   sent to a node to make it release an ip address
2480 */
2481 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2482                          uint32_t destnode, struct ctdb_public_ip *ip)
2483 {
2484         TDB_DATA data;
2485         struct ctdb_public_ipv4 ipv4;
2486         int ret;
2487         int32_t res;
2488
2489         if (ip->addr.sa.sa_family == AF_INET) {
2490                 ipv4.pnn = ip->pnn;
2491                 ipv4.sin = ip->addr.ip;
2492
2493                 data.dsize = sizeof(ipv4);
2494                 data.dptr  = (uint8_t *)&ipv4;
2495
2496                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2497                                    NULL, &res, &timeout, NULL);
2498         } else {
2499                 data.dsize = sizeof(*ip);
2500                 data.dptr  = (uint8_t *)ip;
2501
2502                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2503                                    NULL, &res, &timeout, NULL);
2504         }
2505
2506         if (ret != 0 || res != 0) {
2507                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2508                 return -1;
2509         }
2510
2511         return 0;       
2512 }
2513
2514
2515 /*
2516   get a tunable
2517  */
2518 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2519                           struct timeval timeout, 
2520                           uint32_t destnode,
2521                           const char *name, uint32_t *value)
2522 {
2523         struct ctdb_control_get_tunable *t;
2524         TDB_DATA data, outdata;
2525         int32_t res;
2526         int ret;
2527
2528         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2529         data.dptr  = talloc_size(ctdb, data.dsize);
2530         CTDB_NO_MEMORY(ctdb, data.dptr);
2531
2532         t = (struct ctdb_control_get_tunable *)data.dptr;
2533         t->length = strlen(name)+1;
2534         memcpy(t->name, name, t->length);
2535
2536         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2537                            &outdata, &res, &timeout, NULL);
2538         talloc_free(data.dptr);
2539         if (ret != 0 || res != 0) {
2540                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2541                 return ret != 0 ? ret : res;
2542         }
2543
2544         if (outdata.dsize != sizeof(uint32_t)) {
2545                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2546                 talloc_free(outdata.dptr);
2547                 return -1;
2548         }
2549         
2550         *value = *(uint32_t *)outdata.dptr;
2551         talloc_free(outdata.dptr);
2552
2553         return 0;
2554 }
2555
2556 /*
2557   set a tunable
2558  */
2559 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2560                           struct timeval timeout, 
2561                           uint32_t destnode,
2562                           const char *name, uint32_t value)
2563 {
2564         struct ctdb_control_set_tunable *t;
2565         TDB_DATA data;
2566         int32_t res;
2567         int ret;
2568
2569         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2570         data.dptr  = talloc_size(ctdb, data.dsize);
2571         CTDB_NO_MEMORY(ctdb, data.dptr);
2572
2573         t = (struct ctdb_control_set_tunable *)data.dptr;
2574         t->length = strlen(name)+1;
2575         memcpy(t->name, name, t->length);
2576         t->value = value;
2577
2578         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2579                            NULL, &res, &timeout, NULL);
2580         talloc_free(data.dptr);
2581         if (ret != 0 || res != 0) {
2582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2583                 return -1;
2584         }
2585
2586         return 0;
2587 }
2588
2589 /*
2590   list tunables
2591  */
2592 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2593                             struct timeval timeout, 
2594                             uint32_t destnode,
2595                             TALLOC_CTX *mem_ctx,
2596                             const char ***list, uint32_t *count)
2597 {
2598         TDB_DATA outdata;
2599         int32_t res;
2600         int ret;
2601         struct ctdb_control_list_tunable *t;
2602         char *p, *s, *ptr;
2603
2604         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2605                            mem_ctx, &outdata, &res, &timeout, NULL);
2606         if (ret != 0 || res != 0) {
2607                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2608                 return -1;
2609         }
2610
2611         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2612         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2613             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2614                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2615                 talloc_free(outdata.dptr);
2616                 return -1;              
2617         }
2618         
2619         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2620         CTDB_NO_MEMORY(ctdb, p);
2621
2622         talloc_free(outdata.dptr);
2623         
2624         (*list) = NULL;
2625         (*count) = 0;
2626
2627         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2628                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2629                 CTDB_NO_MEMORY(ctdb, *list);
2630                 (*list)[*count] = talloc_strdup(*list, s);
2631                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2632                 (*count)++;
2633         }
2634
2635         talloc_free(p);
2636
2637         return 0;
2638 }
2639
2640
2641 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2642                                    struct timeval timeout, uint32_t destnode,
2643                                    TALLOC_CTX *mem_ctx,
2644                                    uint32_t flags,
2645                                    struct ctdb_all_public_ips **ips)
2646 {
2647         int ret;
2648         TDB_DATA outdata;
2649         int32_t res;
2650
2651         ret = ctdb_control(ctdb, destnode, 0, 
2652                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2653                            mem_ctx, &outdata, &res, &timeout, NULL);
2654         if (ret == 0 && res == -1) {
2655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2656                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2657         }
2658         if (ret != 0 || res != 0) {
2659           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2660                 return -1;
2661         }
2662
2663         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2664         talloc_free(outdata.dptr);
2665                     
2666         return 0;
2667 }
2668
2669 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2670                              struct timeval timeout, uint32_t destnode,
2671                              TALLOC_CTX *mem_ctx,
2672                              struct ctdb_all_public_ips **ips)
2673 {
2674         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2675                                               destnode, mem_ctx,
2676                                               0, ips);
2677 }
2678
2679 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2680                         struct timeval timeout, uint32_t destnode, 
2681                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2682 {
2683         int ret, i, len;
2684         TDB_DATA outdata;
2685         int32_t res;
2686         struct ctdb_all_public_ipsv4 *ipsv4;
2687
2688         ret = ctdb_control(ctdb, destnode, 0, 
2689                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2690                            mem_ctx, &outdata, &res, &timeout, NULL);
2691         if (ret != 0 || res != 0) {
2692                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2693                 return -1;
2694         }
2695
2696         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2697         len = offsetof(struct ctdb_all_public_ips, ips) +
2698                 ipsv4->num*sizeof(struct ctdb_public_ip);
2699         *ips = talloc_zero_size(mem_ctx, len);
2700         CTDB_NO_MEMORY(ctdb, *ips);
2701         (*ips)->num = ipsv4->num;
2702         for (i=0; i<ipsv4->num; i++) {
2703                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2704                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2705         }
2706
2707         talloc_free(outdata.dptr);
2708                     
2709         return 0;
2710 }
2711
2712 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2713                                  struct timeval timeout, uint32_t destnode,
2714                                  TALLOC_CTX *mem_ctx,
2715                                  const ctdb_sock_addr *addr,
2716                                  struct ctdb_control_public_ip_info **_info)
2717 {
2718         int ret;
2719         TDB_DATA indata;
2720         TDB_DATA outdata;
2721         int32_t res;
2722         struct ctdb_control_public_ip_info *info;
2723         uint32_t len;
2724         uint32_t i;
2725
2726         indata.dptr = discard_const_p(uint8_t, addr);
2727         indata.dsize = sizeof(*addr);
2728
2729         ret = ctdb_control(ctdb, destnode, 0,
2730                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2731                            mem_ctx, &outdata, &res, &timeout, NULL);
2732         if (ret != 0 || res != 0) {
2733                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2734                                 "failed ret:%d res:%d\n",
2735                                 ret, res));
2736                 return -1;
2737         }
2738
2739         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2740         if (len > outdata.dsize) {
2741                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2742                                 "returned invalid data with size %u > %u\n",
2743                                 (unsigned int)outdata.dsize,
2744                                 (unsigned int)len));
2745                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2746                 return -1;
2747         }
2748
2749         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2750         len += info->num*sizeof(struct ctdb_control_iface_info);
2751
2752         if (len > outdata.dsize) {
2753                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2754                                 "returned invalid data with size %u > %u\n",
2755                                 (unsigned int)outdata.dsize,
2756                                 (unsigned int)len));
2757                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2758                 return -1;
2759         }
2760
2761         /* make sure we null terminate the returned strings */
2762         for (i=0; i < info->num; i++) {
2763                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2764         }
2765
2766         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2767                                                                 outdata.dptr,
2768                                                                 outdata.dsize);
2769         talloc_free(outdata.dptr);
2770         if (*_info == NULL) {
2771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2772                                 "talloc_memdup size %u failed\n",
2773                                 (unsigned int)outdata.dsize));
2774                 return -1;
2775         }
2776
2777         return 0;
2778 }
2779
2780 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2781                          struct timeval timeout, uint32_t destnode,
2782                          TALLOC_CTX *mem_ctx,
2783                          struct ctdb_control_get_ifaces **_ifaces)
2784 {
2785         int ret;
2786         TDB_DATA outdata;
2787         int32_t res;
2788         struct ctdb_control_get_ifaces *ifaces;
2789         uint32_t len;
2790         uint32_t i;
2791
2792         ret = ctdb_control(ctdb, destnode, 0,
2793                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2794                            mem_ctx, &outdata, &res, &timeout, NULL);
2795         if (ret != 0 || res != 0) {
2796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2797                                 "failed ret:%d res:%d\n",
2798                                 ret, res));
2799                 return -1;
2800         }
2801
2802         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2803         if (len > outdata.dsize) {
2804                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2805                                 "returned invalid data with size %u > %u\n",
2806                                 (unsigned int)outdata.dsize,
2807                                 (unsigned int)len));
2808                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2809                 return -1;
2810         }
2811
2812         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2813         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2814
2815         if (len > outdata.dsize) {
2816                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2817                                 "returned invalid data with size %u > %u\n",
2818                                 (unsigned int)outdata.dsize,
2819                                 (unsigned int)len));
2820                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2821                 return -1;
2822         }
2823
2824         /* make sure we null terminate the returned strings */
2825         for (i=0; i < ifaces->num; i++) {
2826                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2827         }
2828
2829         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2830                                                                   outdata.dptr,
2831                                                                   outdata.dsize);
2832         talloc_free(outdata.dptr);
2833         if (*_ifaces == NULL) {
2834                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2835                                 "talloc_memdup size %u failed\n",
2836                                 (unsigned int)outdata.dsize));
2837                 return -1;
2838         }
2839
2840         return 0;
2841 }
2842
2843 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2844                              struct timeval timeout, uint32_t destnode,
2845                              TALLOC_CTX *mem_ctx,
2846                              const struct ctdb_control_iface_info *info)
2847 {
2848         int ret;
2849         TDB_DATA indata;
2850         int32_t res;
2851
2852         indata.dptr = discard_const_p(uint8_t, info);
2853         indata.dsize = sizeof(*info);
2854
2855         ret = ctdb_control(ctdb, destnode, 0,
2856                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2857                            mem_ctx, NULL, &res, &timeout, NULL);
2858         if (ret != 0 || res != 0) {
2859                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2860                                 "failed ret:%d res:%d\n",
2861                                 ret, res));
2862                 return -1;
2863         }
2864
2865         return 0;
2866 }
2867
2868 /*
2869   set/clear the permanent disabled bit on a remote node
2870  */
2871 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2872                        uint32_t set, uint32_t clear)
2873 {
2874         int ret;
2875         TDB_DATA data;
2876         struct ctdb_node_map *nodemap=NULL;
2877         struct ctdb_node_flag_change c;
2878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2879         uint32_t recmaster;
2880         uint32_t *nodes;
2881
2882
2883         /* find the recovery master */
2884         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2885         if (ret != 0) {
2886                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2887                 talloc_free(tmp_ctx);
2888                 return ret;
2889         }
2890
2891
2892         /* read the node flags from the recmaster */
2893         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2894         if (ret != 0) {
2895                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2896                 talloc_free(tmp_ctx);
2897                 return -1;
2898         }
2899         if (destnode >= nodemap->num) {
2900                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2901                 talloc_free(tmp_ctx);
2902                 return -1;
2903         }
2904
2905         c.pnn       = destnode;
2906         c.old_flags = nodemap->nodes[destnode].flags;
2907         c.new_flags = c.old_flags;
2908         c.new_flags |= set;
2909         c.new_flags &= ~clear;
2910
2911         data.dsize = sizeof(c);
2912         data.dptr = (unsigned char *)&c;
2913
2914         /* send the flags update to all connected nodes */
2915         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2916
2917         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2918                                         nodes, 0,
2919                                         timeout, false, data,
2920                                         NULL, NULL,
2921                                         NULL) != 0) {
2922                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2923
2924                 talloc_free(tmp_ctx);
2925                 return -1;
2926         }
2927
2928         talloc_free(tmp_ctx);
2929         return 0;
2930 }
2931
2932
2933 /*
2934   get all tunables
2935  */
2936 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2937                                struct timeval timeout, 
2938                                uint32_t destnode,
2939                                struct ctdb_tunable *tunables)
2940 {
2941         TDB_DATA outdata;
2942         int ret;
2943         int32_t res;
2944
2945         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2946                            &outdata, &res, &timeout, NULL);
2947         if (ret != 0 || res != 0) {
2948                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2949                 return -1;
2950         }
2951
2952         if (outdata.dsize != sizeof(*tunables)) {
2953                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2954                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2955                 return -1;              
2956         }
2957
2958         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2959         talloc_free(outdata.dptr);
2960         return 0;
2961 }
2962
2963 /*
2964   add a public address to a node
2965  */
2966 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2967                       struct timeval timeout, 
2968                       uint32_t destnode,
2969                       struct ctdb_control_ip_iface *pub)
2970 {
2971         TDB_DATA data;
2972         int32_t res;
2973         int ret;
2974
2975         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2976         data.dptr  = (unsigned char *)pub;
2977
2978         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2979                            NULL, &res, &timeout, NULL);
2980         if (ret != 0 || res != 0) {
2981                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2982                 return -1;
2983         }
2984
2985         return 0;
2986 }
2987
2988 /*
2989   delete a public address from a node
2990  */
2991 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2992                       struct timeval timeout, 
2993                       uint32_t destnode,
2994                       struct ctdb_control_ip_iface *pub)
2995 {
2996         TDB_DATA data;
2997         int32_t res;
2998         int ret;
2999
3000         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3001         data.dptr  = (unsigned char *)pub;
3002
3003         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3004                            NULL, &res, &timeout, NULL);
3005         if (ret != 0 || res != 0) {
3006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3007                 return -1;
3008         }
3009
3010         return 0;
3011 }
3012
3013 /*
3014   kill a tcp connection
3015  */
3016 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3017                       struct timeval timeout, 
3018                       uint32_t destnode,
3019                       struct ctdb_control_killtcp *killtcp)
3020 {
3021         TDB_DATA data;
3022         int32_t res;
3023         int ret;
3024
3025         data.dsize = sizeof(struct ctdb_control_killtcp);
3026         data.dptr  = (unsigned char *)killtcp;
3027
3028         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3029                            NULL, &res, &timeout, NULL);
3030         if (ret != 0 || res != 0) {
3031                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3032                 return -1;
3033         }
3034
3035         return 0;
3036 }
3037
3038 /*
3039   send a gratious arp
3040  */
3041 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3042                       struct timeval timeout, 
3043                       uint32_t destnode,
3044                       ctdb_sock_addr *addr,
3045                       const char *ifname)
3046 {
3047         TDB_DATA data;
3048         int32_t res;
3049         int ret, len;
3050         struct ctdb_control_gratious_arp *gratious_arp;
3051         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3052
3053
3054         len = strlen(ifname)+1;
3055         gratious_arp = talloc_size(tmp_ctx, 
3056                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3057         CTDB_NO_MEMORY(ctdb, gratious_arp);
3058
3059         gratious_arp->addr = *addr;
3060         gratious_arp->len = len;
3061         memcpy(&gratious_arp->iface[0], ifname, len);
3062
3063
3064         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3065         data.dptr  = (unsigned char *)gratious_arp;
3066
3067         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3068                            NULL, &res, &timeout, NULL);
3069         if (ret != 0 || res != 0) {
3070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3071                 talloc_free(tmp_ctx);
3072                 return -1;
3073         }
3074
3075         talloc_free(tmp_ctx);
3076         return 0;
3077 }
3078
3079 /*
3080   get a list of all tcp tickles that a node knows about for a particular vnn
3081  */
3082 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3083                               struct timeval timeout, uint32_t destnode, 
3084                               TALLOC_CTX *mem_ctx, 
3085                               ctdb_sock_addr *addr,
3086                               struct ctdb_control_tcp_tickle_list **list)
3087 {
3088         int ret;
3089         TDB_DATA data, outdata;
3090         int32_t status;
3091
3092         data.dptr = (uint8_t*)addr;
3093         data.dsize = sizeof(ctdb_sock_addr);
3094
3095         ret = ctdb_control(ctdb, destnode, 0, 
3096                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3097                            mem_ctx, &outdata, &status, NULL, NULL);
3098         if (ret != 0 || status != 0) {
3099                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3100                 return -1;
3101         }
3102
3103         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3104
3105         return status;
3106 }
3107
3108 /*
3109   register a server id
3110  */
3111 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3112                       struct timeval timeout, 
3113                       struct ctdb_server_id *id)
3114 {
3115         TDB_DATA data;
3116         int32_t res;
3117         int ret;
3118
3119         data.dsize = sizeof(struct ctdb_server_id);
3120         data.dptr  = (unsigned char *)id;
3121
3122         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3123                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3124                         0, data, NULL,
3125                         NULL, &res, &timeout, NULL);
3126         if (ret != 0 || res != 0) {
3127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3128                 return -1;
3129         }
3130
3131         return 0;
3132 }
3133
3134 /*
3135   unregister a server id
3136  */
3137 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3138                       struct timeval timeout, 
3139                       struct ctdb_server_id *id)
3140 {
3141         TDB_DATA data;
3142         int32_t res;
3143         int ret;
3144
3145         data.dsize = sizeof(struct ctdb_server_id);
3146         data.dptr  = (unsigned char *)id;
3147
3148         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3149                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3150                         0, data, NULL,
3151                         NULL, &res, &timeout, NULL);
3152         if (ret != 0 || res != 0) {
3153                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3154                 return -1;
3155         }
3156
3157         return 0;
3158 }
3159
3160
3161 /*
3162   check if a server id exists
3163
3164   if a server id does exist, return *status == 1, otherwise *status == 0
3165  */
3166 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3167                       struct timeval timeout, 
3168                       uint32_t destnode,
3169                       struct ctdb_server_id *id,
3170                       uint32_t *status)
3171 {
3172         TDB_DATA data;
3173         int32_t res;
3174         int ret;
3175
3176         data.dsize = sizeof(struct ctdb_server_id);
3177         data.dptr  = (unsigned char *)id;
3178
3179         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3180                         0, data, NULL,
3181                         NULL, &res, &timeout, NULL);
3182         if (ret != 0) {
3183                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3184                 return -1;
3185         }
3186
3187         if (res) {
3188                 *status = 1;
3189         } else {
3190                 *status = 0;
3191         }
3192
3193         return 0;
3194 }
3195
3196 /*
3197    get the list of server ids that are registered on a node
3198 */
3199 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3200                 TALLOC_CTX *mem_ctx,
3201                 struct timeval timeout, uint32_t destnode, 
3202                 struct ctdb_server_id_list **svid_list)
3203 {
3204         int ret;
3205         TDB_DATA outdata;
3206         int32_t res;
3207
3208         ret = ctdb_control(ctdb, destnode, 0, 
3209                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3210                            mem_ctx, &outdata, &res, &timeout, NULL);
3211         if (ret != 0 || res != 0) {
3212                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3213                 return -1;
3214         }
3215
3216         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3217                     
3218         return 0;
3219 }
3220
3221 /*
3222   initialise the ctdb daemon for client applications
3223
3224   NOTE: In current code the daemon does not fork. This is for testing purposes only
3225   and to simplify the code.
3226 */
3227 struct ctdb_context *ctdb_init(struct event_context *ev)
3228 {
3229         int ret;
3230         struct ctdb_context *ctdb;
3231
3232         ctdb = talloc_zero(ev, struct ctdb_context);
3233         if (ctdb == NULL) {
3234                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3235                 return NULL;
3236         }
3237         ctdb->ev  = ev;
3238         ctdb->idr = idr_init(ctdb);
3239         /* Wrap early to exercise code. */
3240         ctdb->lastid = INT_MAX-200;
3241         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3242
3243         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3244         if (ret != 0) {
3245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3246                 talloc_free(ctdb);
3247                 return NULL;
3248         }
3249
3250         ctdb->statistics.statistics_start_time = timeval_current();
3251
3252         return ctdb;
3253 }
3254
3255
3256 /*
3257   set some ctdb flags
3258 */
3259 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3260 {
3261         ctdb->flags |= flags;
3262 }
3263
3264 /*
3265   setup the local socket name
3266 */
3267 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3268 {
3269         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3270         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3271
3272         return 0;
3273 }
3274
3275 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3276 {
3277         return ctdb->daemon.name;
3278 }
3279
3280 /*
3281   return the pnn of this node
3282 */
3283 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3284 {
3285         return ctdb->pnn;
3286 }
3287
3288
3289 /*
3290   get the uptime of a remote node
3291  */
3292 struct ctdb_client_control_state *
3293 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3294 {
3295         return ctdb_control_send(ctdb, destnode, 0, 
3296                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3297                            mem_ctx, &timeout, NULL);
3298 }
3299
3300 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3301 {
3302         int ret;
3303         int32_t res;
3304         TDB_DATA outdata;
3305
3306         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3307         if (ret != 0 || res != 0) {
3308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3309                 return -1;
3310         }
3311
3312         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3313
3314         return 0;
3315 }
3316
3317 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3318 {
3319         struct ctdb_client_control_state *state;
3320
3321         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3322         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3323 }
3324
3325 /*
3326   send a control to execute the "recovered" event script on a node
3327  */
3328 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3329 {
3330         int ret;
3331         int32_t status;
3332
3333         ret = ctdb_control(ctdb, destnode, 0, 
3334                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3335                            NULL, NULL, &status, &timeout, NULL);
3336         if (ret != 0 || status != 0) {
3337                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3338                 return -1;
3339         }
3340
3341         return 0;
3342 }
3343
3344 /* 
3345   callback for the async helpers used when sending the same control
3346   to multiple nodes in parallell.
3347 */
3348 static void async_callback(struct ctdb_client_control_state *state)
3349 {
3350         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3351         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3352         int ret;
3353         TDB_DATA outdata;
3354         int32_t res = -1;
3355         uint32_t destnode = state->c->hdr.destnode;
3356
3357         /* one more node has responded with recmode data */
3358         data->count--;
3359
3360         /* if we failed to push the db, then return an error and let
3361            the main loop try again.
3362         */
3363         if (state->state != CTDB_CONTROL_DONE) {
3364                 if ( !data->dont_log_errors) {
3365                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3366                 }
3367                 data->fail_count++;
3368                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3369                         res = -ETIME;
3370                 } else {
3371                         res = -1;
3372                 }
3373                 if (data->fail_callback) {
3374                         data->fail_callback(ctdb, destnode, res, outdata,
3375                                         data->callback_data);
3376                 }
3377                 return;
3378         }
3379         
3380         state->async.fn = NULL;
3381
3382         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3383         if ((ret != 0) || (res != 0)) {
3384                 if ( !data->dont_log_errors) {
3385                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3386                 }
3387                 data->fail_count++;
3388                 if (data->fail_callback) {
3389                         data->fail_callback(ctdb, destnode, res, outdata,
3390                                         data->callback_data);
3391                 }
3392         }
3393         if ((ret == 0) && (data->callback != NULL)) {
3394                 data->callback(ctdb, destnode, res, outdata,
3395                                         data->callback_data);
3396         }
3397 }
3398
3399
3400 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3401 {
3402         /* set up the callback functions */
3403         state->async.fn = async_callback;
3404         state->async.private_data = data;
3405         
3406         /* one more control to wait for to complete */
3407         data->count++;
3408 }
3409
3410
3411 /* wait for up to the maximum number of seconds allowed
3412    or until all nodes we expect a response from has replied
3413 */
3414 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3415 {
3416         while (data->count > 0) {
3417                 event_loop_once(ctdb->ev);
3418         }
3419         if (data->fail_count != 0) {
3420                 if (!data->dont_log_errors) {
3421                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3422                                  data->fail_count));
3423                 }
3424                 return -1;
3425         }
3426         return 0;
3427 }
3428
3429
3430 /* 
3431    perform a simple control on the listed nodes
3432    The control cannot return data
3433  */
3434 int ctdb_client_async_control(struct ctdb_context *ctdb,
3435                                 enum ctdb_controls opcode,
3436                                 uint32_t *nodes,
3437                                 uint64_t srvid,
3438                                 struct timeval timeout,
3439                                 bool dont_log_errors,
3440                                 TDB_DATA data,
3441                                 client_async_callback client_callback,
3442                                 client_async_callback fail_callback,
3443                                 void *callback_data)
3444 {
3445         struct client_async_data *async_data;
3446         struct ctdb_client_control_state *state;
3447         int j, num_nodes;
3448
3449         async_data = talloc_zero(ctdb, struct client_async_data);
3450         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3451         async_data->dont_log_errors = dont_log_errors;
3452         async_data->callback = client_callback;
3453         async_data->fail_callback = fail_callback;
3454         async_data->callback_data = callback_data;
3455         async_data->opcode        = opcode;
3456
3457         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3458
3459         /* loop over all nodes and send an async control to each of them */
3460         for (j=0; j<num_nodes; j++) {
3461                 uint32_t pnn = nodes[j];
3462
3463                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3464                                           0, data, async_data, &timeout, NULL);
3465                 if (state == NULL) {
3466                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3467                         talloc_free(async_data);
3468                         return -1;
3469                 }
3470                 
3471                 ctdb_client_async_add(async_data, state);
3472         }
3473
3474         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3475                 talloc_free(async_data);
3476                 return -1;
3477         }
3478
3479         talloc_free(async_data);
3480         return 0;
3481 }
3482
3483 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3484                                 struct ctdb_vnn_map *vnn_map,
3485                                 TALLOC_CTX *mem_ctx,
3486                                 bool include_self)
3487 {
3488         int i, j, num_nodes;
3489         uint32_t *nodes;
3490
3491         for (i=num_nodes=0;i<vnn_map->size;i++) {
3492                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3493                         continue;
3494                 }
3495                 num_nodes++;
3496         } 
3497
3498         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3499         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3500
3501         for (i=j=0;i<vnn_map->size;i++) {
3502                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3503                         continue;
3504                 }
3505                 nodes[j++] = vnn_map->map[i];
3506         } 
3507
3508         return nodes;
3509 }
3510
3511 /* Get list of nodes not including those with flags specified by mask.
3512  * If exclude_pnn is not -1 then exclude that pnn from the list.
3513  */
3514 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3515                         struct ctdb_node_map *node_map,
3516                         TALLOC_CTX *mem_ctx,
3517                         uint32_t mask,
3518                         int exclude_pnn)
3519 {
3520         int i, j, num_nodes;
3521         uint32_t *nodes;
3522
3523         for (i=num_nodes=0;i<node_map->num;i++) {
3524                 if (node_map->nodes[i].flags & mask) {
3525                         continue;
3526                 }
3527                 if (node_map->nodes[i].pnn == exclude_pnn) {
3528                         continue;
3529                 }
3530                 num_nodes++;
3531         } 
3532
3533         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3534         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3535
3536         for (i=j=0;i<node_map->num;i++) {
3537                 if (node_map->nodes[i].flags & mask) {
3538                         continue;
3539                 }
3540                 if (node_map->nodes[i].pnn == exclude_pnn) {
3541                         continue;
3542                 }
3543                 nodes[j++] = node_map->nodes[i].pnn;
3544         } 
3545
3546         return nodes;
3547 }
3548
3549 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3550                                 struct ctdb_node_map *node_map,
3551                                 TALLOC_CTX *mem_ctx,
3552                                 bool include_self)
3553 {
3554         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3555                              include_self ? -1 : ctdb->pnn);
3556 }
3557
3558 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3559                                 struct ctdb_node_map *node_map,
3560                                 TALLOC_CTX *mem_ctx,
3561                                 bool include_self)
3562 {
3563         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3564                              include_self ? -1 : ctdb->pnn);
3565 }
3566
3567 /* 
3568   this is used to test if a pnn lock exists and if it exists will return
3569   the number of connections that pnn has reported or -1 if that recovery
3570   daemon is not running.
3571 */
3572 int
3573 ctdb_read_pnn_lock(int fd, int32_t pnn)
3574 {
3575         struct flock lock;
3576         char c;
3577
3578         lock.l_type = F_WRLCK;
3579         lock.l_whence = SEEK_SET;
3580         lock.l_start = pnn;
3581         lock.l_len = 1;
3582         lock.l_pid = 0;
3583
3584         if (fcntl(fd, F_GETLK, &lock) != 0) {
3585                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3586                 return -1;
3587         }
3588
3589         if (lock.l_type == F_UNLCK) {
3590                 return -1;
3591         }
3592
3593         if (pread(fd, &c, 1, pnn) == -1) {
3594                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3595                 return -1;
3596         }
3597
3598         return c;
3599 }
3600
3601 /*
3602   get capabilities of a remote node
3603  */
3604 struct ctdb_client_control_state *
3605 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3606 {
3607         return ctdb_control_send(ctdb, destnode, 0, 
3608                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3609                            mem_ctx, &timeout, NULL);
3610 }
3611
3612 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3613 {
3614         int ret;
3615         int32_t res;
3616         TDB_DATA outdata;
3617
3618         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3619         if ( (ret != 0) || (res != 0) ) {
3620                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3621                 return -1;
3622         }
3623
3624         if (capabilities) {
3625                 *capabilities = *((uint32_t *)outdata.dptr);
3626         }
3627
3628         return 0;
3629 }
3630
3631 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3632 {
3633         struct ctdb_client_control_state *state;
3634         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3635         int ret;
3636
3637         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3638         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3639         talloc_free(tmp_ctx);
3640         return ret;
3641 }
3642
3643 /**
3644  * check whether a transaction is active on a given db on a given node
3645  */
3646 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3647                                      uint32_t destnode,
3648                                      uint32_t db_id)
3649 {
3650         int32_t status;
3651         int ret;
3652         TDB_DATA indata;
3653
3654         indata.dptr = (uint8_t *)&db_id;
3655         indata.dsize = sizeof(db_id);
3656
3657         ret = ctdb_control(ctdb, destnode, 0,
3658                            CTDB_CONTROL_TRANS2_ACTIVE,
3659                            0, indata, NULL, NULL, &status,
3660                            NULL, NULL);
3661
3662         if (ret != 0) {
3663                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3664                 return -1;
3665         }
3666
3667         return status;
3668 }
3669
3670
3671 struct ctdb_transaction_handle {
3672         struct ctdb_db_context *ctdb_db;
3673         bool in_replay;
3674         /*
3675          * we store the reads and writes done under a transaction:
3676          * - one list stores both reads and writes (m_all),
3677          * - the other just writes (m_write)
3678          */
3679         struct ctdb_marshall_buffer *m_all;
3680         struct ctdb_marshall_buffer *m_write;
3681 };
3682
3683 /* start a transaction on a database */
3684 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3685 {
3686         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3687         return 0;
3688 }
3689
3690 /* start a transaction on a database */
3691 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3692 {
3693         struct ctdb_record_handle *rh;
3694         TDB_DATA key;
3695         TDB_DATA data;
3696         struct ctdb_ltdb_header header;
3697         TALLOC_CTX *tmp_ctx;
3698         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3699         int ret;
3700         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3701         pid_t pid;
3702         int32_t status;
3703
3704         key.dptr = discard_const(keyname);
3705         key.dsize = strlen(keyname);
3706
3707         if (!ctdb_db->persistent) {
3708                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3709                 return -1;
3710         }
3711
3712 again:
3713         tmp_ctx = talloc_new(h);
3714
3715         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3716         if (rh == NULL) {
3717                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3718                 talloc_free(tmp_ctx);
3719                 return -1;
3720         }
3721
3722         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3723                                               CTDB_CURRENT_NODE,
3724                                               ctdb_db->db_id);
3725         if (status == 1) {
3726                 unsigned long int usec = (1000 + random()) % 100000;
3727                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3728                                     "on db_id[0x%08x]. waiting for %lu "
3729                                     "microseconds\n",
3730                                     ctdb_db->db_id, usec));
3731                 talloc_free(tmp_ctx);
3732                 usleep(usec);
3733                 goto again;
3734         }
3735
3736         /*
3737          * store the pid in the database:
3738          * it is not enough that the node is dmaster...
3739          */
3740         pid = getpid();
3741         data.dptr = (unsigned char *)&pid;
3742         data.dsize = sizeof(pid_t);
3743         rh->header.rsn++;
3744         rh->header.dmaster = ctdb_db->ctdb->pnn;
3745         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3746         if (ret != 0) {
3747                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3748                                   "transaction record\n"));
3749                 talloc_free(tmp_ctx);
3750                 return -1;
3751         }
3752
3753         talloc_free(rh);
3754
3755         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3756         if (ret != 0) {
3757                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3758                 talloc_free(tmp_ctx);
3759                 return -1;
3760         }
3761
3762         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3763         if (ret != 0) {
3764                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3765                                  "lock record inside transaction\n"));
3766                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3767                 talloc_free(tmp_ctx);
3768                 goto again;
3769         }
3770
3771         if (header.dmaster != ctdb_db->ctdb->pnn) {
3772                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3773                                    "transaction lock record\n"));
3774                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3775                 talloc_free(tmp_ctx);
3776                 goto again;
3777         }
3778
3779         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3780                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3781                                     "the transaction lock record\n"));
3782                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3783                 talloc_free(tmp_ctx);
3784                 goto again;
3785         }
3786
3787         talloc_free(tmp_ctx);
3788
3789         return 0;
3790 }
3791
3792
3793 /* start a transaction on a database */
3794 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3795                                                        TALLOC_CTX *mem_ctx)
3796 {
3797         struct ctdb_transaction_handle *h;
3798         int ret;
3799
3800         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3801         if (h == NULL) {
3802                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3803                 return NULL;
3804         }
3805
3806         h->ctdb_db = ctdb_db;
3807
3808         ret = ctdb_transaction_fetch_start(h);
3809         if (ret != 0) {
3810                 talloc_free(h);
3811                 return NULL;
3812         }
3813
3814         talloc_set_destructor(h, ctdb_transaction_destructor);
3815
3816         return h;
3817 }
3818
3819
3820
3821 /*
3822   fetch a record inside a transaction
3823  */
3824 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3825                            TALLOC_CTX *mem_ctx, 
3826                            TDB_DATA key, TDB_DATA *data)
3827 {
3828         struct ctdb_ltdb_header header;
3829         int ret;
3830
3831         ZERO_STRUCT(header);
3832
3833         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3834         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3835                 /* record doesn't exist yet */
3836                 *data = tdb_null;
3837                 ret = 0;
3838         }
3839         
3840         if (ret != 0) {
3841                 return ret;
3842         }
3843
3844         if (!h->in_replay) {
3845                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3846                 if (h->m_all == NULL) {
3847                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3848                         return -1;
3849                 }
3850         }
3851
3852         return 0;
3853 }
3854
3855 /*
3856   stores a record inside a transaction
3857  */
3858 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3859                            TDB_DATA key, TDB_DATA data)
3860 {
3861         TALLOC_CTX *tmp_ctx = talloc_new(h);
3862         struct ctdb_ltdb_header header;
3863         TDB_DATA olddata;
3864         int ret;
3865
3866         ZERO_STRUCT(header);
3867
3868         /* we need the header so we can update the RSN */
3869         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3870         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3871                 /* the record doesn't exist - create one with us as dmaster.
3872                    This is only safe because we are in a transaction and this
3873                    is a persistent database */
3874                 ZERO_STRUCT(header);
3875         } else if (ret != 0) {
3876                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3877                 talloc_free(tmp_ctx);
3878                 return ret;
3879         }
3880
3881         if (data.dsize == olddata.dsize &&
3882             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3883                 /* save writing the same data */
3884                 talloc_free(tmp_ctx);
3885                 return 0;
3886         }
3887
3888         header.dmaster = h->ctdb_db->ctdb->pnn;
3889         header.rsn++;
3890
3891         if (!h->in_replay) {
3892                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3893                 if (h->m_all == NULL) {
3894                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3895                         talloc_free(tmp_ctx);
3896                         return -1;
3897                 }
3898         }               
3899
3900         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3901         if (h->m_write == NULL) {
3902                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3903                 talloc_free(tmp_ctx);
3904                 return -1;
3905         }
3906         
3907         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3908
3909         talloc_free(tmp_ctx);
3910         
3911         return ret;
3912 }
3913
3914 /*
3915   replay a transaction
3916  */
3917 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3918 {
3919         int ret, i;
3920         struct ctdb_rec_data *rec = NULL;
3921
3922         h->in_replay = true;
3923         talloc_free(h->m_write);
3924         h->m_write = NULL;
3925
3926         ret = ctdb_transaction_fetch_start(h);
3927         if (ret != 0) {
3928                 return ret;
3929         }
3930
3931         for (i=0;i<h->m_all->count;i++) {
3932                 TDB_DATA key, data;
3933
3934                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3935                 if (rec == NULL) {
3936                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3937                         goto failed;
3938                 }
3939
3940                 if (rec->reqid == 0) {
3941                         /* its a store */
3942                         if (ctdb_transaction_store(h, key, data) != 0) {
3943                                 goto failed;
3944                         }
3945                 } else {
3946                         TDB_DATA data2;
3947                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3948
3949                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3950                                 talloc_free(tmp_ctx);
3951                                 goto failed;
3952                         }
3953                         if (data2.dsize != data.dsize ||
3954                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3955                                 /* the record has changed on us - we have to give up */
3956                                 talloc_free(tmp_ctx);
3957                                 goto failed;
3958                         }
3959                         talloc_free(tmp_ctx);
3960                 }
3961         }
3962         
3963         return 0;
3964
3965 failed:
3966         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3967         return -1;
3968 }
3969
3970
3971 /*
3972   commit a transaction
3973  */
3974 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3975 {
3976         int ret, retries=0;
3977         int32_t status;
3978         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3979         struct timeval timeout;
3980         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3981
3982         talloc_set_destructor(h, NULL);
3983
3984         /* our commit strategy is quite complex.
3985
3986            - we first try to commit the changes to all other nodes
3987
3988            - if that works, then we commit locally and we are done
3989
3990            - if a commit on another node fails, then we need to cancel
3991              the transaction, then restart the transaction (thus
3992              opening a window of time for a pending recovery to
3993              complete), then replay the transaction, checking all the
3994              reads and writes (checking that reads give the same data,
3995              and writes succeed). Then we retry the transaction to the
3996              other nodes
3997         */
3998
3999 again:
4000         if (h->m_write == NULL) {
4001                 /* no changes were made */
4002                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4003                 talloc_free(h);
4004                 return 0;
4005         }
4006
4007         /* tell ctdbd to commit to the other nodes */
4008         timeout = timeval_current_ofs(1, 0);
4009         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4010                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4011                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4012                            &timeout, NULL);
4013         if (ret != 0 || status != 0) {
4014                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4015                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4016                                      ", retrying after 1 second...\n",
4017                                      (retries==0)?"":"retry "));
4018                 sleep(1);
4019
4020                 if (ret != 0) {
4021                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4022                 } else {
4023                         /* work out what error code we will give if we 
4024                            have to fail the operation */
4025                         switch ((enum ctdb_trans2_commit_error)status) {
4026                         case CTDB_TRANS2_COMMIT_SUCCESS:
4027                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4028                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4029                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4030                                 break;
4031                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4032                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4033                                 break;
4034                         }
4035                 }
4036
4037                 if (++retries == 100) {
4038                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4039                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4040                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4041                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4042                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4043                         talloc_free(h);
4044                         return -1;
4045                 }               
4046
4047                 if (ctdb_replay_transaction(h) != 0) {
4048                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4049                                           "transaction on db 0x%08x, "
4050                                           "failure control =%u\n",
4051                                           h->ctdb_db->db_id,
4052                                           (unsigned)failure_control));
4053                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4054                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4055                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4056                         talloc_free(h);
4057                         return -1;
4058                 }
4059                 goto again;
4060         } else {
4061                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4062         }
4063
4064         /* do the real commit locally */
4065         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4066         if (ret != 0) {
4067                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4068                                   "on db id 0x%08x locally, "
4069                                   "failure_control=%u\n",
4070                                   h->ctdb_db->db_id,
4071                                   (unsigned)failure_control));
4072                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4073                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4074                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4075                 talloc_free(h);
4076                 return ret;
4077         }
4078
4079         /* tell ctdbd that we are finished with our local commit */
4080         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4081                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4082                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4083         talloc_free(h);
4084         return 0;
4085 }
4086
4087 /*
4088   recovery daemon ping to main daemon
4089  */
4090 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4091 {
4092         int ret;
4093         int32_t res;
4094
4095         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4096                            ctdb, NULL, &res, NULL, NULL);
4097         if (ret != 0 || res != 0) {
4098                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4099                 return -1;
4100         }
4101
4102         return 0;
4103 }
4104
4105 /* When forking the main daemon and the child process needs to connect
4106  * back to the daemon as a client process, this function can be used
4107  * to change the ctdb context from daemon into client mode.  The child
4108  * process must be created using ctdb_fork() and not fork() -
4109  * ctdb_fork() does some necessary housekeeping.
4110  */
4111 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4112 {
4113         int ret;
4114         va_list ap;
4115
4116         /* Add extra information so we can identify this in the logs */
4117         va_start(ap, fmt);
4118         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4119         va_end(ap);
4120
4121         /* get a new event context */
4122         ctdb->ev = event_context_init(ctdb);
4123         tevent_loop_allow_nesting(ctdb->ev);
4124
4125         /* Connect to main CTDB daemon */
4126         ret = ctdb_socket_connect(ctdb);
4127         if (ret != 0) {
4128                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4129                 return -1;
4130         }
4131
4132         ctdb->can_send_controls = true;
4133
4134         return 0;
4135 }
4136
4137 /*
4138   get the status of running the monitor eventscripts: NULL means never run.
4139  */
4140 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4141                 struct timeval timeout, uint32_t destnode, 
4142                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4143                 struct ctdb_scripts_wire **scripts)
4144 {
4145         int ret;
4146         TDB_DATA outdata, indata;
4147         int32_t res;
4148         uint32_t uinttype = type;
4149
4150         indata.dptr = (uint8_t *)&uinttype;
4151         indata.dsize = sizeof(uinttype);
4152
4153         ret = ctdb_control(ctdb, destnode, 0, 
4154                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4155                            mem_ctx, &outdata, &res, &timeout, NULL);
4156         if (ret != 0 || res != 0) {
4157                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4158                 return -1;
4159         }
4160
4161         if (outdata.dsize == 0) {
4162                 *scripts = NULL;
4163         } else {
4164                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4165                 talloc_free(outdata.dptr);
4166         }
4167                     
4168         return 0;
4169 }
4170
4171 /*
4172   tell the main daemon how long it took to lock the reclock file
4173  */
4174 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4175 {
4176         int ret;
4177         int32_t res;
4178         TDB_DATA data;
4179
4180         data.dptr = (uint8_t *)&latency;
4181         data.dsize = sizeof(latency);
4182
4183         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4184                            ctdb, NULL, &res, NULL, NULL);
4185         if (ret != 0 || res != 0) {
4186                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4187                 return -1;
4188         }
4189
4190         return 0;
4191 }
4192
4193 /*
4194   get the name of the reclock file
4195  */
4196 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4197                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4198                          const char **name)
4199 {
4200         int ret;
4201         int32_t res;
4202         TDB_DATA data;
4203
4204         ret = ctdb_control(ctdb, destnode, 0, 
4205                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4206                            mem_ctx, &data, &res, &timeout, NULL);
4207         if (ret != 0 || res != 0) {
4208                 return -1;
4209         }
4210
4211         if (data.dsize == 0) {
4212                 *name = NULL;
4213         } else {
4214                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4215         }
4216         talloc_free(data.dptr);
4217
4218         return 0;
4219 }
4220
4221 /*
4222   set the reclock filename for a node
4223  */
4224 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4225 {
4226         int ret;
4227         TDB_DATA data;
4228         int32_t res;
4229
4230         if (reclock == NULL) {
4231                 data.dsize = 0;
4232                 data.dptr  = NULL;
4233         } else {
4234                 data.dsize = strlen(reclock) + 1;
4235                 data.dptr  = discard_const(reclock);
4236         }
4237
4238         ret = ctdb_control(ctdb, destnode, 0, 
4239                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4240                            NULL, NULL, &res, &timeout, NULL);
4241         if (ret != 0 || res != 0) {
4242                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4243                 return -1;
4244         }
4245
4246         return 0;
4247 }
4248
4249 /*
4250   stop a node
4251  */
4252 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4253 {
4254         int ret;
4255         int32_t res;
4256
4257         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4258                            ctdb, NULL, &res, &timeout, NULL);
4259         if (ret != 0 || res != 0) {
4260                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4261                 return -1;
4262         }
4263
4264         return 0;
4265 }
4266
4267 /*
4268   continue a node
4269  */
4270 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4271 {
4272         int ret;
4273
4274         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4275                            ctdb, NULL, NULL, &timeout, NULL);
4276         if (ret != 0) {
4277                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4278                 return -1;
4279         }
4280
4281         return 0;
4282 }
4283
4284 /*
4285   set the natgw state for a node
4286  */
4287 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4288 {
4289         int ret;
4290         TDB_DATA data;
4291         int32_t res;
4292
4293         data.dsize = sizeof(natgwstate);
4294         data.dptr  = (uint8_t *)&natgwstate;
4295
4296         ret = ctdb_control(ctdb, destnode, 0, 
4297                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4298                            NULL, NULL, &res, &timeout, NULL);
4299         if (ret != 0 || res != 0) {
4300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4301                 return -1;
4302         }
4303
4304         return 0;
4305 }
4306
4307 /*
4308   set the lmaster role for a node
4309  */
4310 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4311 {
4312         int ret;
4313         TDB_DATA data;
4314         int32_t res;
4315
4316         data.dsize = sizeof(lmasterrole);
4317         data.dptr  = (uint8_t *)&lmasterrole;
4318
4319         ret = ctdb_control(ctdb, destnode, 0, 
4320                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4321                            NULL, NULL, &res, &timeout, NULL);
4322         if (ret != 0 || res != 0) {
4323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4324                 return -1;
4325         }
4326
4327         return 0;
4328 }
4329
4330 /*
4331   set the recmaster role for a node
4332  */
4333 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4334 {
4335         int ret;
4336         TDB_DATA data;
4337         int32_t res;
4338
4339         data.dsize = sizeof(recmasterrole);
4340         data.dptr  = (uint8_t *)&recmasterrole;
4341
4342         ret = ctdb_control(ctdb, destnode, 0, 
4343                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4344                            NULL, NULL, &res, &timeout, NULL);
4345         if (ret != 0 || res != 0) {
4346                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4347                 return -1;
4348         }
4349
4350         return 0;
4351 }
4352
4353 /* enable an eventscript
4354  */
4355 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4356 {
4357         int ret;
4358         TDB_DATA data;
4359         int32_t res;
4360
4361         data.dsize = strlen(script) + 1;
4362         data.dptr  = discard_const(script);
4363
4364         ret = ctdb_control(ctdb, destnode, 0, 
4365                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4366                            NULL, NULL, &res, &timeout, NULL);
4367         if (ret != 0 || res != 0) {
4368                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4369                 return -1;
4370         }
4371
4372         return 0;
4373 }
4374
4375 /* disable an eventscript
4376  */
4377 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4378 {
4379         int ret;
4380         TDB_DATA data;
4381         int32_t res;
4382
4383         data.dsize = strlen(script) + 1;
4384         data.dptr  = discard_const(script);
4385
4386         ret = ctdb_control(ctdb, destnode, 0, 
4387                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4388                            NULL, NULL, &res, &timeout, NULL);
4389         if (ret != 0 || res != 0) {
4390                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4391                 return -1;
4392         }
4393
4394         return 0;
4395 }
4396
4397
4398 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4399 {
4400         int ret;
4401         TDB_DATA data;
4402         int32_t res;
4403
4404         data.dsize = sizeof(*bantime);
4405         data.dptr  = (uint8_t *)bantime;
4406
4407         ret = ctdb_control(ctdb, destnode, 0, 
4408                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4409                            NULL, NULL, &res, &timeout, NULL);
4410         if (ret != 0 || res != 0) {
4411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4412                 return -1;
4413         }
4414
4415         return 0;
4416 }
4417
4418
4419 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4420 {
4421         int ret;
4422         TDB_DATA outdata;
4423         int32_t res;
4424         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4425
4426         ret = ctdb_control(ctdb, destnode, 0, 
4427                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4428                            tmp_ctx, &outdata, &res, &timeout, NULL);
4429         if (ret != 0 || res != 0) {
4430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4431                 talloc_free(tmp_ctx);
4432                 return -1;
4433         }
4434
4435         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4436         talloc_free(tmp_ctx);
4437
4438         return 0;
4439 }
4440
4441
4442 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4443 {
4444         int ret;
4445         int32_t res;
4446         TDB_DATA data;
4447         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4448
4449         data.dptr = (uint8_t*)db_prio;
4450         data.dsize = sizeof(*db_prio);
4451
4452         ret = ctdb_control(ctdb, destnode, 0, 
4453                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4454                            tmp_ctx, NULL, &res, &timeout, NULL);
4455         if (ret != 0 || res != 0) {
4456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4457                 talloc_free(tmp_ctx);
4458                 return -1;
4459         }
4460
4461         talloc_free(tmp_ctx);
4462
4463         return 0;
4464 }
4465
4466 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4467 {
4468         int ret;
4469         int32_t res;
4470         TDB_DATA data;
4471         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4472
4473         data.dptr = (uint8_t*)&db_id;
4474         data.dsize = sizeof(db_id);
4475
4476         ret = ctdb_control(ctdb, destnode, 0, 
4477                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4478                            tmp_ctx, NULL, &res, &timeout, NULL);
4479         if (ret != 0 || res < 0) {
4480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4481                 talloc_free(tmp_ctx);
4482                 return -1;
4483         }
4484
4485         if (priority) {
4486                 *priority = res;
4487         }
4488
4489         talloc_free(tmp_ctx);
4490
4491         return 0;
4492 }
4493
4494 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4495 {
4496         int ret;
4497         TDB_DATA outdata;
4498         int32_t res;
4499
4500         ret = ctdb_control(ctdb, destnode, 0, 
4501                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4502                            mem_ctx, &outdata, &res, &timeout, NULL);
4503         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4505                 return -1;
4506         }
4507
4508         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4509         talloc_free(outdata.dptr);
4510                     
4511         return 0;
4512 }
4513
4514 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4515 {
4516         if (h == NULL) {
4517                 return NULL;
4518         }
4519
4520         return &h->header;
4521 }
4522
4523
4524 struct ctdb_client_control_state *
4525 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4526 {
4527         struct ctdb_client_control_state *handle;
4528         struct ctdb_marshall_buffer *m;
4529         struct ctdb_rec_data *rec;
4530         TDB_DATA outdata;
4531
4532         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4533         if (m == NULL) {
4534                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4535                 return NULL;
4536         }
4537
4538         m->db_id = ctdb_db->db_id;
4539
4540         rec = ctdb_marshall_record(m, 0, key, header, data);
4541         if (rec == NULL) {
4542                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4543                 talloc_free(m);
4544                 return NULL;
4545         }
4546         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4547         if (m == NULL) {
4548                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4549                 talloc_free(m);
4550                 return NULL;
4551         }
4552         m->count++;
4553         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4554
4555
4556         outdata.dptr = (uint8_t *)m;
4557         outdata.dsize = talloc_get_size(m);
4558
4559         handle = ctdb_control_send(ctdb, destnode, 0, 
4560                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4561                            mem_ctx, &timeout, NULL);
4562         talloc_free(m);
4563         return handle;
4564 }
4565
4566 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4567 {
4568         int ret;
4569         int32_t res;
4570
4571         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4572         if ( (ret != 0) || (res != 0) ){
4573                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4574                 return -1;
4575         }
4576
4577         return 0;
4578 }
4579
4580 int
4581 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4582 {
4583         struct ctdb_client_control_state *state;
4584
4585         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4586         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4587 }
4588
4589
4590
4591
4592
4593
4594 /*
4595   set a database to be readonly
4596  */
4597 struct ctdb_client_control_state *
4598 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4599 {
4600         TDB_DATA data;
4601
4602         data.dptr = (uint8_t *)&dbid;
4603         data.dsize = sizeof(dbid);
4604
4605         return ctdb_control_send(ctdb, destnode, 0, 
4606                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4607                            ctdb, NULL, NULL);
4608 }
4609
4610 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4611 {
4612         int ret;
4613         int32_t res;
4614
4615         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4616         if (ret != 0 || res != 0) {
4617           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4618                 return -1;
4619         }
4620
4621         return 0;
4622 }
4623
4624 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4625 {
4626         struct ctdb_client_control_state *state;
4627
4628         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4629         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4630 }
4631
4632 /*
4633   set a database to be sticky
4634  */
4635 struct ctdb_client_control_state *
4636 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4637 {
4638         TDB_DATA data;
4639
4640         data.dptr = (uint8_t *)&dbid;
4641         data.dsize = sizeof(dbid);
4642
4643         return ctdb_control_send(ctdb, destnode, 0, 
4644                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4645                            ctdb, NULL, NULL);
4646 }
4647
4648 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4649 {
4650         int ret;
4651         int32_t res;
4652
4653         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4654         if (ret != 0 || res != 0) {
4655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4656                 return -1;
4657         }
4658
4659         return 0;
4660 }
4661
4662 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4663 {
4664         struct ctdb_client_control_state *state;
4665
4666         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4667         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4668 }