client: Treat empty __db_sequence_number__ record as 0
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   (long unsigned int)num*sizeof(uint8_t),
538                                   outdata.dsize));
539                 talloc_free(outdata.dptr);
540                 return -1;
541         }
542
543         for (i=0; i<num; i++) {
544                 result[i] = outdata.dptr[i];
545         }
546
547         talloc_free(outdata.dptr);
548         return 0;
549 }
550
551 /*
552   send a message - from client context
553  */
554 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
555                       uint64_t srvid, TDB_DATA data)
556 {
557         struct ctdb_req_message *r;
558         int len, res;
559
560         len = offsetof(struct ctdb_req_message, data) + data.dsize;
561         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
562                                len, struct ctdb_req_message);
563         CTDB_NO_MEMORY(ctdb, r);
564
565         r->hdr.destnode  = pnn;
566         r->srvid         = srvid;
567         r->datalen       = data.dsize;
568         memcpy(&r->data[0], data.dptr, data.dsize);
569         
570         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
571         talloc_free(r);
572         return res;
573 }
574
575
576 /*
577   cancel a ctdb_fetch_lock operation, releasing the lock
578  */
579 static int fetch_lock_destructor(struct ctdb_record_handle *h)
580 {
581         ctdb_ltdb_unlock(h->ctdb_db, h->key);
582         return 0;
583 }
584
585 /*
586   force the migration of a record to this node
587  */
588 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
589 {
590         struct ctdb_call call;
591         ZERO_STRUCT(call);
592         call.call_id = CTDB_NULL_FUNC;
593         call.key = key;
594         call.flags = CTDB_IMMEDIATE_MIGRATION;
595         return ctdb_call(ctdb_db, &call);
596 }
597
598 /*
599   try to fetch a readonly copy of a record
600  */
601 static int
602 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
603 {
604         int ret;
605
606         struct ctdb_call call;
607         ZERO_STRUCT(call);
608
609         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
610         call.call_data.dptr = NULL;
611         call.call_data.dsize = 0;
612         call.key = key;
613         call.flags = CTDB_WANT_READONLY;
614         ret = ctdb_call(ctdb_db, &call);
615
616         if (ret != 0) {
617                 return -1;
618         }
619         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
620                 return -1;
621         }
622
623         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
624         if (*hdr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 return -1;
627         }
628
629         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
630         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
631         if (data->dptr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 talloc_free(hdr);
634                 return -1;
635         }
636
637         return 0;
638 }
639
640 /*
641   get a lock on a record, and return the records data. Blocks until it gets the lock
642  */
643 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                                            TDB_DATA key, TDB_DATA *data)
645 {
646         int ret;
647         struct ctdb_record_handle *h;
648
649         /*
650           procedure is as follows:
651
652           1) get the chain lock. 
653           2) check if we are dmaster
654           3) if we are the dmaster then return handle 
655           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
656              reply from ctdbd
657           5) when we get the reply, goto (1)
658          */
659
660         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
661         if (h == NULL) {
662                 return NULL;
663         }
664
665         h->ctdb_db = ctdb_db;
666         h->key     = key;
667         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
668         if (h->key.dptr == NULL) {
669                 talloc_free(h);
670                 return NULL;
671         }
672         h->data    = data;
673
674         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
675                  (const char *)key.dptr));
676
677 again:
678         /* step 1 - get the chain lock */
679         ret = ctdb_ltdb_lock(ctdb_db, key);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
682                 talloc_free(h);
683                 return NULL;
684         }
685
686         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
687
688         talloc_set_destructor(h, fetch_lock_destructor);
689
690         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
691
692         /* when torturing, ensure we test the remote path */
693         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
694             random() % 5 == 0) {
695                 h->header.dmaster = (uint32_t)-1;
696         }
697
698
699         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
700
701         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
702                 ctdb_ltdb_unlock(ctdb_db, key);
703                 ret = ctdb_client_force_migration(ctdb_db, key);
704                 if (ret != 0) {
705                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
706                         talloc_free(h);
707                         return NULL;
708                 }
709                 goto again;
710         }
711
712         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
713         return h;
714 }
715
716 /*
717   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
718  */
719 struct ctdb_record_handle *
720 ctdb_fetch_readonly_lock(
721         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
722         TDB_DATA key, TDB_DATA *data,
723         int read_only)
724 {
725         int ret;
726         struct ctdb_record_handle *h;
727         struct ctdb_ltdb_header *roheader = NULL;
728
729         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
730         if (h == NULL) {
731                 return NULL;
732         }
733
734         h->ctdb_db = ctdb_db;
735         h->key     = key;
736         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
737         if (h->key.dptr == NULL) {
738                 talloc_free(h);
739                 return NULL;
740         }
741         h->data    = data;
742
743         data->dptr = NULL;
744         data->dsize = 0;
745
746
747 again:
748         talloc_free(roheader);
749         roheader = NULL;
750
751         talloc_free(data->dptr);
752         data->dptr = NULL;
753         data->dsize = 0;
754
755         /* Lock the record/chain */
756         ret = ctdb_ltdb_lock(ctdb_db, key);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
759                 talloc_free(h);
760                 return NULL;
761         }
762
763         talloc_set_destructor(h, fetch_lock_destructor);
764
765         /* Check if record exists yet in the TDB */
766         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
767         if (ret != 0) {
768                 ctdb_ltdb_unlock(ctdb_db, key);
769                 ret = ctdb_client_force_migration(ctdb_db, key);
770                 if (ret != 0) {
771                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
772                         talloc_free(h);
773                         return NULL;
774                 }
775                 goto again;
776         }
777
778         /* if this is a request for read/write and we have delegations
779            we have to revoke all delegations first
780         */
781         if ((read_only == 0) 
782         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
783         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_force_migration(ctdb_db, key);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
788                         talloc_free(h);
789                         return NULL;
790                 }
791                 goto again;
792         }
793
794         /* if we are dmaster, just return the handle */
795         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
796                 return h;
797         }
798
799         if (read_only != 0) {
800                 TDB_DATA rodata = {NULL, 0};
801
802                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
803                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
804                         return h;
805                 }
806
807                 ctdb_ltdb_unlock(ctdb_db, key);
808                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
809                 if (ret != 0) {
810                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
811                         ret = ctdb_client_force_migration(ctdb_db, key);
812                         if (ret != 0) {
813                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
814                                 talloc_free(h);
815                                 return NULL;
816                         }
817
818                         goto again;
819                 }
820
821                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
822                         ret = ctdb_client_force_migration(ctdb_db, key);
823                         if (ret != 0) {
824                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
825                                 talloc_free(h);
826                                 return NULL;
827                         }
828
829                         goto again;
830                 }
831
832                 ret = ctdb_ltdb_lock(ctdb_db, key);
833                 if (ret != 0) {
834                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
835                         talloc_free(h);
836                         return NULL;
837                 }
838
839                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
840                 if (ret != 0) {
841                         ctdb_ltdb_unlock(ctdb_db, key);
842
843                         ret = ctdb_client_force_migration(ctdb_db, key);
844                         if (ret != 0) {
845                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
846                                 talloc_free(h);
847                                 return NULL;
848                         }
849
850                         goto again;
851                 }
852
853                 return h;
854         }
855
856         /* we are not dmaster and this was not a request for a readonly lock
857          * so unlock the record, migrate it and try again
858          */
859         ctdb_ltdb_unlock(ctdb_db, key);
860         ret = ctdb_client_force_migration(ctdb_db, key);
861         if (ret != 0) {
862                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
863                 talloc_free(h);
864                 return NULL;
865         }
866         goto again;
867 }
868
869 /*
870   store some data to the record that was locked with ctdb_fetch_lock()
871 */
872 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
873 {
874         if (h->ctdb_db->persistent) {
875                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
876                 return -1;
877         }
878
879         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
880 }
881
882 /*
883   non-locking fetch of a record
884  */
885 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
886                TDB_DATA key, TDB_DATA *data)
887 {
888         struct ctdb_call call;
889         int ret;
890
891         call.call_id = CTDB_FETCH_FUNC;
892         call.call_data.dptr = NULL;
893         call.call_data.dsize = 0;
894         call.key = key;
895
896         ret = ctdb_call(ctdb_db, &call);
897
898         if (ret == 0) {
899                 *data = call.reply_data;
900                 talloc_steal(mem_ctx, data->dptr);
901         }
902
903         return ret;
904 }
905
906
907
908 /*
909    called when a control completes or timesout to invoke the callback
910    function the user provided
911 */
912 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
913         struct timeval t, void *private_data)
914 {
915         struct ctdb_client_control_state *state;
916         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
917         int ret;
918
919         state = talloc_get_type(private_data, struct ctdb_client_control_state);
920         talloc_steal(tmp_ctx, state);
921
922         ret = ctdb_control_recv(state->ctdb, state, state,
923                         NULL, 
924                         NULL, 
925                         NULL);
926         if (ret != 0) {
927                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
928         }
929
930         talloc_free(tmp_ctx);
931 }
932
933 /*
934   called when a CTDB_REPLY_CONTROL packet comes in in the client
935
936   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
937   contains any reply data from the control
938 */
939 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
940                                       struct ctdb_req_header *hdr)
941 {
942         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
943         struct ctdb_client_control_state *state;
944
945         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
946         if (state == NULL) {
947                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
948                 return;
949         }
950
951         if (hdr->reqid != state->reqid) {
952                 /* we found a record  but it was the wrong one */
953                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
954                 return;
955         }
956
957         state->outdata.dptr = c->data;
958         state->outdata.dsize = c->datalen;
959         state->status = c->status;
960         if (c->errorlen) {
961                 state->errormsg = talloc_strndup(state, 
962                                                  (char *)&c->data[c->datalen], 
963                                                  c->errorlen);
964         }
965
966         /* state->outdata now uses resources from c so we dont want c
967            to just dissappear from under us while state is still alive
968         */
969         talloc_steal(state, c);
970
971         state->state = CTDB_CONTROL_DONE;
972
973         /* if we had a callback registered for this control, pull the response
974            and call the callback.
975         */
976         if (state->async.fn) {
977                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
978         }
979 }
980
981
982 /*
983   destroy a ctdb_control in client
984 */
985 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
986 {
987         ctdb_reqid_remove(state->ctdb, state->reqid);
988         return 0;
989 }
990
991
992 /* time out handler for ctdb_control */
993 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
994         struct timeval t, void *private_data)
995 {
996         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
997
998         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
999                          "dstnode:%u\n", state->reqid, state->c->opcode,
1000                          state->c->hdr.destnode));
1001
1002         state->state = CTDB_CONTROL_TIMEOUT;
1003
1004         /* if we had a callback registered for this control, pull the response
1005            and call the callback.
1006         */
1007         if (state->async.fn) {
1008                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1009         }
1010 }
1011
1012 /* async version of send control request */
1013 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1014                 uint32_t destnode, uint64_t srvid, 
1015                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1016                 TALLOC_CTX *mem_ctx,
1017                 struct timeval *timeout,
1018                 char **errormsg)
1019 {
1020         struct ctdb_client_control_state *state;
1021         size_t len;
1022         struct ctdb_req_control *c;
1023         int ret;
1024
1025         if (errormsg) {
1026                 *errormsg = NULL;
1027         }
1028
1029         /* if the domain socket is not yet open, open it */
1030         if (ctdb->daemon.sd==-1) {
1031                 ctdb_socket_connect(ctdb);
1032         }
1033
1034         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1035         CTDB_NO_MEMORY_NULL(ctdb, state);
1036
1037         state->ctdb       = ctdb;
1038         state->reqid      = ctdb_reqid_new(ctdb, state);
1039         state->state      = CTDB_CONTROL_WAIT;
1040         state->errormsg   = NULL;
1041
1042         talloc_set_destructor(state, ctdb_client_control_destructor);
1043
1044         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1045         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1046                                len, struct ctdb_req_control);
1047         state->c            = c;        
1048         CTDB_NO_MEMORY_NULL(ctdb, c);
1049         c->hdr.reqid        = state->reqid;
1050         c->hdr.destnode     = destnode;
1051         c->opcode           = opcode;
1052         c->client_id        = 0;
1053         c->flags            = flags;
1054         c->srvid            = srvid;
1055         c->datalen          = data.dsize;
1056         if (data.dsize) {
1057                 memcpy(&c->data[0], data.dptr, data.dsize);
1058         }
1059
1060         /* timeout */
1061         if (timeout && !timeval_is_zero(timeout)) {
1062                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1063         }
1064
1065         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1066         if (ret != 0) {
1067                 talloc_free(state);
1068                 return NULL;
1069         }
1070
1071         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1072                 talloc_free(state);
1073                 return NULL;
1074         }
1075
1076         return state;
1077 }
1078
1079
1080 /* async version of receive control reply */
1081 int ctdb_control_recv(struct ctdb_context *ctdb, 
1082                 struct ctdb_client_control_state *state, 
1083                 TALLOC_CTX *mem_ctx,
1084                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1085 {
1086         TALLOC_CTX *tmp_ctx;
1087
1088         if (status != NULL) {
1089                 *status = -1;
1090         }
1091         if (errormsg != NULL) {
1092                 *errormsg = NULL;
1093         }
1094
1095         if (state == NULL) {
1096                 return -1;
1097         }
1098
1099         /* prevent double free of state */
1100         tmp_ctx = talloc_new(ctdb);
1101         talloc_steal(tmp_ctx, state);
1102
1103         /* loop one event at a time until we either timeout or the control
1104            completes.
1105         */
1106         while (state->state == CTDB_CONTROL_WAIT) {
1107                 event_loop_once(ctdb->ev);
1108         }
1109
1110         if (state->state != CTDB_CONTROL_DONE) {
1111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1112                 if (state->async.fn) {
1113                         state->async.fn(state);
1114                 }
1115                 talloc_free(tmp_ctx);
1116                 return -1;
1117         }
1118
1119         if (state->errormsg) {
1120                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1121                 if (errormsg) {
1122                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1123                 }
1124                 if (state->async.fn) {
1125                         state->async.fn(state);
1126                 }
1127                 talloc_free(tmp_ctx);
1128                 return -1;
1129         }
1130
1131         if (outdata) {
1132                 *outdata = state->outdata;
1133                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1134         }
1135
1136         if (status) {
1137                 *status = state->status;
1138         }
1139
1140         if (state->async.fn) {
1141                 state->async.fn(state);
1142         }
1143
1144         talloc_free(tmp_ctx);
1145         return 0;
1146 }
1147
1148
1149
1150 /*
1151   send a ctdb control message
1152   timeout specifies how long we should wait for a reply.
1153   if timeout is NULL we wait indefinitely
1154  */
1155 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1156                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1157                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1158                  struct timeval *timeout,
1159                  char **errormsg)
1160 {
1161         struct ctdb_client_control_state *state;
1162
1163         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1164                         flags, data, mem_ctx,
1165                         timeout, errormsg);
1166
1167         /* FIXME: Error conditions in ctdb_control_send return NULL without
1168          * setting errormsg.  So, there is no way to distinguish between sucess
1169          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1170         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1171                 if (status != NULL) {
1172                         *status = 0;
1173                 }
1174                 return 0;
1175         }
1176
1177         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1178                         errormsg);
1179 }
1180
1181
1182
1183
1184 /*
1185   a process exists call. Returns 0 if process exists, -1 otherwise
1186  */
1187 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1188 {
1189         int ret;
1190         TDB_DATA data;
1191         int32_t status;
1192
1193         data.dptr = (uint8_t*)&pid;
1194         data.dsize = sizeof(pid);
1195
1196         ret = ctdb_control(ctdb, destnode, 0, 
1197                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1198                            NULL, NULL, &status, NULL, NULL);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1201                 return -1;
1202         }
1203
1204         return status;
1205 }
1206
1207 /*
1208   get remote statistics
1209  */
1210 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1211 {
1212         int ret;
1213         TDB_DATA data;
1214         int32_t res;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1218                            ctdb, &data, &res, NULL, NULL);
1219         if (ret != 0 || res != 0) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1221                 return -1;
1222         }
1223
1224         if (data.dsize != sizeof(struct ctdb_statistics)) {
1225                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1226                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1227                       return -1;
1228         }
1229
1230         *status = *(struct ctdb_statistics *)data.dptr;
1231         talloc_free(data.dptr);
1232                         
1233         return 0;
1234 }
1235
1236 /*
1237  * get db statistics
1238  */
1239 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1240                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1241 {
1242         int ret;
1243         TDB_DATA indata, outdata;
1244         int32_t res;
1245         struct ctdb_db_statistics *wire, *s;
1246         char *ptr;
1247         int i;
1248
1249         indata.dptr = (uint8_t *)&dbid;
1250         indata.dsize = sizeof(dbid);
1251
1252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1253                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1254         if (ret != 0 || res != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1256                 return -1;
1257         }
1258
1259         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1260                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1261                                  outdata.dsize,
1262                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1263                 return -1;
1264         }
1265
1266         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1267         if (s == NULL) {
1268                 talloc_free(outdata.dptr);
1269                 CTDB_NO_MEMORY(ctdb, s);
1270         }
1271
1272         wire = (struct ctdb_db_statistics *)outdata.dptr;
1273         *s = *wire;
1274         ptr = &wire->hot_keys_wire[0];
1275         for (i=0; i<wire->num_hot_keys; i++) {
1276                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1277                 if (s->hot_keys[i].key.dptr == NULL) {
1278                         talloc_free(outdata.dptr);
1279                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1280                 }
1281
1282                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1283                 ptr += wire->hot_keys[i].key.dsize;
1284         }
1285
1286         talloc_free(outdata.dptr);
1287         *dbstat = s;
1288         return 0;
1289 }
1290
1291 /*
1292   shutdown a remote ctdb node
1293  */
1294 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1295 {
1296         struct ctdb_client_control_state *state;
1297
1298         state = ctdb_control_send(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1300                            NULL, &timeout, NULL);
1301         if (state == NULL) {
1302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1303                 return -1;
1304         }
1305
1306         return 0;
1307 }
1308
1309 /*
1310   get vnn map from a remote node
1311  */
1312 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1313 {
1314         int ret;
1315         TDB_DATA outdata;
1316         int32_t res;
1317         struct ctdb_vnn_map_wire *map;
1318
1319         ret = ctdb_control(ctdb, destnode, 0, 
1320                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1321                            mem_ctx, &outdata, &res, &timeout, NULL);
1322         if (ret != 0 || res != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1324                 return -1;
1325         }
1326         
1327         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1328         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1329             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1330                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1331                 return -1;
1332         }
1333
1334         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1335         CTDB_NO_MEMORY(ctdb, *vnnmap);
1336         (*vnnmap)->generation = map->generation;
1337         (*vnnmap)->size       = map->size;
1338         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1339
1340         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1341         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1342         talloc_free(outdata.dptr);
1343                     
1344         return 0;
1345 }
1346
1347
1348 /*
1349   get the recovery mode of a remote node
1350  */
1351 struct ctdb_client_control_state *
1352 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_control_send(ctdb, destnode, 0, 
1355                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1356                            mem_ctx, &timeout, NULL);
1357 }
1358
1359 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1360 {
1361         int ret;
1362         int32_t res;
1363
1364         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1365         if (ret != 0) {
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1367                 return -1;
1368         }
1369
1370         if (recmode) {
1371                 *recmode = (uint32_t)res;
1372         }
1373
1374         return 0;
1375 }
1376
1377 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1378 {
1379         struct ctdb_client_control_state *state;
1380
1381         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1382         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1383 }
1384
1385
1386
1387
1388 /*
1389   set the recovery mode of a remote node
1390  */
1391 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1392 {
1393         int ret;
1394         TDB_DATA data;
1395         int32_t res;
1396
1397         data.dsize = sizeof(uint32_t);
1398         data.dptr = (unsigned char *)&recmode;
1399
1400         ret = ctdb_control(ctdb, destnode, 0, 
1401                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1402                            NULL, NULL, &res, &timeout, NULL);
1403         if (ret != 0 || res != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1405                 return -1;
1406         }
1407
1408         return 0;
1409 }
1410
1411
1412
1413 /*
1414   get the recovery master of a remote node
1415  */
1416 struct ctdb_client_control_state *
1417 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1418                         struct timeval timeout, uint32_t destnode)
1419 {
1420         return ctdb_control_send(ctdb, destnode, 0, 
1421                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1422                            mem_ctx, &timeout, NULL);
1423 }
1424
1425 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1431         if (ret != 0) {
1432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1433                 return -1;
1434         }
1435
1436         if (recmaster) {
1437                 *recmaster = (uint32_t)res;
1438         }
1439
1440         return 0;
1441 }
1442
1443 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1444 {
1445         struct ctdb_client_control_state *state;
1446
1447         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1448         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1449 }
1450
1451
1452 /*
1453   set the recovery master of a remote node
1454  */
1455 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1456 {
1457         int ret;
1458         TDB_DATA data;
1459         int32_t res;
1460
1461         ZERO_STRUCT(data);
1462         data.dsize = sizeof(uint32_t);
1463         data.dptr = (unsigned char *)&recmaster;
1464
1465         ret = ctdb_control(ctdb, destnode, 0, 
1466                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1467                            NULL, NULL, &res, &timeout, NULL);
1468         if (ret != 0 || res != 0) {
1469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1470                 return -1;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   get a list of databases off a remote node
1479  */
1480 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1481                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1482 {
1483         int ret;
1484         TDB_DATA outdata;
1485         int32_t res;
1486
1487         ret = ctdb_control(ctdb, destnode, 0, 
1488                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1489                            mem_ctx, &outdata, &res, &timeout, NULL);
1490         if (ret != 0 || res != 0) {
1491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1492                 return -1;
1493         }
1494
1495         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1496         talloc_free(outdata.dptr);
1497                     
1498         return 0;
1499 }
1500
1501 /*
1502   get a list of nodes (vnn and flags ) from a remote node
1503  */
1504 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1505                 struct timeval timeout, uint32_t destnode, 
1506                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1507 {
1508         int ret;
1509         TDB_DATA outdata;
1510         int32_t res;
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1514                            mem_ctx, &outdata, &res, &timeout, NULL);
1515         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1516                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1517                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1518         }
1519         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1521                 return -1;
1522         }
1523
1524         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1525         talloc_free(outdata.dptr);
1526                     
1527         return 0;
1528 }
1529
1530 /*
1531   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1532  */
1533 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1534                 struct timeval timeout, uint32_t destnode, 
1535                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1536 {
1537         int ret, i, len;
1538         TDB_DATA outdata;
1539         struct ctdb_node_mapv4 *nodemapv4;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1551
1552         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1553         (*nodemap) = talloc_zero_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, (*nodemap));
1555
1556         (*nodemap)->num = nodemapv4->num;
1557         for (i=0; i<nodemapv4->num; i++) {
1558                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1559                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1560                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1561                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1562         }
1563                 
1564         talloc_free(outdata.dptr);
1565                     
1566         return 0;
1567 }
1568
1569 /*
1570   drop the transport, reload the nodes file and restart the transport
1571  */
1572 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1573                     struct timeval timeout, uint32_t destnode)
1574 {
1575         int ret;
1576         int32_t res;
1577
1578         ret = ctdb_control(ctdb, destnode, 0, 
1579                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1580                            NULL, NULL, &res, &timeout, NULL);
1581         if (ret != 0 || res != 0) {
1582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1583                 return -1;
1584         }
1585
1586         return 0;
1587 }
1588
1589
1590 /*
1591   set vnn map on a node
1592  */
1593 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1594                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1595 {
1596         int ret;
1597         TDB_DATA data;
1598         int32_t res;
1599         struct ctdb_vnn_map_wire *map;
1600         size_t len;
1601
1602         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1603         map = talloc_size(mem_ctx, len);
1604         CTDB_NO_MEMORY(ctdb, map);
1605
1606         map->generation = vnnmap->generation;
1607         map->size = vnnmap->size;
1608         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1609         
1610         data.dsize = len;
1611         data.dptr  = (uint8_t *)map;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1615                            NULL, NULL, &res, &timeout, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1618                 return -1;
1619         }
1620
1621         talloc_free(map);
1622
1623         return 0;
1624 }
1625
1626
1627 /*
1628   async send for pull database
1629  */
1630 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1631         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1632         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1633 {
1634         TDB_DATA indata;
1635         struct ctdb_control_pulldb *pull;
1636         struct ctdb_client_control_state *state;
1637
1638         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1639         CTDB_NO_MEMORY_NULL(ctdb, pull);
1640
1641         pull->db_id   = dbid;
1642         pull->lmaster = lmaster;
1643
1644         indata.dsize = sizeof(struct ctdb_control_pulldb);
1645         indata.dptr  = (unsigned char *)pull;
1646
1647         state = ctdb_control_send(ctdb, destnode, 0, 
1648                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1649                                   mem_ctx, &timeout, NULL);
1650         talloc_free(pull);
1651
1652         return state;
1653 }
1654
1655 /*
1656   async recv for pull database
1657  */
1658 int ctdb_ctrl_pulldb_recv(
1659         struct ctdb_context *ctdb, 
1660         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1661         TDB_DATA *outdata)
1662 {
1663         int ret;
1664         int32_t res;
1665
1666         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1667         if ( (ret != 0) || (res != 0) ){
1668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1669                 return -1;
1670         }
1671
1672         return 0;
1673 }
1674
1675 /*
1676   pull all keys and records for a specific database on a node
1677  */
1678 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1679                 uint32_t dbid, uint32_t lmaster, 
1680                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1681                 TDB_DATA *outdata)
1682 {
1683         struct ctdb_client_control_state *state;
1684
1685         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1686                                       timeout);
1687         
1688         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1689 }
1690
1691
1692 /*
1693   change dmaster for all keys in the database to the new value
1694  */
1695 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1696                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1697 {
1698         int ret;
1699         TDB_DATA indata;
1700         int32_t res;
1701
1702         indata.dsize = 2*sizeof(uint32_t);
1703         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1704
1705         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1706         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1710                            NULL, NULL, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1713                 return -1;
1714         }
1715
1716         return 0;
1717 }
1718
1719 /*
1720   ping a node, return number of clients connected
1721  */
1722 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1723 {
1724         int ret;
1725         int32_t res;
1726
1727         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1728                            tdb_null, NULL, NULL, &res, NULL, NULL);
1729         if (ret != 0) {
1730                 return -1;
1731         }
1732         return res;
1733 }
1734
1735 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1736                            struct timeval timeout, 
1737                            uint32_t destnode,
1738                            uint32_t *runstate)
1739 {
1740         TDB_DATA outdata;
1741         int32_t res;
1742         int ret;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1745                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1746         if (ret != 0 || res != 0) {
1747                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1748                 return ret != 0 ? ret : res;
1749         }
1750
1751         if (outdata.dsize != sizeof(uint32_t)) {
1752                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1753                 talloc_free(outdata.dptr);
1754                 return -1;
1755         }
1756
1757         if (runstate != NULL) {
1758                 *runstate = *(uint32_t *)outdata.dptr;
1759         }
1760         talloc_free(outdata.dptr);
1761
1762         return 0;
1763 }
1764
1765 /*
1766   find the real path to a ltdb 
1767  */
1768 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1769                    const char **path)
1770 {
1771         int ret;
1772         int32_t res;
1773         TDB_DATA data;
1774
1775         data.dptr = (uint8_t *)&dbid;
1776         data.dsize = sizeof(dbid);
1777
1778         ret = ctdb_control(ctdb, destnode, 0, 
1779                            CTDB_CONTROL_GETDBPATH, 0, data, 
1780                            mem_ctx, &data, &res, &timeout, NULL);
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1786         if ((*path) == NULL) {
1787                 return -1;
1788         }
1789
1790         talloc_free(data.dptr);
1791
1792         return 0;
1793 }
1794
1795 /*
1796   find the name of a db 
1797  */
1798 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1799                    const char **name)
1800 {
1801         int ret;
1802         int32_t res;
1803         TDB_DATA data;
1804
1805         data.dptr = (uint8_t *)&dbid;
1806         data.dsize = sizeof(dbid);
1807
1808         ret = ctdb_control(ctdb, destnode, 0, 
1809                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1810                            mem_ctx, &data, &res, &timeout, NULL);
1811         if (ret != 0 || res != 0) {
1812                 return -1;
1813         }
1814
1815         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1816         if ((*name) == NULL) {
1817                 return -1;
1818         }
1819
1820         talloc_free(data.dptr);
1821
1822         return 0;
1823 }
1824
1825 /*
1826   get the health status of a db
1827  */
1828 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1829                           struct timeval timeout,
1830                           uint32_t destnode,
1831                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1832                           const char **reason)
1833 {
1834         int ret;
1835         int32_t res;
1836         TDB_DATA data;
1837
1838         data.dptr = (uint8_t *)&dbid;
1839         data.dsize = sizeof(dbid);
1840
1841         ret = ctdb_control(ctdb, destnode, 0,
1842                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1843                            mem_ctx, &data, &res, &timeout, NULL);
1844         if (ret != 0 || res != 0) {
1845                 return -1;
1846         }
1847
1848         if (data.dsize == 0) {
1849                 (*reason) = NULL;
1850                 return 0;
1851         }
1852
1853         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1854         if ((*reason) == NULL) {
1855                 return -1;
1856         }
1857
1858         talloc_free(data.dptr);
1859
1860         return 0;
1861 }
1862
1863 /*
1864  * get db sequence number
1865  */
1866 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1867                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1868 {
1869         int ret;
1870         int32_t res;
1871         TDB_DATA data, outdata;
1872
1873         data.dptr = (uint8_t *)&dbid;
1874         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1875
1876         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1877                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1878         if (ret != 0 || res != 0) {
1879                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1880                 return -1;
1881         }
1882
1883         if (outdata.dsize != sizeof(uint64_t)) {
1884                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1885                 talloc_free(outdata.dptr);
1886                 return -1;
1887         }
1888
1889         if (seqnum != NULL) {
1890                 *seqnum = *(uint64_t *)outdata.dptr;
1891         }
1892         talloc_free(outdata.dptr);
1893
1894         return 0;
1895 }
1896
1897 /*
1898   create a database
1899  */
1900 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1901                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1902 {
1903         int ret;
1904         int32_t res;
1905         TDB_DATA data;
1906         uint64_t tdb_flags = 0;
1907
1908         data.dptr = discard_const(name);
1909         data.dsize = strlen(name)+1;
1910
1911         /* Make sure that volatile databases use jenkins hash */
1912         if (!persistent) {
1913                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1914         }
1915
1916         ret = ctdb_control(ctdb, destnode, tdb_flags,
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1918                            0, data, 
1919                            mem_ctx, &data, &res, &timeout, NULL);
1920
1921         if (ret != 0 || res != 0) {
1922                 return -1;
1923         }
1924
1925         return 0;
1926 }
1927
1928 /*
1929   get debug level on a node
1930  */
1931 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1932 {
1933         int ret;
1934         int32_t res;
1935         TDB_DATA data;
1936
1937         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1938                            ctdb, &data, &res, NULL, NULL);
1939         if (ret != 0 || res != 0) {
1940                 return -1;
1941         }
1942         if (data.dsize != sizeof(int32_t)) {
1943                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1944                          (unsigned)data.dsize));
1945                 return -1;
1946         }
1947         *level = *(int32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949         return 0;
1950 }
1951
1952 /*
1953   set debug level on a node
1954  */
1955 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1956 {
1957         int ret;
1958         int32_t res;
1959         TDB_DATA data;
1960
1961         data.dptr = (uint8_t *)&level;
1962         data.dsize = sizeof(level);
1963
1964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1965                            NULL, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 return -1;
1968         }
1969         return 0;
1970 }
1971
1972
1973 /*
1974   get a list of connected nodes
1975  */
1976 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1977                                 struct timeval timeout,
1978                                 TALLOC_CTX *mem_ctx,
1979                                 uint32_t *num_nodes)
1980 {
1981         struct ctdb_node_map *map=NULL;
1982         int ret, i;
1983         uint32_t *nodes;
1984
1985         *num_nodes = 0;
1986
1987         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1988         if (ret != 0) {
1989                 return NULL;
1990         }
1991
1992         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1993         if (nodes == NULL) {
1994                 return NULL;
1995         }
1996
1997         for (i=0;i<map->num;i++) {
1998                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1999                         nodes[*num_nodes] = map->nodes[i].pnn;
2000                         (*num_nodes)++;
2001                 }
2002         }
2003
2004         return nodes;
2005 }
2006
2007
2008 /*
2009   reset remote status
2010  */
2011 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2012 {
2013         int ret;
2014         int32_t res;
2015
2016         ret = ctdb_control(ctdb, destnode, 0, 
2017                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2018                            NULL, NULL, &res, NULL, NULL);
2019         if (ret != 0 || res != 0) {
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2021                 return -1;
2022         }
2023         return 0;
2024 }
2025
2026 /*
2027   attach to a specific database - client call
2028 */
2029 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2030                                     struct timeval timeout,
2031                                     const char *name,
2032                                     bool persistent,
2033                                     uint32_t tdb_flags)
2034 {
2035         struct ctdb_db_context *ctdb_db;
2036         TDB_DATA data;
2037         int ret;
2038         int32_t res;
2039
2040         ctdb_db = ctdb_db_handle(ctdb, name);
2041         if (ctdb_db) {
2042                 return ctdb_db;
2043         }
2044
2045         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2046         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2047
2048         ctdb_db->ctdb = ctdb;
2049         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2050         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2051
2052         data.dptr = discard_const(name);
2053         data.dsize = strlen(name)+1;
2054
2055         /* CTDB has switched to using jenkins hash for volatile databases.
2056          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2057          * always set it.
2058          */
2059         if (!persistent) {
2060                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2061         }
2062
2063         /* tell ctdb daemon to attach */
2064         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2065                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2066                            0, data, ctdb_db, &data, &res, NULL, NULL);
2067         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2068                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2069                 talloc_free(ctdb_db);
2070                 return NULL;
2071         }
2072         
2073         ctdb_db->db_id = *(uint32_t *)data.dptr;
2074         talloc_free(data.dptr);
2075
2076         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2079                 talloc_free(ctdb_db);
2080                 return NULL;
2081         }
2082
2083         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2084         if (ctdb->valgrinding) {
2085                 tdb_flags |= TDB_NOMMAP;
2086         }
2087         tdb_flags |= TDB_DISALLOW_NESTING;
2088
2089         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2090         if (ctdb_db->ltdb == NULL) {
2091                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2092                 talloc_free(ctdb_db);
2093                 return NULL;
2094         }
2095
2096         ctdb_db->persistent = persistent;
2097
2098         DLIST_ADD(ctdb->db_list, ctdb_db);
2099
2100         /* add well known functions */
2101         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2102         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2103         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2104
2105         return ctdb_db;
2106 }
2107
2108
2109 /*
2110   setup a call for a database
2111  */
2112 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2113 {
2114         struct ctdb_registered_call *call;
2115
2116 #if 0
2117         TDB_DATA data;
2118         int32_t status;
2119         struct ctdb_control_set_call c;
2120         int ret;
2121
2122         /* this is no longer valid with the separate daemon architecture */
2123         c.db_id = ctdb_db->db_id;
2124         c.fn    = fn;
2125         c.id    = id;
2126
2127         data.dptr = (uint8_t *)&c;
2128         data.dsize = sizeof(c);
2129
2130         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2131                            data, NULL, NULL, &status, NULL, NULL);
2132         if (ret != 0 || status != 0) {
2133                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2134                 return -1;
2135         }
2136 #endif
2137
2138         /* also register locally */
2139         call = talloc(ctdb_db, struct ctdb_registered_call);
2140         call->fn = fn;
2141         call->id = id;
2142
2143         DLIST_ADD(ctdb_db->calls, call);        
2144         return 0;
2145 }
2146
2147
2148 struct traverse_state {
2149         bool done;
2150         uint32_t count;
2151         ctdb_traverse_func fn;
2152         void *private_data;
2153         bool listemptyrecords;
2154 };
2155
2156 /*
2157   called on each key during a ctdb_traverse
2158  */
2159 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2160 {
2161         struct traverse_state *state = (struct traverse_state *)p;
2162         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2163         TDB_DATA key;
2164
2165         if (data.dsize < sizeof(uint32_t) ||
2166             d->length != data.dsize) {
2167                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2168                 state->done = true;
2169                 return;
2170         }
2171
2172         key.dsize = d->keylen;
2173         key.dptr  = &d->data[0];
2174         data.dsize = d->datalen;
2175         data.dptr = &d->data[d->keylen];
2176
2177         if (key.dsize == 0 && data.dsize == 0) {
2178                 /* end of traverse */
2179                 state->done = true;
2180                 return;
2181         }
2182
2183         if (!state->listemptyrecords &&
2184             data.dsize == sizeof(struct ctdb_ltdb_header))
2185         {
2186                 /* empty records are deleted records in ctdb */
2187                 return;
2188         }
2189
2190         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2191                 state->done = true;
2192         }
2193
2194         state->count++;
2195 }
2196
2197 /**
2198  * start a cluster wide traverse, calling the supplied fn on each record
2199  * return the number of records traversed, or -1 on error
2200  *
2201  * Extendet variant with a flag to signal whether empty records should
2202  * be listed.
2203  */
2204 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2205                              ctdb_traverse_func fn,
2206                              bool withemptyrecords,
2207                              void *private_data)
2208 {
2209         TDB_DATA data;
2210         struct ctdb_traverse_start_ext t;
2211         int32_t status;
2212         int ret;
2213         uint64_t srvid = (getpid() | 0xFLL<<60);
2214         struct traverse_state state;
2215
2216         state.done = false;
2217         state.count = 0;
2218         state.private_data = private_data;
2219         state.fn = fn;
2220         state.listemptyrecords = withemptyrecords;
2221
2222         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2223         if (ret != 0) {
2224                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2225                 return -1;
2226         }
2227
2228         t.db_id = ctdb_db->db_id;
2229         t.srvid = srvid;
2230         t.reqid = 0;
2231         t.withemptyrecords = withemptyrecords;
2232
2233         data.dptr = (uint8_t *)&t;
2234         data.dsize = sizeof(t);
2235
2236         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2237                            data, NULL, NULL, &status, NULL, NULL);
2238         if (ret != 0 || status != 0) {
2239                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2240                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2241                 return -1;
2242         }
2243
2244         while (!state.done) {
2245                 event_loop_once(ctdb_db->ctdb->ev);
2246         }
2247
2248         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2251                 return -1;
2252         }
2253
2254         return state.count;
2255 }
2256
2257 /**
2258  * start a cluster wide traverse, calling the supplied fn on each record
2259  * return the number of records traversed, or -1 on error
2260  *
2261  * Standard version which does not list the empty records:
2262  * These are considered deleted.
2263  */
2264 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2265 {
2266         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2267 }
2268
2269 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2270 /*
2271   called on each key during a catdb
2272  */
2273 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2274 {
2275         int i;
2276         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2277         FILE *f = c->f;
2278         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2279
2280         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2281         for (i=0;i<key.dsize;i++) {
2282                 if (ISASCII(key.dptr[i])) {
2283                         fprintf(f, "%c", key.dptr[i]);
2284                 } else {
2285                         fprintf(f, "\\%02X", key.dptr[i]);
2286                 }
2287         }
2288         fprintf(f, "\"\n");
2289
2290         fprintf(f, "dmaster: %u\n", h->dmaster);
2291         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2292
2293         if (c->printlmaster && ctdb->vnn_map != NULL) {
2294                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2295         }
2296
2297         if (c->printhash) {
2298                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2299         }
2300
2301         if (c->printrecordflags) {
2302                 fprintf(f, "flags: 0x%08x", h->flags);
2303                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2304                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2305                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2306                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2307                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2308                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2309                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2310                 fprintf(f, "\n");
2311         }
2312
2313         if (c->printdatasize) {
2314                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2315         } else {
2316                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2317                 for (i=sizeof(*h);i<data.dsize;i++) {
2318                         if (ISASCII(data.dptr[i])) {
2319                                 fprintf(f, "%c", data.dptr[i]);
2320                         } else {
2321                                 fprintf(f, "\\%02X", data.dptr[i]);
2322                         }
2323                 }
2324                 fprintf(f, "\"\n");
2325         }
2326
2327         fprintf(f, "\n");
2328
2329         return 0;
2330 }
2331
2332 /*
2333   convenience function to list all keys to stdout
2334  */
2335 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2336                  struct ctdb_dump_db_context *ctx)
2337 {
2338         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2339                                  ctx->printemptyrecords, ctx);
2340 }
2341
2342 /*
2343   get the pid of a ctdb daemon
2344  */
2345 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2346 {
2347         int ret;
2348         int32_t res;
2349
2350         ret = ctdb_control(ctdb, destnode, 0, 
2351                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2352                            NULL, NULL, &res, &timeout, NULL);
2353         if (ret != 0) {
2354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2355                 return -1;
2356         }
2357
2358         *pid = res;
2359
2360         return 0;
2361 }
2362
2363
2364 /*
2365   async freeze send control
2366  */
2367 struct ctdb_client_control_state *
2368 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2369 {
2370         return ctdb_control_send(ctdb, destnode, priority, 
2371                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2372                            mem_ctx, &timeout, NULL);
2373 }
2374
2375 /* 
2376    async freeze recv control
2377 */
2378 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2384         if ( (ret != 0) || (res != 0) ){
2385                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2386                 return -1;
2387         }
2388
2389         return 0;
2390 }
2391
2392 /*
2393   freeze databases of a certain priority
2394  */
2395 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2396 {
2397         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2398         struct ctdb_client_control_state *state;
2399         int ret;
2400
2401         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2402         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2403         talloc_free(tmp_ctx);
2404
2405         return ret;
2406 }
2407
2408 /* Freeze all databases */
2409 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2410 {
2411         int i;
2412
2413         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2414                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2415                         return -1;
2416                 }
2417         }
2418         return 0;
2419 }
2420
2421 /*
2422   thaw databases of a certain priority
2423  */
2424 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2425 {
2426         int ret;
2427         int32_t res;
2428
2429         ret = ctdb_control(ctdb, destnode, priority, 
2430                            CTDB_CONTROL_THAW, 0, tdb_null, 
2431                            NULL, NULL, &res, &timeout, NULL);
2432         if (ret != 0 || res != 0) {
2433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2434                 return -1;
2435         }
2436
2437         return 0;
2438 }
2439
2440 /* thaw all databases */
2441 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2442 {
2443         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2444 }
2445
2446 /*
2447   get pnn of a node, or -1
2448  */
2449 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2450 {
2451         int ret;
2452         int32_t res;
2453
2454         ret = ctdb_control(ctdb, destnode, 0, 
2455                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2456                            NULL, NULL, &res, &timeout, NULL);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2459                 return -1;
2460         }
2461
2462         return res;
2463 }
2464
2465 /*
2466   get the monitoring mode of a remote node
2467  */
2468 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2469 {
2470         int ret;
2471         int32_t res;
2472
2473         ret = ctdb_control(ctdb, destnode, 0, 
2474                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2475                            NULL, NULL, &res, &timeout, NULL);
2476         if (ret != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2478                 return -1;
2479         }
2480
2481         *monmode = res;
2482
2483         return 0;
2484 }
2485
2486
2487 /*
2488  set the monitoring mode of a remote node to active
2489  */
2490 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2491 {
2492         int ret;
2493         
2494
2495         ret = ctdb_control(ctdb, destnode, 0, 
2496                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2497                            NULL, NULL,NULL, &timeout, NULL);
2498         if (ret != 0) {
2499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2500                 return -1;
2501         }
2502
2503         
2504
2505         return 0;
2506 }
2507
2508 /*
2509   set the monitoring mode of a remote node to disable
2510  */
2511 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2512 {
2513         int ret;
2514         
2515
2516         ret = ctdb_control(ctdb, destnode, 0, 
2517                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2518                            NULL, NULL, NULL, &timeout, NULL);
2519         if (ret != 0) {
2520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2521                 return -1;
2522         }
2523
2524         
2525
2526         return 0;
2527 }
2528
2529
2530
2531 /* 
2532   sent to a node to make it take over an ip address
2533 */
2534 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2535                           uint32_t destnode, struct ctdb_public_ip *ip)
2536 {
2537         TDB_DATA data;
2538         struct ctdb_public_ipv4 ipv4;
2539         int ret;
2540         int32_t res;
2541
2542         if (ip->addr.sa.sa_family == AF_INET) {
2543                 ipv4.pnn = ip->pnn;
2544                 ipv4.sin = ip->addr.ip;
2545
2546                 data.dsize = sizeof(ipv4);
2547                 data.dptr  = (uint8_t *)&ipv4;
2548
2549                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2550                            NULL, &res, &timeout, NULL);
2551         } else {
2552                 data.dsize = sizeof(*ip);
2553                 data.dptr  = (uint8_t *)ip;
2554
2555                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2556                            NULL, &res, &timeout, NULL);
2557         }
2558
2559         if (ret != 0 || res != 0) {
2560                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2561                 return -1;
2562         }
2563
2564         return 0;       
2565 }
2566
2567
2568 /* 
2569   sent to a node to make it release an ip address
2570 */
2571 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2572                          uint32_t destnode, struct ctdb_public_ip *ip)
2573 {
2574         TDB_DATA data;
2575         struct ctdb_public_ipv4 ipv4;
2576         int ret;
2577         int32_t res;
2578
2579         if (ip->addr.sa.sa_family == AF_INET) {
2580                 ipv4.pnn = ip->pnn;
2581                 ipv4.sin = ip->addr.ip;
2582
2583                 data.dsize = sizeof(ipv4);
2584                 data.dptr  = (uint8_t *)&ipv4;
2585
2586                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2587                                    NULL, &res, &timeout, NULL);
2588         } else {
2589                 data.dsize = sizeof(*ip);
2590                 data.dptr  = (uint8_t *)ip;
2591
2592                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2593                                    NULL, &res, &timeout, NULL);
2594         }
2595
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;       
2602 }
2603
2604
2605 /*
2606   get a tunable
2607  */
2608 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2609                           struct timeval timeout, 
2610                           uint32_t destnode,
2611                           const char *name, uint32_t *value)
2612 {
2613         struct ctdb_control_get_tunable *t;
2614         TDB_DATA data, outdata;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2619         data.dptr  = talloc_size(ctdb, data.dsize);
2620         CTDB_NO_MEMORY(ctdb, data.dptr);
2621
2622         t = (struct ctdb_control_get_tunable *)data.dptr;
2623         t->length = strlen(name)+1;
2624         memcpy(t->name, name, t->length);
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         talloc_free(data.dptr);
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2631                 return ret != 0 ? ret : res;
2632         }
2633
2634         if (outdata.dsize != sizeof(uint32_t)) {
2635                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2636                 talloc_free(outdata.dptr);
2637                 return -1;
2638         }
2639         
2640         *value = *(uint32_t *)outdata.dptr;
2641         talloc_free(outdata.dptr);
2642
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2650                           struct timeval timeout, 
2651                           uint32_t destnode,
2652                           const char *name, uint32_t value)
2653 {
2654         struct ctdb_control_set_tunable *t;
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2660         data.dptr  = talloc_size(ctdb, data.dsize);
2661         CTDB_NO_MEMORY(ctdb, data.dptr);
2662
2663         t = (struct ctdb_control_set_tunable *)data.dptr;
2664         t->length = strlen(name)+1;
2665         memcpy(t->name, name, t->length);
2666         t->value = value;
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2669                            NULL, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2673                 return -1;
2674         }
2675
2676         return 0;
2677 }
2678
2679 /*
2680   list tunables
2681  */
2682 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2683                             struct timeval timeout, 
2684                             uint32_t destnode,
2685                             TALLOC_CTX *mem_ctx,
2686                             const char ***list, uint32_t *count)
2687 {
2688         TDB_DATA outdata;
2689         int32_t res;
2690         int ret;
2691         struct ctdb_control_list_tunable *t;
2692         char *p, *s, *ptr;
2693
2694         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2698                 return -1;
2699         }
2700
2701         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2702         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2703             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2704                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2705                 talloc_free(outdata.dptr);
2706                 return -1;              
2707         }
2708         
2709         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2710         CTDB_NO_MEMORY(ctdb, p);
2711
2712         talloc_free(outdata.dptr);
2713         
2714         (*list) = NULL;
2715         (*count) = 0;
2716
2717         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2718                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2719                 CTDB_NO_MEMORY(ctdb, *list);
2720                 (*list)[*count] = talloc_strdup(*list, s);
2721                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2722                 (*count)++;
2723         }
2724
2725         talloc_free(p);
2726
2727         return 0;
2728 }
2729
2730
2731 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2732                                    struct timeval timeout, uint32_t destnode,
2733                                    TALLOC_CTX *mem_ctx,
2734                                    uint32_t flags,
2735                                    struct ctdb_all_public_ips **ips)
2736 {
2737         int ret;
2738         TDB_DATA outdata;
2739         int32_t res;
2740
2741         ret = ctdb_control(ctdb, destnode, 0, 
2742                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2743                            mem_ctx, &outdata, &res, &timeout, NULL);
2744         if (ret == 0 && res == -1) {
2745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2746                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2747         }
2748         if (ret != 0 || res != 0) {
2749           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2750                 return -1;
2751         }
2752
2753         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2754         talloc_free(outdata.dptr);
2755                     
2756         return 0;
2757 }
2758
2759 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2760                              struct timeval timeout, uint32_t destnode,
2761                              TALLOC_CTX *mem_ctx,
2762                              struct ctdb_all_public_ips **ips)
2763 {
2764         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2765                                               destnode, mem_ctx,
2766                                               0, ips);
2767 }
2768
2769 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2770                         struct timeval timeout, uint32_t destnode, 
2771                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2772 {
2773         int ret, i, len;
2774         TDB_DATA outdata;
2775         int32_t res;
2776         struct ctdb_all_public_ipsv4 *ipsv4;
2777
2778         ret = ctdb_control(ctdb, destnode, 0, 
2779                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2780                            mem_ctx, &outdata, &res, &timeout, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2783                 return -1;
2784         }
2785
2786         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2787         len = offsetof(struct ctdb_all_public_ips, ips) +
2788                 ipsv4->num*sizeof(struct ctdb_public_ip);
2789         *ips = talloc_zero_size(mem_ctx, len);
2790         CTDB_NO_MEMORY(ctdb, *ips);
2791         (*ips)->num = ipsv4->num;
2792         for (i=0; i<ipsv4->num; i++) {
2793                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2794                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2795         }
2796
2797         talloc_free(outdata.dptr);
2798                     
2799         return 0;
2800 }
2801
2802 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2803                                  struct timeval timeout, uint32_t destnode,
2804                                  TALLOC_CTX *mem_ctx,
2805                                  const ctdb_sock_addr *addr,
2806                                  struct ctdb_control_public_ip_info **_info)
2807 {
2808         int ret;
2809         TDB_DATA indata;
2810         TDB_DATA outdata;
2811         int32_t res;
2812         struct ctdb_control_public_ip_info *info;
2813         uint32_t len;
2814         uint32_t i;
2815
2816         indata.dptr = discard_const_p(uint8_t, addr);
2817         indata.dsize = sizeof(*addr);
2818
2819         ret = ctdb_control(ctdb, destnode, 0,
2820                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2821                            mem_ctx, &outdata, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2824                                 "failed ret:%d res:%d\n",
2825                                 ret, res));
2826                 return -1;
2827         }
2828
2829         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2830         if (len > outdata.dsize) {
2831                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2832                                 "returned invalid data with size %u > %u\n",
2833                                 (unsigned int)outdata.dsize,
2834                                 (unsigned int)len));
2835                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2836                 return -1;
2837         }
2838
2839         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2840         len += info->num*sizeof(struct ctdb_control_iface_info);
2841
2842         if (len > outdata.dsize) {
2843                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2844                                 "returned invalid data with size %u > %u\n",
2845                                 (unsigned int)outdata.dsize,
2846                                 (unsigned int)len));
2847                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2848                 return -1;
2849         }
2850
2851         /* make sure we null terminate the returned strings */
2852         for (i=0; i < info->num; i++) {
2853                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2854         }
2855
2856         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2857                                                                 outdata.dptr,
2858                                                                 outdata.dsize);
2859         talloc_free(outdata.dptr);
2860         if (*_info == NULL) {
2861                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2862                                 "talloc_memdup size %u failed\n",
2863                                 (unsigned int)outdata.dsize));
2864                 return -1;
2865         }
2866
2867         return 0;
2868 }
2869
2870 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2871                          struct timeval timeout, uint32_t destnode,
2872                          TALLOC_CTX *mem_ctx,
2873                          struct ctdb_control_get_ifaces **_ifaces)
2874 {
2875         int ret;
2876         TDB_DATA outdata;
2877         int32_t res;
2878         struct ctdb_control_get_ifaces *ifaces;
2879         uint32_t len;
2880         uint32_t i;
2881
2882         ret = ctdb_control(ctdb, destnode, 0,
2883                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2884                            mem_ctx, &outdata, &res, &timeout, NULL);
2885         if (ret != 0 || res != 0) {
2886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2887                                 "failed ret:%d res:%d\n",
2888                                 ret, res));
2889                 return -1;
2890         }
2891
2892         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2893         if (len > outdata.dsize) {
2894                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2895                                 "returned invalid data with size %u > %u\n",
2896                                 (unsigned int)outdata.dsize,
2897                                 (unsigned int)len));
2898                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2899                 return -1;
2900         }
2901
2902         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2903         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2904
2905         if (len > outdata.dsize) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2907                                 "returned invalid data with size %u > %u\n",
2908                                 (unsigned int)outdata.dsize,
2909                                 (unsigned int)len));
2910                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2911                 return -1;
2912         }
2913
2914         /* make sure we null terminate the returned strings */
2915         for (i=0; i < ifaces->num; i++) {
2916                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2917         }
2918
2919         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2920                                                                   outdata.dptr,
2921                                                                   outdata.dsize);
2922         talloc_free(outdata.dptr);
2923         if (*_ifaces == NULL) {
2924                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2925                                 "talloc_memdup size %u failed\n",
2926                                 (unsigned int)outdata.dsize));
2927                 return -1;
2928         }
2929
2930         return 0;
2931 }
2932
2933 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2934                              struct timeval timeout, uint32_t destnode,
2935                              TALLOC_CTX *mem_ctx,
2936                              const struct ctdb_control_iface_info *info)
2937 {
2938         int ret;
2939         TDB_DATA indata;
2940         int32_t res;
2941
2942         indata.dptr = discard_const_p(uint8_t, info);
2943         indata.dsize = sizeof(*info);
2944
2945         ret = ctdb_control(ctdb, destnode, 0,
2946                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2947                            mem_ctx, NULL, &res, &timeout, NULL);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2950                                 "failed ret:%d res:%d\n",
2951                                 ret, res));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   set/clear the permanent disabled bit on a remote node
2960  */
2961 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2962                        uint32_t set, uint32_t clear)
2963 {
2964         int ret;
2965         TDB_DATA data;
2966         struct ctdb_node_map *nodemap=NULL;
2967         struct ctdb_node_flag_change c;
2968         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2969         uint32_t recmaster;
2970         uint32_t *nodes;
2971
2972
2973         /* find the recovery master */
2974         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2977                 talloc_free(tmp_ctx);
2978                 return ret;
2979         }
2980
2981
2982         /* read the node flags from the recmaster */
2983         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2984         if (ret != 0) {
2985                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2986                 talloc_free(tmp_ctx);
2987                 return -1;
2988         }
2989         if (destnode >= nodemap->num) {
2990                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2991                 talloc_free(tmp_ctx);
2992                 return -1;
2993         }
2994
2995         c.pnn       = destnode;
2996         c.old_flags = nodemap->nodes[destnode].flags;
2997         c.new_flags = c.old_flags;
2998         c.new_flags |= set;
2999         c.new_flags &= ~clear;
3000
3001         data.dsize = sizeof(c);
3002         data.dptr = (unsigned char *)&c;
3003
3004         /* send the flags update to all connected nodes */
3005         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3006
3007         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3008                                         nodes, 0,
3009                                         timeout, false, data,
3010                                         NULL, NULL,
3011                                         NULL) != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3013
3014                 talloc_free(tmp_ctx);
3015                 return -1;
3016         }
3017
3018         talloc_free(tmp_ctx);
3019         return 0;
3020 }
3021
3022
3023 /*
3024   get all tunables
3025  */
3026 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3027                                struct timeval timeout, 
3028                                uint32_t destnode,
3029                                struct ctdb_tunable *tunables)
3030 {
3031         TDB_DATA outdata;
3032         int ret;
3033         int32_t res;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3036                            &outdata, &res, &timeout, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3039                 return -1;
3040         }
3041
3042         if (outdata.dsize != sizeof(*tunables)) {
3043                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3044                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3045                 return -1;              
3046         }
3047
3048         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3049         talloc_free(outdata.dptr);
3050         return 0;
3051 }
3052
3053 /*
3054   add a public address to a node
3055  */
3056 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3057                       struct timeval timeout, 
3058                       uint32_t destnode,
3059                       struct ctdb_control_ip_iface *pub)
3060 {
3061         TDB_DATA data;
3062         int32_t res;
3063         int ret;
3064
3065         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3066         data.dptr  = (unsigned char *)pub;
3067
3068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3069                            NULL, &res, &timeout, NULL);
3070         if (ret != 0 || res != 0) {
3071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3072                 return -1;
3073         }
3074
3075         return 0;
3076 }
3077
3078 /*
3079   delete a public address from a node
3080  */
3081 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3082                       struct timeval timeout, 
3083                       uint32_t destnode,
3084                       struct ctdb_control_ip_iface *pub)
3085 {
3086         TDB_DATA data;
3087         int32_t res;
3088         int ret;
3089
3090         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3091         data.dptr  = (unsigned char *)pub;
3092
3093         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3094                            NULL, &res, &timeout, NULL);
3095         if (ret != 0 || res != 0) {
3096                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3097                 return -1;
3098         }
3099
3100         return 0;
3101 }
3102
3103 /*
3104   kill a tcp connection
3105  */
3106 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3107                       struct timeval timeout, 
3108                       uint32_t destnode,
3109                       struct ctdb_control_killtcp *killtcp)
3110 {
3111         TDB_DATA data;
3112         int32_t res;
3113         int ret;
3114
3115         data.dsize = sizeof(struct ctdb_control_killtcp);
3116         data.dptr  = (unsigned char *)killtcp;
3117
3118         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3119                            NULL, &res, &timeout, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128 /*
3129   send a gratious arp
3130  */
3131 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3132                       struct timeval timeout, 
3133                       uint32_t destnode,
3134                       ctdb_sock_addr *addr,
3135                       const char *ifname)
3136 {
3137         TDB_DATA data;
3138         int32_t res;
3139         int ret, len;
3140         struct ctdb_control_gratious_arp *gratious_arp;
3141         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3142
3143
3144         len = strlen(ifname)+1;
3145         gratious_arp = talloc_size(tmp_ctx, 
3146                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3147         CTDB_NO_MEMORY(ctdb, gratious_arp);
3148
3149         gratious_arp->addr = *addr;
3150         gratious_arp->len = len;
3151         memcpy(&gratious_arp->iface[0], ifname, len);
3152
3153
3154         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3155         data.dptr  = (unsigned char *)gratious_arp;
3156
3157         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3158                            NULL, &res, &timeout, NULL);
3159         if (ret != 0 || res != 0) {
3160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3161                 talloc_free(tmp_ctx);
3162                 return -1;
3163         }
3164
3165         talloc_free(tmp_ctx);
3166         return 0;
3167 }
3168
3169 /*
3170   get a list of all tcp tickles that a node knows about for a particular vnn
3171  */
3172 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3173                               struct timeval timeout, uint32_t destnode, 
3174                               TALLOC_CTX *mem_ctx, 
3175                               ctdb_sock_addr *addr,
3176                               struct ctdb_control_tcp_tickle_list **list)
3177 {
3178         int ret;
3179         TDB_DATA data, outdata;
3180         int32_t status;
3181
3182         data.dptr = (uint8_t*)addr;
3183         data.dsize = sizeof(ctdb_sock_addr);
3184
3185         ret = ctdb_control(ctdb, destnode, 0, 
3186                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3187                            mem_ctx, &outdata, &status, NULL, NULL);
3188         if (ret != 0 || status != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3190                 return -1;
3191         }
3192
3193         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3194
3195         return status;
3196 }
3197
3198 /*
3199   register a server id
3200  */
3201 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3202                       struct timeval timeout, 
3203                       struct ctdb_server_id *id)
3204 {
3205         TDB_DATA data;
3206         int32_t res;
3207         int ret;
3208
3209         data.dsize = sizeof(struct ctdb_server_id);
3210         data.dptr  = (unsigned char *)id;
3211
3212         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3213                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3214                         0, data, NULL,
3215                         NULL, &res, &timeout, NULL);
3216         if (ret != 0 || res != 0) {
3217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3218                 return -1;
3219         }
3220
3221         return 0;
3222 }
3223
3224 /*
3225   unregister a server id
3226  */
3227 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3228                       struct timeval timeout, 
3229                       struct ctdb_server_id *id)
3230 {
3231         TDB_DATA data;
3232         int32_t res;
3233         int ret;
3234
3235         data.dsize = sizeof(struct ctdb_server_id);
3236         data.dptr  = (unsigned char *)id;
3237
3238         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3239                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3240                         0, data, NULL,
3241                         NULL, &res, &timeout, NULL);
3242         if (ret != 0 || res != 0) {
3243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3244                 return -1;
3245         }
3246
3247         return 0;
3248 }
3249
3250
3251 /*
3252   check if a server id exists
3253
3254   if a server id does exist, return *status == 1, otherwise *status == 0
3255  */
3256 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3257                       struct timeval timeout, 
3258                       uint32_t destnode,
3259                       struct ctdb_server_id *id,
3260                       uint32_t *status)
3261 {
3262         TDB_DATA data;
3263         int32_t res;
3264         int ret;
3265
3266         data.dsize = sizeof(struct ctdb_server_id);
3267         data.dptr  = (unsigned char *)id;
3268
3269         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3270                         0, data, NULL,
3271                         NULL, &res, &timeout, NULL);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3274                 return -1;
3275         }
3276
3277         if (res) {
3278                 *status = 1;
3279         } else {
3280                 *status = 0;
3281         }
3282
3283         return 0;
3284 }
3285
3286 /*
3287    get the list of server ids that are registered on a node
3288 */
3289 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3290                 TALLOC_CTX *mem_ctx,
3291                 struct timeval timeout, uint32_t destnode, 
3292                 struct ctdb_server_id_list **svid_list)
3293 {
3294         int ret;
3295         TDB_DATA outdata;
3296         int32_t res;
3297
3298         ret = ctdb_control(ctdb, destnode, 0, 
3299                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3300                            mem_ctx, &outdata, &res, &timeout, NULL);
3301         if (ret != 0 || res != 0) {
3302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3303                 return -1;
3304         }
3305
3306         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3307                     
3308         return 0;
3309 }
3310
3311 /*
3312   initialise the ctdb daemon for client applications
3313
3314   NOTE: In current code the daemon does not fork. This is for testing purposes only
3315   and to simplify the code.
3316 */
3317 struct ctdb_context *ctdb_init(struct event_context *ev)
3318 {
3319         int ret;
3320         struct ctdb_context *ctdb;
3321
3322         ctdb = talloc_zero(ev, struct ctdb_context);
3323         if (ctdb == NULL) {
3324                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3325                 return NULL;
3326         }
3327         ctdb->ev  = ev;
3328         ctdb->idr = idr_init(ctdb);
3329         /* Wrap early to exercise code. */
3330         ctdb->lastid = INT_MAX-200;
3331         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3332
3333         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3336                 talloc_free(ctdb);
3337                 return NULL;
3338         }
3339
3340         ctdb->statistics.statistics_start_time = timeval_current();
3341
3342         return ctdb;
3343 }
3344
3345
3346 /*
3347   set some ctdb flags
3348 */
3349 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3350 {
3351         ctdb->flags |= flags;
3352 }
3353
3354 /*
3355   setup the local socket name
3356 */
3357 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3358 {
3359         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3360         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3361
3362         return 0;
3363 }
3364
3365 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3366 {
3367         return ctdb->daemon.name;
3368 }
3369
3370 /*
3371   return the pnn of this node
3372 */
3373 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3374 {
3375         return ctdb->pnn;
3376 }
3377
3378
3379 /*
3380   get the uptime of a remote node
3381  */
3382 struct ctdb_client_control_state *
3383 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3384 {
3385         return ctdb_control_send(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3387                            mem_ctx, &timeout, NULL);
3388 }
3389
3390 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3391 {
3392         int ret;
3393         int32_t res;
3394         TDB_DATA outdata;
3395
3396         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3397         if (ret != 0 || res != 0) {
3398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3399                 return -1;
3400         }
3401
3402         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3403
3404         return 0;
3405 }
3406
3407 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3408 {
3409         struct ctdb_client_control_state *state;
3410
3411         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3412         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3413 }
3414
3415 /*
3416   send a control to execute the "recovered" event script on a node
3417  */
3418 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3419 {
3420         int ret;
3421         int32_t status;
3422
3423         ret = ctdb_control(ctdb, destnode, 0, 
3424                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3425                            NULL, NULL, &status, &timeout, NULL);
3426         if (ret != 0 || status != 0) {
3427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3428                 return -1;
3429         }
3430
3431         return 0;
3432 }
3433
3434 /* 
3435   callback for the async helpers used when sending the same control
3436   to multiple nodes in parallell.
3437 */
3438 static void async_callback(struct ctdb_client_control_state *state)
3439 {
3440         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3441         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res = -1;
3445         uint32_t destnode = state->c->hdr.destnode;
3446
3447         outdata.dsize = 0;
3448         outdata.dptr = NULL;
3449
3450         /* one more node has responded with recmode data */
3451         data->count--;
3452
3453         /* if we failed to push the db, then return an error and let
3454            the main loop try again.
3455         */
3456         if (state->state != CTDB_CONTROL_DONE) {
3457                 if ( !data->dont_log_errors) {
3458                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3459                 }
3460                 data->fail_count++;
3461                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3462                         res = -ETIME;
3463                 } else {
3464                         res = -1;
3465                 }
3466                 if (data->fail_callback) {
3467                         data->fail_callback(ctdb, destnode, res, outdata,
3468                                         data->callback_data);
3469                 }
3470                 return;
3471         }
3472         
3473         state->async.fn = NULL;
3474
3475         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3476         if ((ret != 0) || (res != 0)) {
3477                 if ( !data->dont_log_errors) {
3478                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3479                 }
3480                 data->fail_count++;
3481                 if (data->fail_callback) {
3482                         data->fail_callback(ctdb, destnode, res, outdata,
3483                                         data->callback_data);
3484                 }
3485         }
3486         if ((ret == 0) && (data->callback != NULL)) {
3487                 data->callback(ctdb, destnode, res, outdata,
3488                                         data->callback_data);
3489         }
3490 }
3491
3492
3493 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3494 {
3495         /* set up the callback functions */
3496         state->async.fn = async_callback;
3497         state->async.private_data = data;
3498         
3499         /* one more control to wait for to complete */
3500         data->count++;
3501 }
3502
3503
3504 /* wait for up to the maximum number of seconds allowed
3505    or until all nodes we expect a response from has replied
3506 */
3507 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3508 {
3509         while (data->count > 0) {
3510                 event_loop_once(ctdb->ev);
3511         }
3512         if (data->fail_count != 0) {
3513                 if (!data->dont_log_errors) {
3514                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3515                                  data->fail_count));
3516                 }
3517                 return -1;
3518         }
3519         return 0;
3520 }
3521
3522
3523 /* 
3524    perform a simple control on the listed nodes
3525    The control cannot return data
3526  */
3527 int ctdb_client_async_control(struct ctdb_context *ctdb,
3528                                 enum ctdb_controls opcode,
3529                                 uint32_t *nodes,
3530                                 uint64_t srvid,
3531                                 struct timeval timeout,
3532                                 bool dont_log_errors,
3533                                 TDB_DATA data,
3534                                 client_async_callback client_callback,
3535                                 client_async_callback fail_callback,
3536                                 void *callback_data)
3537 {
3538         struct client_async_data *async_data;
3539         struct ctdb_client_control_state *state;
3540         int j, num_nodes;
3541
3542         async_data = talloc_zero(ctdb, struct client_async_data);
3543         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3544         async_data->dont_log_errors = dont_log_errors;
3545         async_data->callback = client_callback;
3546         async_data->fail_callback = fail_callback;
3547         async_data->callback_data = callback_data;
3548         async_data->opcode        = opcode;
3549
3550         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3551
3552         /* loop over all nodes and send an async control to each of them */
3553         for (j=0; j<num_nodes; j++) {
3554                 uint32_t pnn = nodes[j];
3555
3556                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3557                                           0, data, async_data, &timeout, NULL);
3558                 if (state == NULL) {
3559                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3560                         talloc_free(async_data);
3561                         return -1;
3562                 }
3563                 
3564                 ctdb_client_async_add(async_data, state);
3565         }
3566
3567         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3568                 talloc_free(async_data);
3569                 return -1;
3570         }
3571
3572         talloc_free(async_data);
3573         return 0;
3574 }
3575
3576 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3577                                 struct ctdb_vnn_map *vnn_map,
3578                                 TALLOC_CTX *mem_ctx,
3579                                 bool include_self)
3580 {
3581         int i, j, num_nodes;
3582         uint32_t *nodes;
3583
3584         for (i=num_nodes=0;i<vnn_map->size;i++) {
3585                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3586                         continue;
3587                 }
3588                 num_nodes++;
3589         } 
3590
3591         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3592         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3593
3594         for (i=j=0;i<vnn_map->size;i++) {
3595                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3596                         continue;
3597                 }
3598                 nodes[j++] = vnn_map->map[i];
3599         } 
3600
3601         return nodes;
3602 }
3603
3604 /* Get list of nodes not including those with flags specified by mask.
3605  * If exclude_pnn is not -1 then exclude that pnn from the list.
3606  */
3607 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3608                         struct ctdb_node_map *node_map,
3609                         TALLOC_CTX *mem_ctx,
3610                         uint32_t mask,
3611                         int exclude_pnn)
3612 {
3613         int i, j, num_nodes;
3614         uint32_t *nodes;
3615
3616         for (i=num_nodes=0;i<node_map->num;i++) {
3617                 if (node_map->nodes[i].flags & mask) {
3618                         continue;
3619                 }
3620                 if (node_map->nodes[i].pnn == exclude_pnn) {
3621                         continue;
3622                 }
3623                 num_nodes++;
3624         } 
3625
3626         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3627         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3628
3629         for (i=j=0;i<node_map->num;i++) {
3630                 if (node_map->nodes[i].flags & mask) {
3631                         continue;
3632                 }
3633                 if (node_map->nodes[i].pnn == exclude_pnn) {
3634                         continue;
3635                 }
3636                 nodes[j++] = node_map->nodes[i].pnn;
3637         } 
3638
3639         return nodes;
3640 }
3641
3642 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3643                                 struct ctdb_node_map *node_map,
3644                                 TALLOC_CTX *mem_ctx,
3645                                 bool include_self)
3646 {
3647         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3648                              include_self ? -1 : ctdb->pnn);
3649 }
3650
3651 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3652                                 struct ctdb_node_map *node_map,
3653                                 TALLOC_CTX *mem_ctx,
3654                                 bool include_self)
3655 {
3656         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3657                              include_self ? -1 : ctdb->pnn);
3658 }
3659
3660 /* 
3661   this is used to test if a pnn lock exists and if it exists will return
3662   the number of connections that pnn has reported or -1 if that recovery
3663   daemon is not running.
3664 */
3665 int
3666 ctdb_read_pnn_lock(int fd, int32_t pnn)
3667 {
3668         struct flock lock;
3669         char c;
3670
3671         lock.l_type = F_WRLCK;
3672         lock.l_whence = SEEK_SET;
3673         lock.l_start = pnn;
3674         lock.l_len = 1;
3675         lock.l_pid = 0;
3676
3677         if (fcntl(fd, F_GETLK, &lock) != 0) {
3678                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3679                 return -1;
3680         }
3681
3682         if (lock.l_type == F_UNLCK) {
3683                 return -1;
3684         }
3685
3686         if (pread(fd, &c, 1, pnn) == -1) {
3687                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3688                 return -1;
3689         }
3690
3691         return c;
3692 }
3693
3694 /*
3695   get capabilities of a remote node
3696  */
3697 struct ctdb_client_control_state *
3698 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3699 {
3700         return ctdb_control_send(ctdb, destnode, 0, 
3701                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3702                            mem_ctx, &timeout, NULL);
3703 }
3704
3705 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3706 {
3707         int ret;
3708         int32_t res;
3709         TDB_DATA outdata;
3710
3711         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3712         if ( (ret != 0) || (res != 0) ) {
3713                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3714                 return -1;
3715         }
3716
3717         if (capabilities) {
3718                 *capabilities = *((uint32_t *)outdata.dptr);
3719         }
3720
3721         return 0;
3722 }
3723
3724 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3725 {
3726         struct ctdb_client_control_state *state;
3727         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3728         int ret;
3729
3730         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3731         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3732         talloc_free(tmp_ctx);
3733         return ret;
3734 }
3735
3736 struct server_id {
3737         uint64_t pid;
3738         uint32_t task_id;
3739         uint32_t vnn;
3740         uint64_t unique_id;
3741 };
3742
3743 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3744 {
3745         struct server_id id;
3746
3747         id.pid = getpid();
3748         id.task_id = reqid;
3749         id.vnn = ctdb_get_pnn(ctdb);
3750         id.unique_id = id.vnn;
3751         id.unique_id = (id.unique_id << 32) | reqid;
3752
3753         return id;
3754 }
3755
3756 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3757 {
3758         if (id1->pid != id2->pid) {
3759                 return false;
3760         }
3761
3762         if (id1->task_id != id2->task_id) {
3763                 return false;
3764         }
3765
3766         if (id1->vnn != id2->vnn) {
3767                 return false;
3768         }
3769
3770         if (id1->unique_id != id2->unique_id) {
3771                 return false;
3772         }
3773
3774         return true;
3775 }
3776
3777 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3778 {
3779         struct ctdb_server_id sid;
3780         int ret;
3781         uint32_t result;
3782
3783         sid.type = SERVER_TYPE_SAMBA;
3784         sid.pnn = id->vnn;
3785         sid.server_id = id->pid;
3786
3787         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3788                                         id->vnn, &sid, &result);
3789         if (ret != 0) {
3790                 /* If control times out, assume server_id exists. */
3791                 return true;
3792         }
3793
3794         if (result) {
3795                 return true;
3796         }
3797
3798         return false;
3799 }
3800
3801
3802 enum g_lock_type {
3803         G_LOCK_READ = 0,
3804         G_LOCK_WRITE = 1,
3805 };
3806
3807 struct g_lock_rec {
3808         enum g_lock_type type;
3809         struct server_id id;
3810 };
3811
3812 struct g_lock_recs {
3813         unsigned int num;
3814         struct g_lock_rec *lock;
3815 };
3816
3817 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3818                          struct g_lock_recs **locks)
3819 {
3820         struct g_lock_recs *recs;
3821
3822         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3823         if (recs == NULL) {
3824                 return false;
3825         }
3826
3827         if (data.dsize == 0) {
3828                 goto done;
3829         }
3830
3831         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3832                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3833                                   (unsigned long)data.dsize));
3834                 talloc_free(recs);
3835                 return false;
3836         }
3837
3838         recs->num = data.dsize / sizeof(struct g_lock_rec);
3839         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3840         if (recs->lock == NULL) {
3841                 talloc_free(recs);
3842                 return false;
3843         }
3844
3845 done:
3846         if (locks != NULL) {
3847                 *locks = recs;
3848         }
3849
3850         return true;
3851 }
3852
3853
3854 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3855                         struct ctdb_db_context *ctdb_db,
3856                         const char *keyname, uint32_t reqid)
3857 {
3858         TDB_DATA key, data;
3859         struct ctdb_record_handle *h;
3860         struct g_lock_recs *locks;
3861         struct server_id id;
3862         struct timeval t_start;
3863         int i;
3864
3865         key.dptr = (uint8_t *)discard_const(keyname);
3866         key.dsize = strlen(keyname) + 1;
3867
3868         t_start = timeval_current();
3869
3870 again:
3871         /* Keep trying for an hour. */
3872         if (timeval_elapsed(&t_start) > 3600) {
3873                 return false;
3874         }
3875
3876         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3877         if (h == NULL) {
3878                 return false;
3879         }
3880
3881         if (!g_lock_parse(h, data, &locks)) {
3882                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3883                 talloc_free(data.dptr);
3884                 talloc_free(h);
3885                 return false;
3886         }
3887
3888         talloc_free(data.dptr);
3889
3890         id = server_id_get(ctdb_db->ctdb, reqid);
3891
3892         i = 0;
3893         while (i < locks->num) {
3894                 if (server_id_equal(&locks->lock[i].id, &id)) {
3895                         /* Internal error */
3896                         talloc_free(h);
3897                         return false;
3898                 }
3899
3900                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3901                         if (i < locks->num-1) {
3902                                 locks->lock[i] = locks->lock[locks->num-1];
3903                         }
3904                         locks->num--;
3905                         continue;
3906                 }
3907
3908                 /* This entry is locked. */
3909                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3910                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3911                                    (unsigned long long)id.pid,
3912                                    id.task_id, id.vnn,
3913                                    (unsigned long long)id.unique_id));
3914                 talloc_free(h);
3915                 goto again;
3916         }
3917
3918         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3919                                      locks->num+1);
3920         if (locks->lock == NULL) {
3921                 talloc_free(h);
3922                 return false;
3923         }
3924
3925         locks->lock[locks->num].type = G_LOCK_WRITE;
3926         locks->lock[locks->num].id = id;
3927         locks->num++;
3928
3929         data.dptr = (uint8_t *)locks->lock;
3930         data.dsize = locks->num * sizeof(struct g_lock_rec);
3931
3932         if (ctdb_record_store(h, data) != 0) {
3933                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3934                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3935                                   (unsigned long long)id.pid,
3936                                   id.task_id, id.vnn,
3937                                   (unsigned long long)id.unique_id));
3938                 talloc_free(h);
3939                 return false;
3940         }
3941
3942         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3943                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3944                            (unsigned long long)id.pid,
3945                            id.task_id, id.vnn,
3946                            (unsigned long long)id.unique_id));
3947
3948         talloc_free(h);
3949         return true;
3950 }
3951
3952 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3953                           struct ctdb_db_context *ctdb_db,
3954                           const char *keyname, uint32_t reqid)
3955 {
3956         TDB_DATA key, data;
3957         struct ctdb_record_handle *h;
3958         struct g_lock_recs *locks;
3959         struct server_id id;
3960         int i;
3961         bool found = false;
3962
3963         key.dptr = (uint8_t *)discard_const(keyname);
3964         key.dsize = strlen(keyname) + 1;
3965         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3966         if (h == NULL) {
3967                 return false;
3968         }
3969
3970         if (!g_lock_parse(h, data, &locks)) {
3971                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3972                 talloc_free(data.dptr);
3973                 talloc_free(h);
3974                 return false;
3975         }
3976
3977         talloc_free(data.dptr);
3978
3979         id = server_id_get(ctdb_db->ctdb, reqid);
3980
3981         for (i=0; i<locks->num; i++) {
3982                 if (server_id_equal(&locks->lock[i].id, &id)) {
3983                         if (i < locks->num-1) {
3984                                 locks->lock[i] = locks->lock[locks->num-1];
3985                         }
3986                         locks->num--;
3987                         found = true;
3988                         break;
3989                 }
3990         }
3991
3992         if (!found) {
3993                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3994                 talloc_free(h);
3995                 return false;
3996         }
3997
3998         data.dptr = (uint8_t *)locks->lock;
3999         data.dsize = locks->num * sizeof(struct g_lock_rec);
4000
4001         if (ctdb_record_store(h, data) != 0) {
4002                 talloc_free(h);
4003                 return false;
4004         }
4005
4006         talloc_free(h);
4007         return true;
4008 }
4009
4010
4011 struct ctdb_transaction_handle {
4012         struct ctdb_db_context *ctdb_db;
4013         struct ctdb_db_context *g_lock_db;
4014         char *lock_name;
4015         uint32_t reqid;
4016         /*
4017          * we store reads and writes done under a transaction:
4018          * - one list stores both reads and writes (m_all)
4019          * - the other just writes (m_write)
4020          */
4021         struct ctdb_marshall_buffer *m_all;
4022         struct ctdb_marshall_buffer *m_write;
4023 };
4024
4025 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4026 {
4027         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4028         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4029         return 0;
4030 }
4031
4032
4033 /**
4034  * start a transaction on a database
4035  */
4036 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4037                                                        TALLOC_CTX *mem_ctx)
4038 {
4039         struct ctdb_transaction_handle *h;
4040         struct ctdb_server_id id;
4041
4042         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4043         if (h == NULL) {
4044                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4045                 return NULL;
4046         }
4047
4048         h->ctdb_db = ctdb_db;
4049         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4050                                        (unsigned int)ctdb_db->db_id);
4051         if (h->lock_name == NULL) {
4052                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4053                 talloc_free(h);
4054                 return NULL;
4055         }
4056
4057         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4058                                    "g_lock.tdb", false, 0);
4059         if (!h->g_lock_db) {
4060                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4061                 talloc_free(h);
4062                 return NULL;
4063         }
4064
4065         id.type = SERVER_TYPE_SAMBA;
4066         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4067         id.server_id = getpid();
4068
4069         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4070                                          &id) != 0) {
4071                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4072                 talloc_free(h);
4073                 return NULL;
4074         }
4075
4076         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4077
4078         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4079                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4080                 talloc_free(h);
4081                 return NULL;
4082         }
4083
4084         talloc_set_destructor(h, ctdb_transaction_destructor);
4085         return h;
4086 }
4087
4088 /**
4089  * fetch a record inside a transaction
4090  */
4091 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4092                            TALLOC_CTX *mem_ctx,
4093                            TDB_DATA key, TDB_DATA *data)
4094 {
4095         struct ctdb_ltdb_header header;
4096         int ret;
4097
4098         ZERO_STRUCT(header);
4099
4100         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4101         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4102                 /* record doesn't exist yet */
4103                 *data = tdb_null;
4104                 ret = 0;
4105         }
4106
4107         if (ret != 0) {
4108                 return ret;
4109         }
4110
4111         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4112         if (h->m_all == NULL) {
4113                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4114                 return -1;
4115         }
4116
4117         return 0;
4118 }
4119
4120 /**
4121  * stores a record inside a transaction
4122  */
4123 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4124                            TDB_DATA key, TDB_DATA data)
4125 {
4126         TALLOC_CTX *tmp_ctx = talloc_new(h);
4127         struct ctdb_ltdb_header header;
4128         TDB_DATA olddata;
4129         int ret;
4130
4131         /* we need the header so we can update the RSN */
4132         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4133         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4134                 /* the record doesn't exist - create one with us as dmaster.
4135                    This is only safe because we are in a transaction and this
4136                    is a persistent database */
4137                 ZERO_STRUCT(header);
4138         } else if (ret != 0) {
4139                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4140                 talloc_free(tmp_ctx);
4141                 return ret;
4142         }
4143
4144         if (data.dsize == olddata.dsize &&
4145             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4146             header.rsn != 0) {
4147                 /* save writing the same data */
4148                 talloc_free(tmp_ctx);
4149                 return 0;
4150         }
4151
4152         header.dmaster = h->ctdb_db->ctdb->pnn;
4153         header.rsn++;
4154
4155         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4156         if (h->m_all == NULL) {
4157                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4158                 talloc_free(tmp_ctx);
4159                 return -1;
4160         }
4161
4162         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4163         if (h->m_write == NULL) {
4164                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4165                 talloc_free(tmp_ctx);
4166                 return -1;
4167         }
4168
4169         talloc_free(tmp_ctx);
4170         return 0;
4171 }
4172
4173 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4174 {
4175         const char *keyname = CTDB_DB_SEQNUM_KEY;
4176         TDB_DATA key, data;
4177         struct ctdb_ltdb_header header;
4178         int ret;
4179
4180         key.dptr = (uint8_t *)discard_const(keyname);
4181         key.dsize = strlen(keyname) + 1;
4182
4183         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4184         if (ret != 0) {
4185                 *seqnum = 0;
4186                 return 0;
4187         }
4188
4189         if (data.dsize == 0) {
4190                 *seqnum = 0;
4191                 return 0;
4192         }
4193
4194         if (data.dsize != sizeof(*seqnum)) {
4195                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4196                                   data.dsize));
4197                 talloc_free(data.dptr);
4198                 return -1;
4199         }
4200
4201         *seqnum = *(uint64_t *)data.dptr;
4202         talloc_free(data.dptr);
4203
4204         return 0;
4205 }
4206
4207
4208 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4209                                 uint64_t seqnum)
4210 {
4211         const char *keyname = CTDB_DB_SEQNUM_KEY;
4212         TDB_DATA key, data;
4213
4214         key.dptr = (uint8_t *)discard_const(keyname);
4215         key.dsize = strlen(keyname) + 1;
4216
4217         data.dptr = (uint8_t *)&seqnum;
4218         data.dsize = sizeof(seqnum);
4219
4220         return ctdb_transaction_store(h, key, data);
4221 }
4222
4223
4224 /**
4225  * commit a transaction
4226  */
4227 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4228 {
4229         int ret;
4230         uint64_t old_seqnum, new_seqnum;
4231         int32_t status;
4232         struct timeval timeout;
4233
4234         if (h->m_write == NULL) {
4235                 /* no changes were made */
4236                 talloc_free(h);
4237                 return 0;
4238         }
4239
4240         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4241         if (ret != 0) {
4242                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4243                 ret = -1;
4244                 goto done;
4245         }
4246
4247         new_seqnum = old_seqnum + 1;
4248         ret = ctdb_store_db_seqnum(h, new_seqnum);
4249         if (ret != 0) {
4250                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4251                 ret = -1;
4252                 goto done;
4253         }
4254
4255 again:
4256         timeout = timeval_current_ofs(3,0);
4257         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4258                            h->ctdb_db->db_id,
4259                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4260                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4261                            &status, &timeout, NULL);
4262         if (ret != 0 || status != 0) {
4263                 /*
4264                  * TRANS3_COMMIT control will only fail if recovery has been
4265                  * triggered.  Check if the database has been updated or not.
4266                  */
4267                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4268                 if (ret != 0) {
4269                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4270                         goto done;
4271                 }
4272
4273                 if (new_seqnum == old_seqnum) {
4274                         /* Database not yet updated, try again */
4275                         goto again;
4276                 }
4277
4278                 if (new_seqnum != (old_seqnum + 1)) {
4279                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4280                                           (long long unsigned)new_seqnum,
4281                                           (long long unsigned)old_seqnum));
4282                         ret = -1;
4283                         goto done;
4284                 }
4285         }
4286
4287         ret = 0;
4288
4289 done:
4290         talloc_free(h);
4291         return ret;
4292 }
4293
4294 /**
4295  * cancel a transaction
4296  */
4297 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4298 {
4299         talloc_free(h);
4300         return 0;
4301 }
4302
4303
4304 /*
4305   recovery daemon ping to main daemon
4306  */
4307 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4308 {
4309         int ret;
4310         int32_t res;
4311
4312         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4313                            ctdb, NULL, &res, NULL, NULL);
4314         if (ret != 0 || res != 0) {
4315                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4316                 return -1;
4317         }
4318
4319         return 0;
4320 }
4321
4322 /* When forking the main daemon and the child process needs to connect
4323  * back to the daemon as a client process, this function can be used
4324  * to change the ctdb context from daemon into client mode.  The child
4325  * process must be created using ctdb_fork() and not fork() -
4326  * ctdb_fork() does some necessary housekeeping.
4327  */
4328 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4329 {
4330         int ret;
4331         va_list ap;
4332
4333         /* Add extra information so we can identify this in the logs */
4334         va_start(ap, fmt);
4335         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4336         va_end(ap);
4337
4338         /* get a new event context */
4339         ctdb->ev = event_context_init(ctdb);
4340         tevent_loop_allow_nesting(ctdb->ev);
4341
4342         /* Connect to main CTDB daemon */
4343         ret = ctdb_socket_connect(ctdb);
4344         if (ret != 0) {
4345                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4346                 return -1;
4347         }
4348
4349         ctdb->can_send_controls = true;
4350
4351         return 0;
4352 }
4353
4354 /*
4355   get the status of running the monitor eventscripts: NULL means never run.
4356  */
4357 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4358                 struct timeval timeout, uint32_t destnode, 
4359                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4360                 struct ctdb_scripts_wire **scripts)
4361 {
4362         int ret;
4363         TDB_DATA outdata, indata;
4364         int32_t res;
4365         uint32_t uinttype = type;
4366
4367         indata.dptr = (uint8_t *)&uinttype;
4368         indata.dsize = sizeof(uinttype);
4369
4370         ret = ctdb_control(ctdb, destnode, 0, 
4371                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4372                            mem_ctx, &outdata, &res, &timeout, NULL);
4373         if (ret != 0 || res != 0) {
4374                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4375                 return -1;
4376         }
4377
4378         if (outdata.dsize == 0) {
4379                 *scripts = NULL;
4380         } else {
4381                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4382                 talloc_free(outdata.dptr);
4383         }
4384                     
4385         return 0;
4386 }
4387
4388 /*
4389   tell the main daemon how long it took to lock the reclock file
4390  */
4391 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4392 {
4393         int ret;
4394         int32_t res;
4395         TDB_DATA data;
4396
4397         data.dptr = (uint8_t *)&latency;
4398         data.dsize = sizeof(latency);
4399
4400         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4401                            ctdb, NULL, &res, NULL, NULL);
4402         if (ret != 0 || res != 0) {
4403                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4404                 return -1;
4405         }
4406
4407         return 0;
4408 }
4409
4410 /*
4411   get the name of the reclock file
4412  */
4413 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4414                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4415                          const char **name)
4416 {
4417         int ret;
4418         int32_t res;
4419         TDB_DATA data;
4420
4421         ret = ctdb_control(ctdb, destnode, 0, 
4422                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4423                            mem_ctx, &data, &res, &timeout, NULL);
4424         if (ret != 0 || res != 0) {
4425                 return -1;
4426         }
4427
4428         if (data.dsize == 0) {
4429                 *name = NULL;
4430         } else {
4431                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4432         }
4433         talloc_free(data.dptr);
4434
4435         return 0;
4436 }
4437
4438 /*
4439   set the reclock filename for a node
4440  */
4441 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4442 {
4443         int ret;
4444         TDB_DATA data;
4445         int32_t res;
4446
4447         if (reclock == NULL) {
4448                 data.dsize = 0;
4449                 data.dptr  = NULL;
4450         } else {
4451                 data.dsize = strlen(reclock) + 1;
4452                 data.dptr  = discard_const(reclock);
4453         }
4454
4455         ret = ctdb_control(ctdb, destnode, 0, 
4456                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4457                            NULL, NULL, &res, &timeout, NULL);
4458         if (ret != 0 || res != 0) {
4459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4460                 return -1;
4461         }
4462
4463         return 0;
4464 }
4465
4466 /*
4467   stop a node
4468  */
4469 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4470 {
4471         int ret;
4472         int32_t res;
4473
4474         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4475                            ctdb, NULL, &res, &timeout, NULL);
4476         if (ret != 0 || res != 0) {
4477                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4478                 return -1;
4479         }
4480
4481         return 0;
4482 }
4483
4484 /*
4485   continue a node
4486  */
4487 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4488 {
4489         int ret;
4490
4491         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4492                            ctdb, NULL, NULL, &timeout, NULL);
4493         if (ret != 0) {
4494                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4495                 return -1;
4496         }
4497
4498         return 0;
4499 }
4500
4501 /*
4502   set the natgw state for a node
4503  */
4504 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4505 {
4506         int ret;
4507         TDB_DATA data;
4508         int32_t res;
4509
4510         data.dsize = sizeof(natgwstate);
4511         data.dptr  = (uint8_t *)&natgwstate;
4512
4513         ret = ctdb_control(ctdb, destnode, 0, 
4514                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4515                            NULL, NULL, &res, &timeout, NULL);
4516         if (ret != 0 || res != 0) {
4517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4518                 return -1;
4519         }
4520
4521         return 0;
4522 }
4523
4524 /*
4525   set the lmaster role for a node
4526  */
4527 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4528 {
4529         int ret;
4530         TDB_DATA data;
4531         int32_t res;
4532
4533         data.dsize = sizeof(lmasterrole);
4534         data.dptr  = (uint8_t *)&lmasterrole;
4535
4536         ret = ctdb_control(ctdb, destnode, 0, 
4537                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4538                            NULL, NULL, &res, &timeout, NULL);
4539         if (ret != 0 || res != 0) {
4540                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4541                 return -1;
4542         }
4543
4544         return 0;
4545 }
4546
4547 /*
4548   set the recmaster role for a node
4549  */
4550 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4551 {
4552         int ret;
4553         TDB_DATA data;
4554         int32_t res;
4555
4556         data.dsize = sizeof(recmasterrole);
4557         data.dptr  = (uint8_t *)&recmasterrole;
4558
4559         ret = ctdb_control(ctdb, destnode, 0, 
4560                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4561                            NULL, NULL, &res, &timeout, NULL);
4562         if (ret != 0 || res != 0) {
4563                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4564                 return -1;
4565         }
4566
4567         return 0;
4568 }
4569
4570 /* enable an eventscript
4571  */
4572 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4573 {
4574         int ret;
4575         TDB_DATA data;
4576         int32_t res;
4577
4578         data.dsize = strlen(script) + 1;
4579         data.dptr  = discard_const(script);
4580
4581         ret = ctdb_control(ctdb, destnode, 0, 
4582                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4583                            NULL, NULL, &res, &timeout, NULL);
4584         if (ret != 0 || res != 0) {
4585                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4586                 return -1;
4587         }
4588
4589         return 0;
4590 }
4591
4592 /* disable an eventscript
4593  */
4594 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4595 {
4596         int ret;
4597         TDB_DATA data;
4598         int32_t res;
4599
4600         data.dsize = strlen(script) + 1;
4601         data.dptr  = discard_const(script);
4602
4603         ret = ctdb_control(ctdb, destnode, 0, 
4604                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4605                            NULL, NULL, &res, &timeout, NULL);
4606         if (ret != 0 || res != 0) {
4607                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4608                 return -1;
4609         }
4610
4611         return 0;
4612 }
4613
4614
4615 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4616 {
4617         int ret;
4618         TDB_DATA data;
4619         int32_t res;
4620
4621         data.dsize = sizeof(*bantime);
4622         data.dptr  = (uint8_t *)bantime;
4623
4624         ret = ctdb_control(ctdb, destnode, 0, 
4625                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4626                            NULL, NULL, &res, &timeout, NULL);
4627         if (ret != 0 || res != 0) {
4628                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4629                 return -1;
4630         }
4631
4632         return 0;
4633 }
4634
4635
4636 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4637 {
4638         int ret;
4639         TDB_DATA outdata;
4640         int32_t res;
4641         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4642
4643         ret = ctdb_control(ctdb, destnode, 0, 
4644                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4645                            tmp_ctx, &outdata, &res, &timeout, NULL);
4646         if (ret != 0 || res != 0) {
4647                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4648                 talloc_free(tmp_ctx);
4649                 return -1;
4650         }
4651
4652         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4653         talloc_free(tmp_ctx);
4654
4655         return 0;
4656 }
4657
4658
4659 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4660 {
4661         int ret;
4662         int32_t res;
4663         TDB_DATA data;
4664         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4665
4666         data.dptr = (uint8_t*)db_prio;
4667         data.dsize = sizeof(*db_prio);
4668
4669         ret = ctdb_control(ctdb, destnode, 0, 
4670                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4671                            tmp_ctx, NULL, &res, &timeout, NULL);
4672         if (ret != 0 || res != 0) {
4673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4674                 talloc_free(tmp_ctx);
4675                 return -1;
4676         }
4677
4678         talloc_free(tmp_ctx);
4679
4680         return 0;
4681 }
4682
4683 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4684 {
4685         int ret;
4686         int32_t res;
4687         TDB_DATA data;
4688         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4689
4690         data.dptr = (uint8_t*)&db_id;
4691         data.dsize = sizeof(db_id);
4692
4693         ret = ctdb_control(ctdb, destnode, 0, 
4694                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4695                            tmp_ctx, NULL, &res, &timeout, NULL);
4696         if (ret != 0 || res < 0) {
4697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4698                 talloc_free(tmp_ctx);
4699                 return -1;
4700         }
4701
4702         if (priority) {
4703                 *priority = res;
4704         }
4705
4706         talloc_free(tmp_ctx);
4707
4708         return 0;
4709 }
4710
4711 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4712 {
4713         int ret;
4714         TDB_DATA outdata;
4715         int32_t res;
4716
4717         ret = ctdb_control(ctdb, destnode, 0, 
4718                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4719                            mem_ctx, &outdata, &res, &timeout, NULL);
4720         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4721                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4722                 return -1;
4723         }
4724
4725         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4726         talloc_free(outdata.dptr);
4727                     
4728         return 0;
4729 }
4730
4731 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4732 {
4733         if (h == NULL) {
4734                 return NULL;
4735         }
4736
4737         return &h->header;
4738 }
4739
4740
4741 struct ctdb_client_control_state *
4742 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4743 {
4744         struct ctdb_client_control_state *handle;
4745         struct ctdb_marshall_buffer *m;
4746         struct ctdb_rec_data *rec;
4747         TDB_DATA outdata;
4748
4749         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4750         if (m == NULL) {
4751                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4752                 return NULL;
4753         }
4754
4755         m->db_id = ctdb_db->db_id;
4756
4757         rec = ctdb_marshall_record(m, 0, key, header, data);
4758         if (rec == NULL) {
4759                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4760                 talloc_free(m);
4761                 return NULL;
4762         }
4763         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4764         if (m == NULL) {
4765                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4766                 talloc_free(m);
4767                 return NULL;
4768         }
4769         m->count++;
4770         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4771
4772
4773         outdata.dptr = (uint8_t *)m;
4774         outdata.dsize = talloc_get_size(m);
4775
4776         handle = ctdb_control_send(ctdb, destnode, 0, 
4777                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4778                            mem_ctx, &timeout, NULL);
4779         talloc_free(m);
4780         return handle;
4781 }
4782
4783 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4784 {
4785         int ret;
4786         int32_t res;
4787
4788         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4789         if ( (ret != 0) || (res != 0) ){
4790                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4791                 return -1;
4792         }
4793
4794         return 0;
4795 }
4796
4797 int
4798 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4799 {
4800         struct ctdb_client_control_state *state;
4801
4802         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4803         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4804 }
4805
4806
4807
4808
4809
4810
4811 /*
4812   set a database to be readonly
4813  */
4814 struct ctdb_client_control_state *
4815 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4816 {
4817         TDB_DATA data;
4818
4819         data.dptr = (uint8_t *)&dbid;
4820         data.dsize = sizeof(dbid);
4821
4822         return ctdb_control_send(ctdb, destnode, 0, 
4823                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4824                            ctdb, NULL, NULL);
4825 }
4826
4827 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4828 {
4829         int ret;
4830         int32_t res;
4831
4832         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4833         if (ret != 0 || res != 0) {
4834           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4835                 return -1;
4836         }
4837
4838         return 0;
4839 }
4840
4841 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4842 {
4843         struct ctdb_client_control_state *state;
4844
4845         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4846         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4847 }
4848
4849 /*
4850   set a database to be sticky
4851  */
4852 struct ctdb_client_control_state *
4853 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4854 {
4855         TDB_DATA data;
4856
4857         data.dptr = (uint8_t *)&dbid;
4858         data.dsize = sizeof(dbid);
4859
4860         return ctdb_control_send(ctdb, destnode, 0, 
4861                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4862                            ctdb, NULL, NULL);
4863 }
4864
4865 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4866 {
4867         int ret;
4868         int32_t res;
4869
4870         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4871         if (ret != 0 || res != 0) {
4872                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4873                 return -1;
4874         }
4875
4876         return 0;
4877 }
4878
4879 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4880 {
4881         struct ctdb_client_control_state *state;
4882
4883         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4884         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4885 }