client: Reimplement persistent transaction code using TRANS3_COMMIT
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   (long unsigned int)num*sizeof(uint8_t),
538                                   outdata.dsize));
539                 talloc_free(outdata.dptr);
540                 return -1;
541         }
542
543         for (i=0; i<num; i++) {
544                 result[i] = outdata.dptr[i];
545         }
546
547         talloc_free(outdata.dptr);
548         return 0;
549 }
550
551 /*
552   send a message - from client context
553  */
554 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
555                       uint64_t srvid, TDB_DATA data)
556 {
557         struct ctdb_req_message *r;
558         int len, res;
559
560         len = offsetof(struct ctdb_req_message, data) + data.dsize;
561         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
562                                len, struct ctdb_req_message);
563         CTDB_NO_MEMORY(ctdb, r);
564
565         r->hdr.destnode  = pnn;
566         r->srvid         = srvid;
567         r->datalen       = data.dsize;
568         memcpy(&r->data[0], data.dptr, data.dsize);
569         
570         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
571         talloc_free(r);
572         return res;
573 }
574
575
576 /*
577   cancel a ctdb_fetch_lock operation, releasing the lock
578  */
579 static int fetch_lock_destructor(struct ctdb_record_handle *h)
580 {
581         ctdb_ltdb_unlock(h->ctdb_db, h->key);
582         return 0;
583 }
584
585 /*
586   force the migration of a record to this node
587  */
588 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
589 {
590         struct ctdb_call call;
591         ZERO_STRUCT(call);
592         call.call_id = CTDB_NULL_FUNC;
593         call.key = key;
594         call.flags = CTDB_IMMEDIATE_MIGRATION;
595         return ctdb_call(ctdb_db, &call);
596 }
597
598 /*
599   try to fetch a readonly copy of a record
600  */
601 static int
602 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
603 {
604         int ret;
605
606         struct ctdb_call call;
607         ZERO_STRUCT(call);
608
609         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
610         call.call_data.dptr = NULL;
611         call.call_data.dsize = 0;
612         call.key = key;
613         call.flags = CTDB_WANT_READONLY;
614         ret = ctdb_call(ctdb_db, &call);
615
616         if (ret != 0) {
617                 return -1;
618         }
619         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
620                 return -1;
621         }
622
623         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
624         if (*hdr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 return -1;
627         }
628
629         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
630         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
631         if (data->dptr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 talloc_free(hdr);
634                 return -1;
635         }
636
637         return 0;
638 }
639
640 /*
641   get a lock on a record, and return the records data. Blocks until it gets the lock
642  */
643 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                                            TDB_DATA key, TDB_DATA *data)
645 {
646         int ret;
647         struct ctdb_record_handle *h;
648
649         /*
650           procedure is as follows:
651
652           1) get the chain lock. 
653           2) check if we are dmaster
654           3) if we are the dmaster then return handle 
655           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
656              reply from ctdbd
657           5) when we get the reply, goto (1)
658          */
659
660         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
661         if (h == NULL) {
662                 return NULL;
663         }
664
665         h->ctdb_db = ctdb_db;
666         h->key     = key;
667         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
668         if (h->key.dptr == NULL) {
669                 talloc_free(h);
670                 return NULL;
671         }
672         h->data    = data;
673
674         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
675                  (const char *)key.dptr));
676
677 again:
678         /* step 1 - get the chain lock */
679         ret = ctdb_ltdb_lock(ctdb_db, key);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
682                 talloc_free(h);
683                 return NULL;
684         }
685
686         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
687
688         talloc_set_destructor(h, fetch_lock_destructor);
689
690         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
691
692         /* when torturing, ensure we test the remote path */
693         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
694             random() % 5 == 0) {
695                 h->header.dmaster = (uint32_t)-1;
696         }
697
698
699         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
700
701         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
702                 ctdb_ltdb_unlock(ctdb_db, key);
703                 ret = ctdb_client_force_migration(ctdb_db, key);
704                 if (ret != 0) {
705                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
706                         talloc_free(h);
707                         return NULL;
708                 }
709                 goto again;
710         }
711
712         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
713         return h;
714 }
715
716 /*
717   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
718  */
719 struct ctdb_record_handle *
720 ctdb_fetch_readonly_lock(
721         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
722         TDB_DATA key, TDB_DATA *data,
723         int read_only)
724 {
725         int ret;
726         struct ctdb_record_handle *h;
727         struct ctdb_ltdb_header *roheader = NULL;
728
729         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
730         if (h == NULL) {
731                 return NULL;
732         }
733
734         h->ctdb_db = ctdb_db;
735         h->key     = key;
736         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
737         if (h->key.dptr == NULL) {
738                 talloc_free(h);
739                 return NULL;
740         }
741         h->data    = data;
742
743         data->dptr = NULL;
744         data->dsize = 0;
745
746
747 again:
748         talloc_free(roheader);
749         roheader = NULL;
750
751         talloc_free(data->dptr);
752         data->dptr = NULL;
753         data->dsize = 0;
754
755         /* Lock the record/chain */
756         ret = ctdb_ltdb_lock(ctdb_db, key);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
759                 talloc_free(h);
760                 return NULL;
761         }
762
763         talloc_set_destructor(h, fetch_lock_destructor);
764
765         /* Check if record exists yet in the TDB */
766         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
767         if (ret != 0) {
768                 ctdb_ltdb_unlock(ctdb_db, key);
769                 ret = ctdb_client_force_migration(ctdb_db, key);
770                 if (ret != 0) {
771                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
772                         talloc_free(h);
773                         return NULL;
774                 }
775                 goto again;
776         }
777
778         /* if this is a request for read/write and we have delegations
779            we have to revoke all delegations first
780         */
781         if ((read_only == 0) 
782         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
783         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_force_migration(ctdb_db, key);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
788                         talloc_free(h);
789                         return NULL;
790                 }
791                 goto again;
792         }
793
794         /* if we are dmaster, just return the handle */
795         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
796                 return h;
797         }
798
799         if (read_only != 0) {
800                 TDB_DATA rodata = {NULL, 0};
801
802                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
803                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
804                         return h;
805                 }
806
807                 ctdb_ltdb_unlock(ctdb_db, key);
808                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
809                 if (ret != 0) {
810                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
811                         ret = ctdb_client_force_migration(ctdb_db, key);
812                         if (ret != 0) {
813                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
814                                 talloc_free(h);
815                                 return NULL;
816                         }
817
818                         goto again;
819                 }
820
821                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
822                         ret = ctdb_client_force_migration(ctdb_db, key);
823                         if (ret != 0) {
824                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
825                                 talloc_free(h);
826                                 return NULL;
827                         }
828
829                         goto again;
830                 }
831
832                 ret = ctdb_ltdb_lock(ctdb_db, key);
833                 if (ret != 0) {
834                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
835                         talloc_free(h);
836                         return NULL;
837                 }
838
839                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
840                 if (ret != 0) {
841                         ctdb_ltdb_unlock(ctdb_db, key);
842
843                         ret = ctdb_client_force_migration(ctdb_db, key);
844                         if (ret != 0) {
845                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
846                                 talloc_free(h);
847                                 return NULL;
848                         }
849
850                         goto again;
851                 }
852
853                 return h;
854         }
855
856         /* we are not dmaster and this was not a request for a readonly lock
857          * so unlock the record, migrate it and try again
858          */
859         ctdb_ltdb_unlock(ctdb_db, key);
860         ret = ctdb_client_force_migration(ctdb_db, key);
861         if (ret != 0) {
862                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
863                 talloc_free(h);
864                 return NULL;
865         }
866         goto again;
867 }
868
869 /*
870   store some data to the record that was locked with ctdb_fetch_lock()
871 */
872 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
873 {
874         if (h->ctdb_db->persistent) {
875                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
876                 return -1;
877         }
878
879         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
880 }
881
882 /*
883   non-locking fetch of a record
884  */
885 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
886                TDB_DATA key, TDB_DATA *data)
887 {
888         struct ctdb_call call;
889         int ret;
890
891         call.call_id = CTDB_FETCH_FUNC;
892         call.call_data.dptr = NULL;
893         call.call_data.dsize = 0;
894         call.key = key;
895
896         ret = ctdb_call(ctdb_db, &call);
897
898         if (ret == 0) {
899                 *data = call.reply_data;
900                 talloc_steal(mem_ctx, data->dptr);
901         }
902
903         return ret;
904 }
905
906
907
908 /*
909    called when a control completes or timesout to invoke the callback
910    function the user provided
911 */
912 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
913         struct timeval t, void *private_data)
914 {
915         struct ctdb_client_control_state *state;
916         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
917         int ret;
918
919         state = talloc_get_type(private_data, struct ctdb_client_control_state);
920         talloc_steal(tmp_ctx, state);
921
922         ret = ctdb_control_recv(state->ctdb, state, state,
923                         NULL, 
924                         NULL, 
925                         NULL);
926         if (ret != 0) {
927                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
928         }
929
930         talloc_free(tmp_ctx);
931 }
932
933 /*
934   called when a CTDB_REPLY_CONTROL packet comes in in the client
935
936   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
937   contains any reply data from the control
938 */
939 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
940                                       struct ctdb_req_header *hdr)
941 {
942         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
943         struct ctdb_client_control_state *state;
944
945         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
946         if (state == NULL) {
947                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
948                 return;
949         }
950
951         if (hdr->reqid != state->reqid) {
952                 /* we found a record  but it was the wrong one */
953                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
954                 return;
955         }
956
957         state->outdata.dptr = c->data;
958         state->outdata.dsize = c->datalen;
959         state->status = c->status;
960         if (c->errorlen) {
961                 state->errormsg = talloc_strndup(state, 
962                                                  (char *)&c->data[c->datalen], 
963                                                  c->errorlen);
964         }
965
966         /* state->outdata now uses resources from c so we dont want c
967            to just dissappear from under us while state is still alive
968         */
969         talloc_steal(state, c);
970
971         state->state = CTDB_CONTROL_DONE;
972
973         /* if we had a callback registered for this control, pull the response
974            and call the callback.
975         */
976         if (state->async.fn) {
977                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
978         }
979 }
980
981
982 /*
983   destroy a ctdb_control in client
984 */
985 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
986 {
987         ctdb_reqid_remove(state->ctdb, state->reqid);
988         return 0;
989 }
990
991
992 /* time out handler for ctdb_control */
993 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
994         struct timeval t, void *private_data)
995 {
996         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
997
998         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
999                          "dstnode:%u\n", state->reqid, state->c->opcode,
1000                          state->c->hdr.destnode));
1001
1002         state->state = CTDB_CONTROL_TIMEOUT;
1003
1004         /* if we had a callback registered for this control, pull the response
1005            and call the callback.
1006         */
1007         if (state->async.fn) {
1008                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1009         }
1010 }
1011
1012 /* async version of send control request */
1013 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1014                 uint32_t destnode, uint64_t srvid, 
1015                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1016                 TALLOC_CTX *mem_ctx,
1017                 struct timeval *timeout,
1018                 char **errormsg)
1019 {
1020         struct ctdb_client_control_state *state;
1021         size_t len;
1022         struct ctdb_req_control *c;
1023         int ret;
1024
1025         if (errormsg) {
1026                 *errormsg = NULL;
1027         }
1028
1029         /* if the domain socket is not yet open, open it */
1030         if (ctdb->daemon.sd==-1) {
1031                 ctdb_socket_connect(ctdb);
1032         }
1033
1034         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1035         CTDB_NO_MEMORY_NULL(ctdb, state);
1036
1037         state->ctdb       = ctdb;
1038         state->reqid      = ctdb_reqid_new(ctdb, state);
1039         state->state      = CTDB_CONTROL_WAIT;
1040         state->errormsg   = NULL;
1041
1042         talloc_set_destructor(state, ctdb_client_control_destructor);
1043
1044         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1045         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1046                                len, struct ctdb_req_control);
1047         state->c            = c;        
1048         CTDB_NO_MEMORY_NULL(ctdb, c);
1049         c->hdr.reqid        = state->reqid;
1050         c->hdr.destnode     = destnode;
1051         c->opcode           = opcode;
1052         c->client_id        = 0;
1053         c->flags            = flags;
1054         c->srvid            = srvid;
1055         c->datalen          = data.dsize;
1056         if (data.dsize) {
1057                 memcpy(&c->data[0], data.dptr, data.dsize);
1058         }
1059
1060         /* timeout */
1061         if (timeout && !timeval_is_zero(timeout)) {
1062                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1063         }
1064
1065         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1066         if (ret != 0) {
1067                 talloc_free(state);
1068                 return NULL;
1069         }
1070
1071         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1072                 talloc_free(state);
1073                 return NULL;
1074         }
1075
1076         return state;
1077 }
1078
1079
1080 /* async version of receive control reply */
1081 int ctdb_control_recv(struct ctdb_context *ctdb, 
1082                 struct ctdb_client_control_state *state, 
1083                 TALLOC_CTX *mem_ctx,
1084                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1085 {
1086         TALLOC_CTX *tmp_ctx;
1087
1088         if (status != NULL) {
1089                 *status = -1;
1090         }
1091         if (errormsg != NULL) {
1092                 *errormsg = NULL;
1093         }
1094
1095         if (state == NULL) {
1096                 return -1;
1097         }
1098
1099         /* prevent double free of state */
1100         tmp_ctx = talloc_new(ctdb);
1101         talloc_steal(tmp_ctx, state);
1102
1103         /* loop one event at a time until we either timeout or the control
1104            completes.
1105         */
1106         while (state->state == CTDB_CONTROL_WAIT) {
1107                 event_loop_once(ctdb->ev);
1108         }
1109
1110         if (state->state != CTDB_CONTROL_DONE) {
1111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1112                 if (state->async.fn) {
1113                         state->async.fn(state);
1114                 }
1115                 talloc_free(tmp_ctx);
1116                 return -1;
1117         }
1118
1119         if (state->errormsg) {
1120                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1121                 if (errormsg) {
1122                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1123                 }
1124                 if (state->async.fn) {
1125                         state->async.fn(state);
1126                 }
1127                 talloc_free(tmp_ctx);
1128                 return -1;
1129         }
1130
1131         if (outdata) {
1132                 *outdata = state->outdata;
1133                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1134         }
1135
1136         if (status) {
1137                 *status = state->status;
1138         }
1139
1140         if (state->async.fn) {
1141                 state->async.fn(state);
1142         }
1143
1144         talloc_free(tmp_ctx);
1145         return 0;
1146 }
1147
1148
1149
1150 /*
1151   send a ctdb control message
1152   timeout specifies how long we should wait for a reply.
1153   if timeout is NULL we wait indefinitely
1154  */
1155 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1156                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1157                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1158                  struct timeval *timeout,
1159                  char **errormsg)
1160 {
1161         struct ctdb_client_control_state *state;
1162
1163         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1164                         flags, data, mem_ctx,
1165                         timeout, errormsg);
1166
1167         /* FIXME: Error conditions in ctdb_control_send return NULL without
1168          * setting errormsg.  So, there is no way to distinguish between sucess
1169          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1170         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1171                 if (status != NULL) {
1172                         *status = 0;
1173                 }
1174                 return 0;
1175         }
1176
1177         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1178                         errormsg);
1179 }
1180
1181
1182
1183
1184 /*
1185   a process exists call. Returns 0 if process exists, -1 otherwise
1186  */
1187 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1188 {
1189         int ret;
1190         TDB_DATA data;
1191         int32_t status;
1192
1193         data.dptr = (uint8_t*)&pid;
1194         data.dsize = sizeof(pid);
1195
1196         ret = ctdb_control(ctdb, destnode, 0, 
1197                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1198                            NULL, NULL, &status, NULL, NULL);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1201                 return -1;
1202         }
1203
1204         return status;
1205 }
1206
1207 /*
1208   get remote statistics
1209  */
1210 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1211 {
1212         int ret;
1213         TDB_DATA data;
1214         int32_t res;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1218                            ctdb, &data, &res, NULL, NULL);
1219         if (ret != 0 || res != 0) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1221                 return -1;
1222         }
1223
1224         if (data.dsize != sizeof(struct ctdb_statistics)) {
1225                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1226                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1227                       return -1;
1228         }
1229
1230         *status = *(struct ctdb_statistics *)data.dptr;
1231         talloc_free(data.dptr);
1232                         
1233         return 0;
1234 }
1235
1236 /*
1237  * get db statistics
1238  */
1239 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1240                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1241 {
1242         int ret;
1243         TDB_DATA indata, outdata;
1244         int32_t res;
1245         struct ctdb_db_statistics *wire, *s;
1246         char *ptr;
1247         int i;
1248
1249         indata.dptr = (uint8_t *)&dbid;
1250         indata.dsize = sizeof(dbid);
1251
1252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1253                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1254         if (ret != 0 || res != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1256                 return -1;
1257         }
1258
1259         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1260                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1261                                  outdata.dsize,
1262                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1263                 return -1;
1264         }
1265
1266         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1267         if (s == NULL) {
1268                 talloc_free(outdata.dptr);
1269                 CTDB_NO_MEMORY(ctdb, s);
1270         }
1271
1272         wire = (struct ctdb_db_statistics *)outdata.dptr;
1273         *s = *wire;
1274         ptr = &wire->hot_keys_wire[0];
1275         for (i=0; i<wire->num_hot_keys; i++) {
1276                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1277                 if (s->hot_keys[i].key.dptr == NULL) {
1278                         talloc_free(outdata.dptr);
1279                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1280                 }
1281
1282                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1283                 ptr += wire->hot_keys[i].key.dsize;
1284         }
1285
1286         talloc_free(outdata.dptr);
1287         *dbstat = s;
1288         return 0;
1289 }
1290
1291 /*
1292   shutdown a remote ctdb node
1293  */
1294 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1295 {
1296         struct ctdb_client_control_state *state;
1297
1298         state = ctdb_control_send(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1300                            NULL, &timeout, NULL);
1301         if (state == NULL) {
1302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1303                 return -1;
1304         }
1305
1306         return 0;
1307 }
1308
1309 /*
1310   get vnn map from a remote node
1311  */
1312 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1313 {
1314         int ret;
1315         TDB_DATA outdata;
1316         int32_t res;
1317         struct ctdb_vnn_map_wire *map;
1318
1319         ret = ctdb_control(ctdb, destnode, 0, 
1320                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1321                            mem_ctx, &outdata, &res, &timeout, NULL);
1322         if (ret != 0 || res != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1324                 return -1;
1325         }
1326         
1327         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1328         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1329             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1330                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1331                 return -1;
1332         }
1333
1334         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1335         CTDB_NO_MEMORY(ctdb, *vnnmap);
1336         (*vnnmap)->generation = map->generation;
1337         (*vnnmap)->size       = map->size;
1338         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1339
1340         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1341         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1342         talloc_free(outdata.dptr);
1343                     
1344         return 0;
1345 }
1346
1347
1348 /*
1349   get the recovery mode of a remote node
1350  */
1351 struct ctdb_client_control_state *
1352 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_control_send(ctdb, destnode, 0, 
1355                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1356                            mem_ctx, &timeout, NULL);
1357 }
1358
1359 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1360 {
1361         int ret;
1362         int32_t res;
1363
1364         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1365         if (ret != 0) {
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1367                 return -1;
1368         }
1369
1370         if (recmode) {
1371                 *recmode = (uint32_t)res;
1372         }
1373
1374         return 0;
1375 }
1376
1377 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1378 {
1379         struct ctdb_client_control_state *state;
1380
1381         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1382         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1383 }
1384
1385
1386
1387
1388 /*
1389   set the recovery mode of a remote node
1390  */
1391 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1392 {
1393         int ret;
1394         TDB_DATA data;
1395         int32_t res;
1396
1397         data.dsize = sizeof(uint32_t);
1398         data.dptr = (unsigned char *)&recmode;
1399
1400         ret = ctdb_control(ctdb, destnode, 0, 
1401                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1402                            NULL, NULL, &res, &timeout, NULL);
1403         if (ret != 0 || res != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1405                 return -1;
1406         }
1407
1408         return 0;
1409 }
1410
1411
1412
1413 /*
1414   get the recovery master of a remote node
1415  */
1416 struct ctdb_client_control_state *
1417 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1418                         struct timeval timeout, uint32_t destnode)
1419 {
1420         return ctdb_control_send(ctdb, destnode, 0, 
1421                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1422                            mem_ctx, &timeout, NULL);
1423 }
1424
1425 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1431         if (ret != 0) {
1432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1433                 return -1;
1434         }
1435
1436         if (recmaster) {
1437                 *recmaster = (uint32_t)res;
1438         }
1439
1440         return 0;
1441 }
1442
1443 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1444 {
1445         struct ctdb_client_control_state *state;
1446
1447         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1448         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1449 }
1450
1451
1452 /*
1453   set the recovery master of a remote node
1454  */
1455 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1456 {
1457         int ret;
1458         TDB_DATA data;
1459         int32_t res;
1460
1461         ZERO_STRUCT(data);
1462         data.dsize = sizeof(uint32_t);
1463         data.dptr = (unsigned char *)&recmaster;
1464
1465         ret = ctdb_control(ctdb, destnode, 0, 
1466                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1467                            NULL, NULL, &res, &timeout, NULL);
1468         if (ret != 0 || res != 0) {
1469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1470                 return -1;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   get a list of databases off a remote node
1479  */
1480 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1481                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1482 {
1483         int ret;
1484         TDB_DATA outdata;
1485         int32_t res;
1486
1487         ret = ctdb_control(ctdb, destnode, 0, 
1488                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1489                            mem_ctx, &outdata, &res, &timeout, NULL);
1490         if (ret != 0 || res != 0) {
1491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1492                 return -1;
1493         }
1494
1495         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1496         talloc_free(outdata.dptr);
1497                     
1498         return 0;
1499 }
1500
1501 /*
1502   get a list of nodes (vnn and flags ) from a remote node
1503  */
1504 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1505                 struct timeval timeout, uint32_t destnode, 
1506                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1507 {
1508         int ret;
1509         TDB_DATA outdata;
1510         int32_t res;
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1514                            mem_ctx, &outdata, &res, &timeout, NULL);
1515         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1516                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1517                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1518         }
1519         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1521                 return -1;
1522         }
1523
1524         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1525         talloc_free(outdata.dptr);
1526                     
1527         return 0;
1528 }
1529
1530 /*
1531   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1532  */
1533 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1534                 struct timeval timeout, uint32_t destnode, 
1535                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1536 {
1537         int ret, i, len;
1538         TDB_DATA outdata;
1539         struct ctdb_node_mapv4 *nodemapv4;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1551
1552         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1553         (*nodemap) = talloc_zero_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, (*nodemap));
1555
1556         (*nodemap)->num = nodemapv4->num;
1557         for (i=0; i<nodemapv4->num; i++) {
1558                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1559                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1560                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1561                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1562         }
1563                 
1564         talloc_free(outdata.dptr);
1565                     
1566         return 0;
1567 }
1568
1569 /*
1570   drop the transport, reload the nodes file and restart the transport
1571  */
1572 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1573                     struct timeval timeout, uint32_t destnode)
1574 {
1575         int ret;
1576         int32_t res;
1577
1578         ret = ctdb_control(ctdb, destnode, 0, 
1579                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1580                            NULL, NULL, &res, &timeout, NULL);
1581         if (ret != 0 || res != 0) {
1582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1583                 return -1;
1584         }
1585
1586         return 0;
1587 }
1588
1589
1590 /*
1591   set vnn map on a node
1592  */
1593 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1594                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1595 {
1596         int ret;
1597         TDB_DATA data;
1598         int32_t res;
1599         struct ctdb_vnn_map_wire *map;
1600         size_t len;
1601
1602         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1603         map = talloc_size(mem_ctx, len);
1604         CTDB_NO_MEMORY(ctdb, map);
1605
1606         map->generation = vnnmap->generation;
1607         map->size = vnnmap->size;
1608         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1609         
1610         data.dsize = len;
1611         data.dptr  = (uint8_t *)map;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1615                            NULL, NULL, &res, &timeout, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1618                 return -1;
1619         }
1620
1621         talloc_free(map);
1622
1623         return 0;
1624 }
1625
1626
1627 /*
1628   async send for pull database
1629  */
1630 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1631         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1632         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1633 {
1634         TDB_DATA indata;
1635         struct ctdb_control_pulldb *pull;
1636         struct ctdb_client_control_state *state;
1637
1638         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1639         CTDB_NO_MEMORY_NULL(ctdb, pull);
1640
1641         pull->db_id   = dbid;
1642         pull->lmaster = lmaster;
1643
1644         indata.dsize = sizeof(struct ctdb_control_pulldb);
1645         indata.dptr  = (unsigned char *)pull;
1646
1647         state = ctdb_control_send(ctdb, destnode, 0, 
1648                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1649                                   mem_ctx, &timeout, NULL);
1650         talloc_free(pull);
1651
1652         return state;
1653 }
1654
1655 /*
1656   async recv for pull database
1657  */
1658 int ctdb_ctrl_pulldb_recv(
1659         struct ctdb_context *ctdb, 
1660         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1661         TDB_DATA *outdata)
1662 {
1663         int ret;
1664         int32_t res;
1665
1666         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1667         if ( (ret != 0) || (res != 0) ){
1668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1669                 return -1;
1670         }
1671
1672         return 0;
1673 }
1674
1675 /*
1676   pull all keys and records for a specific database on a node
1677  */
1678 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1679                 uint32_t dbid, uint32_t lmaster, 
1680                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1681                 TDB_DATA *outdata)
1682 {
1683         struct ctdb_client_control_state *state;
1684
1685         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1686                                       timeout);
1687         
1688         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1689 }
1690
1691
1692 /*
1693   change dmaster for all keys in the database to the new value
1694  */
1695 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1696                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1697 {
1698         int ret;
1699         TDB_DATA indata;
1700         int32_t res;
1701
1702         indata.dsize = 2*sizeof(uint32_t);
1703         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1704
1705         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1706         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1710                            NULL, NULL, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1713                 return -1;
1714         }
1715
1716         return 0;
1717 }
1718
1719 /*
1720   ping a node, return number of clients connected
1721  */
1722 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1723 {
1724         int ret;
1725         int32_t res;
1726
1727         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1728                            tdb_null, NULL, NULL, &res, NULL, NULL);
1729         if (ret != 0) {
1730                 return -1;
1731         }
1732         return res;
1733 }
1734
1735 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1736                            struct timeval timeout, 
1737                            uint32_t destnode,
1738                            uint32_t *runstate)
1739 {
1740         TDB_DATA outdata;
1741         int32_t res;
1742         int ret;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1745                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1746         if (ret != 0 || res != 0) {
1747                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1748                 return ret != 0 ? ret : res;
1749         }
1750
1751         if (outdata.dsize != sizeof(uint32_t)) {
1752                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1753                 talloc_free(outdata.dptr);
1754                 return -1;
1755         }
1756
1757         if (runstate != NULL) {
1758                 *runstate = *(uint32_t *)outdata.dptr;
1759         }
1760         talloc_free(outdata.dptr);
1761
1762         return 0;
1763 }
1764
1765 /*
1766   find the real path to a ltdb 
1767  */
1768 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1769                    const char **path)
1770 {
1771         int ret;
1772         int32_t res;
1773         TDB_DATA data;
1774
1775         data.dptr = (uint8_t *)&dbid;
1776         data.dsize = sizeof(dbid);
1777
1778         ret = ctdb_control(ctdb, destnode, 0, 
1779                            CTDB_CONTROL_GETDBPATH, 0, data, 
1780                            mem_ctx, &data, &res, &timeout, NULL);
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1786         if ((*path) == NULL) {
1787                 return -1;
1788         }
1789
1790         talloc_free(data.dptr);
1791
1792         return 0;
1793 }
1794
1795 /*
1796   find the name of a db 
1797  */
1798 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1799                    const char **name)
1800 {
1801         int ret;
1802         int32_t res;
1803         TDB_DATA data;
1804
1805         data.dptr = (uint8_t *)&dbid;
1806         data.dsize = sizeof(dbid);
1807
1808         ret = ctdb_control(ctdb, destnode, 0, 
1809                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1810                            mem_ctx, &data, &res, &timeout, NULL);
1811         if (ret != 0 || res != 0) {
1812                 return -1;
1813         }
1814
1815         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1816         if ((*name) == NULL) {
1817                 return -1;
1818         }
1819
1820         talloc_free(data.dptr);
1821
1822         return 0;
1823 }
1824
1825 /*
1826   get the health status of a db
1827  */
1828 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1829                           struct timeval timeout,
1830                           uint32_t destnode,
1831                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1832                           const char **reason)
1833 {
1834         int ret;
1835         int32_t res;
1836         TDB_DATA data;
1837
1838         data.dptr = (uint8_t *)&dbid;
1839         data.dsize = sizeof(dbid);
1840
1841         ret = ctdb_control(ctdb, destnode, 0,
1842                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1843                            mem_ctx, &data, &res, &timeout, NULL);
1844         if (ret != 0 || res != 0) {
1845                 return -1;
1846         }
1847
1848         if (data.dsize == 0) {
1849                 (*reason) = NULL;
1850                 return 0;
1851         }
1852
1853         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1854         if ((*reason) == NULL) {
1855                 return -1;
1856         }
1857
1858         talloc_free(data.dptr);
1859
1860         return 0;
1861 }
1862
1863 /*
1864  * get db sequence number
1865  */
1866 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1867                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1868 {
1869         int ret;
1870         int32_t res;
1871         TDB_DATA data, outdata;
1872
1873         data.dptr = (uint8_t *)&dbid;
1874         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1875
1876         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1877                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1878         if (ret != 0 || res != 0) {
1879                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1880                 return -1;
1881         }
1882
1883         if (outdata.dsize != sizeof(uint64_t)) {
1884                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1885                 talloc_free(outdata.dptr);
1886                 return -1;
1887         }
1888
1889         if (seqnum != NULL) {
1890                 *seqnum = *(uint64_t *)outdata.dptr;
1891         }
1892         talloc_free(outdata.dptr);
1893
1894         return 0;
1895 }
1896
1897 /*
1898   create a database
1899  */
1900 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1901                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1902 {
1903         int ret;
1904         int32_t res;
1905         TDB_DATA data;
1906         uint64_t tdb_flags = 0;
1907
1908         data.dptr = discard_const(name);
1909         data.dsize = strlen(name)+1;
1910
1911         /* Make sure that volatile databases use jenkins hash */
1912         if (!persistent) {
1913                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1914         }
1915
1916         ret = ctdb_control(ctdb, destnode, tdb_flags,
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1918                            0, data, 
1919                            mem_ctx, &data, &res, &timeout, NULL);
1920
1921         if (ret != 0 || res != 0) {
1922                 return -1;
1923         }
1924
1925         return 0;
1926 }
1927
1928 /*
1929   get debug level on a node
1930  */
1931 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1932 {
1933         int ret;
1934         int32_t res;
1935         TDB_DATA data;
1936
1937         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1938                            ctdb, &data, &res, NULL, NULL);
1939         if (ret != 0 || res != 0) {
1940                 return -1;
1941         }
1942         if (data.dsize != sizeof(int32_t)) {
1943                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1944                          (unsigned)data.dsize));
1945                 return -1;
1946         }
1947         *level = *(int32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949         return 0;
1950 }
1951
1952 /*
1953   set debug level on a node
1954  */
1955 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1956 {
1957         int ret;
1958         int32_t res;
1959         TDB_DATA data;
1960
1961         data.dptr = (uint8_t *)&level;
1962         data.dsize = sizeof(level);
1963
1964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1965                            NULL, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 return -1;
1968         }
1969         return 0;
1970 }
1971
1972
1973 /*
1974   get a list of connected nodes
1975  */
1976 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1977                                 struct timeval timeout,
1978                                 TALLOC_CTX *mem_ctx,
1979                                 uint32_t *num_nodes)
1980 {
1981         struct ctdb_node_map *map=NULL;
1982         int ret, i;
1983         uint32_t *nodes;
1984
1985         *num_nodes = 0;
1986
1987         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1988         if (ret != 0) {
1989                 return NULL;
1990         }
1991
1992         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1993         if (nodes == NULL) {
1994                 return NULL;
1995         }
1996
1997         for (i=0;i<map->num;i++) {
1998                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1999                         nodes[*num_nodes] = map->nodes[i].pnn;
2000                         (*num_nodes)++;
2001                 }
2002         }
2003
2004         return nodes;
2005 }
2006
2007
2008 /*
2009   reset remote status
2010  */
2011 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2012 {
2013         int ret;
2014         int32_t res;
2015
2016         ret = ctdb_control(ctdb, destnode, 0, 
2017                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2018                            NULL, NULL, &res, NULL, NULL);
2019         if (ret != 0 || res != 0) {
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2021                 return -1;
2022         }
2023         return 0;
2024 }
2025
2026 /*
2027   attach to a specific database - client call
2028 */
2029 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2030                                     struct timeval timeout,
2031                                     const char *name,
2032                                     bool persistent,
2033                                     uint32_t tdb_flags)
2034 {
2035         struct ctdb_db_context *ctdb_db;
2036         TDB_DATA data;
2037         int ret;
2038         int32_t res;
2039
2040         ctdb_db = ctdb_db_handle(ctdb, name);
2041         if (ctdb_db) {
2042                 return ctdb_db;
2043         }
2044
2045         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2046         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2047
2048         ctdb_db->ctdb = ctdb;
2049         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2050         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2051
2052         data.dptr = discard_const(name);
2053         data.dsize = strlen(name)+1;
2054
2055         /* CTDB has switched to using jenkins hash for volatile databases.
2056          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2057          * always set it.
2058          */
2059         if (!persistent) {
2060                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2061         }
2062
2063         /* tell ctdb daemon to attach */
2064         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2065                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2066                            0, data, ctdb_db, &data, &res, NULL, NULL);
2067         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2068                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2069                 talloc_free(ctdb_db);
2070                 return NULL;
2071         }
2072         
2073         ctdb_db->db_id = *(uint32_t *)data.dptr;
2074         talloc_free(data.dptr);
2075
2076         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2079                 talloc_free(ctdb_db);
2080                 return NULL;
2081         }
2082
2083         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2084         if (ctdb->valgrinding) {
2085                 tdb_flags |= TDB_NOMMAP;
2086         }
2087         tdb_flags |= TDB_DISALLOW_NESTING;
2088
2089         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2090         if (ctdb_db->ltdb == NULL) {
2091                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2092                 talloc_free(ctdb_db);
2093                 return NULL;
2094         }
2095
2096         ctdb_db->persistent = persistent;
2097
2098         DLIST_ADD(ctdb->db_list, ctdb_db);
2099
2100         /* add well known functions */
2101         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2102         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2103         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2104
2105         return ctdb_db;
2106 }
2107
2108
2109 /*
2110   setup a call for a database
2111  */
2112 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2113 {
2114         struct ctdb_registered_call *call;
2115
2116 #if 0
2117         TDB_DATA data;
2118         int32_t status;
2119         struct ctdb_control_set_call c;
2120         int ret;
2121
2122         /* this is no longer valid with the separate daemon architecture */
2123         c.db_id = ctdb_db->db_id;
2124         c.fn    = fn;
2125         c.id    = id;
2126
2127         data.dptr = (uint8_t *)&c;
2128         data.dsize = sizeof(c);
2129
2130         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2131                            data, NULL, NULL, &status, NULL, NULL);
2132         if (ret != 0 || status != 0) {
2133                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2134                 return -1;
2135         }
2136 #endif
2137
2138         /* also register locally */
2139         call = talloc(ctdb_db, struct ctdb_registered_call);
2140         call->fn = fn;
2141         call->id = id;
2142
2143         DLIST_ADD(ctdb_db->calls, call);        
2144         return 0;
2145 }
2146
2147
2148 struct traverse_state {
2149         bool done;
2150         uint32_t count;
2151         ctdb_traverse_func fn;
2152         void *private_data;
2153         bool listemptyrecords;
2154 };
2155
2156 /*
2157   called on each key during a ctdb_traverse
2158  */
2159 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2160 {
2161         struct traverse_state *state = (struct traverse_state *)p;
2162         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2163         TDB_DATA key;
2164
2165         if (data.dsize < sizeof(uint32_t) ||
2166             d->length != data.dsize) {
2167                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2168                 state->done = true;
2169                 return;
2170         }
2171
2172         key.dsize = d->keylen;
2173         key.dptr  = &d->data[0];
2174         data.dsize = d->datalen;
2175         data.dptr = &d->data[d->keylen];
2176
2177         if (key.dsize == 0 && data.dsize == 0) {
2178                 /* end of traverse */
2179                 state->done = true;
2180                 return;
2181         }
2182
2183         if (!state->listemptyrecords &&
2184             data.dsize == sizeof(struct ctdb_ltdb_header))
2185         {
2186                 /* empty records are deleted records in ctdb */
2187                 return;
2188         }
2189
2190         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2191                 state->done = true;
2192         }
2193
2194         state->count++;
2195 }
2196
2197 /**
2198  * start a cluster wide traverse, calling the supplied fn on each record
2199  * return the number of records traversed, or -1 on error
2200  *
2201  * Extendet variant with a flag to signal whether empty records should
2202  * be listed.
2203  */
2204 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2205                              ctdb_traverse_func fn,
2206                              bool withemptyrecords,
2207                              void *private_data)
2208 {
2209         TDB_DATA data;
2210         struct ctdb_traverse_start_ext t;
2211         int32_t status;
2212         int ret;
2213         uint64_t srvid = (getpid() | 0xFLL<<60);
2214         struct traverse_state state;
2215
2216         state.done = false;
2217         state.count = 0;
2218         state.private_data = private_data;
2219         state.fn = fn;
2220         state.listemptyrecords = withemptyrecords;
2221
2222         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2223         if (ret != 0) {
2224                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2225                 return -1;
2226         }
2227
2228         t.db_id = ctdb_db->db_id;
2229         t.srvid = srvid;
2230         t.reqid = 0;
2231         t.withemptyrecords = withemptyrecords;
2232
2233         data.dptr = (uint8_t *)&t;
2234         data.dsize = sizeof(t);
2235
2236         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2237                            data, NULL, NULL, &status, NULL, NULL);
2238         if (ret != 0 || status != 0) {
2239                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2240                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2241                 return -1;
2242         }
2243
2244         while (!state.done) {
2245                 event_loop_once(ctdb_db->ctdb->ev);
2246         }
2247
2248         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2251                 return -1;
2252         }
2253
2254         return state.count;
2255 }
2256
2257 /**
2258  * start a cluster wide traverse, calling the supplied fn on each record
2259  * return the number of records traversed, or -1 on error
2260  *
2261  * Standard version which does not list the empty records:
2262  * These are considered deleted.
2263  */
2264 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2265 {
2266         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2267 }
2268
2269 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2270 /*
2271   called on each key during a catdb
2272  */
2273 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2274 {
2275         int i;
2276         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2277         FILE *f = c->f;
2278         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2279
2280         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2281         for (i=0;i<key.dsize;i++) {
2282                 if (ISASCII(key.dptr[i])) {
2283                         fprintf(f, "%c", key.dptr[i]);
2284                 } else {
2285                         fprintf(f, "\\%02X", key.dptr[i]);
2286                 }
2287         }
2288         fprintf(f, "\"\n");
2289
2290         fprintf(f, "dmaster: %u\n", h->dmaster);
2291         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2292
2293         if (c->printlmaster && ctdb->vnn_map != NULL) {
2294                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2295         }
2296
2297         if (c->printhash) {
2298                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2299         }
2300
2301         if (c->printrecordflags) {
2302                 fprintf(f, "flags: 0x%08x", h->flags);
2303                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2304                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2305                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2306                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2307                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2308                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2309                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2310                 fprintf(f, "\n");
2311         }
2312
2313         if (c->printdatasize) {
2314                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2315         } else {
2316                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2317                 for (i=sizeof(*h);i<data.dsize;i++) {
2318                         if (ISASCII(data.dptr[i])) {
2319                                 fprintf(f, "%c", data.dptr[i]);
2320                         } else {
2321                                 fprintf(f, "\\%02X", data.dptr[i]);
2322                         }
2323                 }
2324                 fprintf(f, "\"\n");
2325         }
2326
2327         fprintf(f, "\n");
2328
2329         return 0;
2330 }
2331
2332 /*
2333   convenience function to list all keys to stdout
2334  */
2335 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2336                  struct ctdb_dump_db_context *ctx)
2337 {
2338         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2339                                  ctx->printemptyrecords, ctx);
2340 }
2341
2342 /*
2343   get the pid of a ctdb daemon
2344  */
2345 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2346 {
2347         int ret;
2348         int32_t res;
2349
2350         ret = ctdb_control(ctdb, destnode, 0, 
2351                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2352                            NULL, NULL, &res, &timeout, NULL);
2353         if (ret != 0) {
2354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2355                 return -1;
2356         }
2357
2358         *pid = res;
2359
2360         return 0;
2361 }
2362
2363
2364 /*
2365   async freeze send control
2366  */
2367 struct ctdb_client_control_state *
2368 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2369 {
2370         return ctdb_control_send(ctdb, destnode, priority, 
2371                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2372                            mem_ctx, &timeout, NULL);
2373 }
2374
2375 /* 
2376    async freeze recv control
2377 */
2378 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2384         if ( (ret != 0) || (res != 0) ){
2385                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2386                 return -1;
2387         }
2388
2389         return 0;
2390 }
2391
2392 /*
2393   freeze databases of a certain priority
2394  */
2395 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2396 {
2397         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2398         struct ctdb_client_control_state *state;
2399         int ret;
2400
2401         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2402         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2403         talloc_free(tmp_ctx);
2404
2405         return ret;
2406 }
2407
2408 /* Freeze all databases */
2409 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2410 {
2411         int i;
2412
2413         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2414                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2415                         return -1;
2416                 }
2417         }
2418         return 0;
2419 }
2420
2421 /*
2422   thaw databases of a certain priority
2423  */
2424 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2425 {
2426         int ret;
2427         int32_t res;
2428
2429         ret = ctdb_control(ctdb, destnode, priority, 
2430                            CTDB_CONTROL_THAW, 0, tdb_null, 
2431                            NULL, NULL, &res, &timeout, NULL);
2432         if (ret != 0 || res != 0) {
2433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2434                 return -1;
2435         }
2436
2437         return 0;
2438 }
2439
2440 /* thaw all databases */
2441 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2442 {
2443         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2444 }
2445
2446 /*
2447   get pnn of a node, or -1
2448  */
2449 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2450 {
2451         int ret;
2452         int32_t res;
2453
2454         ret = ctdb_control(ctdb, destnode, 0, 
2455                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2456                            NULL, NULL, &res, &timeout, NULL);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2459                 return -1;
2460         }
2461
2462         return res;
2463 }
2464
2465 /*
2466   get the monitoring mode of a remote node
2467  */
2468 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2469 {
2470         int ret;
2471         int32_t res;
2472
2473         ret = ctdb_control(ctdb, destnode, 0, 
2474                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2475                            NULL, NULL, &res, &timeout, NULL);
2476         if (ret != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2478                 return -1;
2479         }
2480
2481         *monmode = res;
2482
2483         return 0;
2484 }
2485
2486
2487 /*
2488  set the monitoring mode of a remote node to active
2489  */
2490 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2491 {
2492         int ret;
2493         
2494
2495         ret = ctdb_control(ctdb, destnode, 0, 
2496                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2497                            NULL, NULL,NULL, &timeout, NULL);
2498         if (ret != 0) {
2499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2500                 return -1;
2501         }
2502
2503         
2504
2505         return 0;
2506 }
2507
2508 /*
2509   set the monitoring mode of a remote node to disable
2510  */
2511 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2512 {
2513         int ret;
2514         
2515
2516         ret = ctdb_control(ctdb, destnode, 0, 
2517                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2518                            NULL, NULL, NULL, &timeout, NULL);
2519         if (ret != 0) {
2520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2521                 return -1;
2522         }
2523
2524         
2525
2526         return 0;
2527 }
2528
2529
2530
2531 /* 
2532   sent to a node to make it take over an ip address
2533 */
2534 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2535                           uint32_t destnode, struct ctdb_public_ip *ip)
2536 {
2537         TDB_DATA data;
2538         struct ctdb_public_ipv4 ipv4;
2539         int ret;
2540         int32_t res;
2541
2542         if (ip->addr.sa.sa_family == AF_INET) {
2543                 ipv4.pnn = ip->pnn;
2544                 ipv4.sin = ip->addr.ip;
2545
2546                 data.dsize = sizeof(ipv4);
2547                 data.dptr  = (uint8_t *)&ipv4;
2548
2549                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2550                            NULL, &res, &timeout, NULL);
2551         } else {
2552                 data.dsize = sizeof(*ip);
2553                 data.dptr  = (uint8_t *)ip;
2554
2555                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2556                            NULL, &res, &timeout, NULL);
2557         }
2558
2559         if (ret != 0 || res != 0) {
2560                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2561                 return -1;
2562         }
2563
2564         return 0;       
2565 }
2566
2567
2568 /* 
2569   sent to a node to make it release an ip address
2570 */
2571 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2572                          uint32_t destnode, struct ctdb_public_ip *ip)
2573 {
2574         TDB_DATA data;
2575         struct ctdb_public_ipv4 ipv4;
2576         int ret;
2577         int32_t res;
2578
2579         if (ip->addr.sa.sa_family == AF_INET) {
2580                 ipv4.pnn = ip->pnn;
2581                 ipv4.sin = ip->addr.ip;
2582
2583                 data.dsize = sizeof(ipv4);
2584                 data.dptr  = (uint8_t *)&ipv4;
2585
2586                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2587                                    NULL, &res, &timeout, NULL);
2588         } else {
2589                 data.dsize = sizeof(*ip);
2590                 data.dptr  = (uint8_t *)ip;
2591
2592                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2593                                    NULL, &res, &timeout, NULL);
2594         }
2595
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;       
2602 }
2603
2604
2605 /*
2606   get a tunable
2607  */
2608 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2609                           struct timeval timeout, 
2610                           uint32_t destnode,
2611                           const char *name, uint32_t *value)
2612 {
2613         struct ctdb_control_get_tunable *t;
2614         TDB_DATA data, outdata;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2619         data.dptr  = talloc_size(ctdb, data.dsize);
2620         CTDB_NO_MEMORY(ctdb, data.dptr);
2621
2622         t = (struct ctdb_control_get_tunable *)data.dptr;
2623         t->length = strlen(name)+1;
2624         memcpy(t->name, name, t->length);
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         talloc_free(data.dptr);
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2631                 return ret != 0 ? ret : res;
2632         }
2633
2634         if (outdata.dsize != sizeof(uint32_t)) {
2635                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2636                 talloc_free(outdata.dptr);
2637                 return -1;
2638         }
2639         
2640         *value = *(uint32_t *)outdata.dptr;
2641         talloc_free(outdata.dptr);
2642
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2650                           struct timeval timeout, 
2651                           uint32_t destnode,
2652                           const char *name, uint32_t value)
2653 {
2654         struct ctdb_control_set_tunable *t;
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2660         data.dptr  = talloc_size(ctdb, data.dsize);
2661         CTDB_NO_MEMORY(ctdb, data.dptr);
2662
2663         t = (struct ctdb_control_set_tunable *)data.dptr;
2664         t->length = strlen(name)+1;
2665         memcpy(t->name, name, t->length);
2666         t->value = value;
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2669                            NULL, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2673                 return -1;
2674         }
2675
2676         return 0;
2677 }
2678
2679 /*
2680   list tunables
2681  */
2682 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2683                             struct timeval timeout, 
2684                             uint32_t destnode,
2685                             TALLOC_CTX *mem_ctx,
2686                             const char ***list, uint32_t *count)
2687 {
2688         TDB_DATA outdata;
2689         int32_t res;
2690         int ret;
2691         struct ctdb_control_list_tunable *t;
2692         char *p, *s, *ptr;
2693
2694         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2698                 return -1;
2699         }
2700
2701         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2702         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2703             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2704                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2705                 talloc_free(outdata.dptr);
2706                 return -1;              
2707         }
2708         
2709         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2710         CTDB_NO_MEMORY(ctdb, p);
2711
2712         talloc_free(outdata.dptr);
2713         
2714         (*list) = NULL;
2715         (*count) = 0;
2716
2717         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2718                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2719                 CTDB_NO_MEMORY(ctdb, *list);
2720                 (*list)[*count] = talloc_strdup(*list, s);
2721                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2722                 (*count)++;
2723         }
2724
2725         talloc_free(p);
2726
2727         return 0;
2728 }
2729
2730
2731 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2732                                    struct timeval timeout, uint32_t destnode,
2733                                    TALLOC_CTX *mem_ctx,
2734                                    uint32_t flags,
2735                                    struct ctdb_all_public_ips **ips)
2736 {
2737         int ret;
2738         TDB_DATA outdata;
2739         int32_t res;
2740
2741         ret = ctdb_control(ctdb, destnode, 0, 
2742                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2743                            mem_ctx, &outdata, &res, &timeout, NULL);
2744         if (ret == 0 && res == -1) {
2745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2746                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2747         }
2748         if (ret != 0 || res != 0) {
2749           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2750                 return -1;
2751         }
2752
2753         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2754         talloc_free(outdata.dptr);
2755                     
2756         return 0;
2757 }
2758
2759 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2760                              struct timeval timeout, uint32_t destnode,
2761                              TALLOC_CTX *mem_ctx,
2762                              struct ctdb_all_public_ips **ips)
2763 {
2764         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2765                                               destnode, mem_ctx,
2766                                               0, ips);
2767 }
2768
2769 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2770                         struct timeval timeout, uint32_t destnode, 
2771                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2772 {
2773         int ret, i, len;
2774         TDB_DATA outdata;
2775         int32_t res;
2776         struct ctdb_all_public_ipsv4 *ipsv4;
2777
2778         ret = ctdb_control(ctdb, destnode, 0, 
2779                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2780                            mem_ctx, &outdata, &res, &timeout, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2783                 return -1;
2784         }
2785
2786         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2787         len = offsetof(struct ctdb_all_public_ips, ips) +
2788                 ipsv4->num*sizeof(struct ctdb_public_ip);
2789         *ips = talloc_zero_size(mem_ctx, len);
2790         CTDB_NO_MEMORY(ctdb, *ips);
2791         (*ips)->num = ipsv4->num;
2792         for (i=0; i<ipsv4->num; i++) {
2793                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2794                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2795         }
2796
2797         talloc_free(outdata.dptr);
2798                     
2799         return 0;
2800 }
2801
2802 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2803                                  struct timeval timeout, uint32_t destnode,
2804                                  TALLOC_CTX *mem_ctx,
2805                                  const ctdb_sock_addr *addr,
2806                                  struct ctdb_control_public_ip_info **_info)
2807 {
2808         int ret;
2809         TDB_DATA indata;
2810         TDB_DATA outdata;
2811         int32_t res;
2812         struct ctdb_control_public_ip_info *info;
2813         uint32_t len;
2814         uint32_t i;
2815
2816         indata.dptr = discard_const_p(uint8_t, addr);
2817         indata.dsize = sizeof(*addr);
2818
2819         ret = ctdb_control(ctdb, destnode, 0,
2820                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2821                            mem_ctx, &outdata, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2824                                 "failed ret:%d res:%d\n",
2825                                 ret, res));
2826                 return -1;
2827         }
2828
2829         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2830         if (len > outdata.dsize) {
2831                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2832                                 "returned invalid data with size %u > %u\n",
2833                                 (unsigned int)outdata.dsize,
2834                                 (unsigned int)len));
2835                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2836                 return -1;
2837         }
2838
2839         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2840         len += info->num*sizeof(struct ctdb_control_iface_info);
2841
2842         if (len > outdata.dsize) {
2843                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2844                                 "returned invalid data with size %u > %u\n",
2845                                 (unsigned int)outdata.dsize,
2846                                 (unsigned int)len));
2847                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2848                 return -1;
2849         }
2850
2851         /* make sure we null terminate the returned strings */
2852         for (i=0; i < info->num; i++) {
2853                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2854         }
2855
2856         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2857                                                                 outdata.dptr,
2858                                                                 outdata.dsize);
2859         talloc_free(outdata.dptr);
2860         if (*_info == NULL) {
2861                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2862                                 "talloc_memdup size %u failed\n",
2863                                 (unsigned int)outdata.dsize));
2864                 return -1;
2865         }
2866
2867         return 0;
2868 }
2869
2870 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2871                          struct timeval timeout, uint32_t destnode,
2872                          TALLOC_CTX *mem_ctx,
2873                          struct ctdb_control_get_ifaces **_ifaces)
2874 {
2875         int ret;
2876         TDB_DATA outdata;
2877         int32_t res;
2878         struct ctdb_control_get_ifaces *ifaces;
2879         uint32_t len;
2880         uint32_t i;
2881
2882         ret = ctdb_control(ctdb, destnode, 0,
2883                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2884                            mem_ctx, &outdata, &res, &timeout, NULL);
2885         if (ret != 0 || res != 0) {
2886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2887                                 "failed ret:%d res:%d\n",
2888                                 ret, res));
2889                 return -1;
2890         }
2891
2892         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2893         if (len > outdata.dsize) {
2894                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2895                                 "returned invalid data with size %u > %u\n",
2896                                 (unsigned int)outdata.dsize,
2897                                 (unsigned int)len));
2898                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2899                 return -1;
2900         }
2901
2902         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2903         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2904
2905         if (len > outdata.dsize) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2907                                 "returned invalid data with size %u > %u\n",
2908                                 (unsigned int)outdata.dsize,
2909                                 (unsigned int)len));
2910                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2911                 return -1;
2912         }
2913
2914         /* make sure we null terminate the returned strings */
2915         for (i=0; i < ifaces->num; i++) {
2916                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2917         }
2918
2919         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2920                                                                   outdata.dptr,
2921                                                                   outdata.dsize);
2922         talloc_free(outdata.dptr);
2923         if (*_ifaces == NULL) {
2924                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2925                                 "talloc_memdup size %u failed\n",
2926                                 (unsigned int)outdata.dsize));
2927                 return -1;
2928         }
2929
2930         return 0;
2931 }
2932
2933 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2934                              struct timeval timeout, uint32_t destnode,
2935                              TALLOC_CTX *mem_ctx,
2936                              const struct ctdb_control_iface_info *info)
2937 {
2938         int ret;
2939         TDB_DATA indata;
2940         int32_t res;
2941
2942         indata.dptr = discard_const_p(uint8_t, info);
2943         indata.dsize = sizeof(*info);
2944
2945         ret = ctdb_control(ctdb, destnode, 0,
2946                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2947                            mem_ctx, NULL, &res, &timeout, NULL);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2950                                 "failed ret:%d res:%d\n",
2951                                 ret, res));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   set/clear the permanent disabled bit on a remote node
2960  */
2961 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2962                        uint32_t set, uint32_t clear)
2963 {
2964         int ret;
2965         TDB_DATA data;
2966         struct ctdb_node_map *nodemap=NULL;
2967         struct ctdb_node_flag_change c;
2968         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2969         uint32_t recmaster;
2970         uint32_t *nodes;
2971
2972
2973         /* find the recovery master */
2974         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2977                 talloc_free(tmp_ctx);
2978                 return ret;
2979         }
2980
2981
2982         /* read the node flags from the recmaster */
2983         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2984         if (ret != 0) {
2985                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2986                 talloc_free(tmp_ctx);
2987                 return -1;
2988         }
2989         if (destnode >= nodemap->num) {
2990                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2991                 talloc_free(tmp_ctx);
2992                 return -1;
2993         }
2994
2995         c.pnn       = destnode;
2996         c.old_flags = nodemap->nodes[destnode].flags;
2997         c.new_flags = c.old_flags;
2998         c.new_flags |= set;
2999         c.new_flags &= ~clear;
3000
3001         data.dsize = sizeof(c);
3002         data.dptr = (unsigned char *)&c;
3003
3004         /* send the flags update to all connected nodes */
3005         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3006
3007         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3008                                         nodes, 0,
3009                                         timeout, false, data,
3010                                         NULL, NULL,
3011                                         NULL) != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3013
3014                 talloc_free(tmp_ctx);
3015                 return -1;
3016         }
3017
3018         talloc_free(tmp_ctx);
3019         return 0;
3020 }
3021
3022
3023 /*
3024   get all tunables
3025  */
3026 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3027                                struct timeval timeout, 
3028                                uint32_t destnode,
3029                                struct ctdb_tunable *tunables)
3030 {
3031         TDB_DATA outdata;
3032         int ret;
3033         int32_t res;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3036                            &outdata, &res, &timeout, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3039                 return -1;
3040         }
3041
3042         if (outdata.dsize != sizeof(*tunables)) {
3043                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3044                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3045                 return -1;              
3046         }
3047
3048         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3049         talloc_free(outdata.dptr);
3050         return 0;
3051 }
3052
3053 /*
3054   add a public address to a node
3055  */
3056 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3057                       struct timeval timeout, 
3058                       uint32_t destnode,
3059                       struct ctdb_control_ip_iface *pub)
3060 {
3061         TDB_DATA data;
3062         int32_t res;
3063         int ret;
3064
3065         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3066         data.dptr  = (unsigned char *)pub;
3067
3068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3069                            NULL, &res, &timeout, NULL);
3070         if (ret != 0 || res != 0) {
3071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3072                 return -1;
3073         }
3074
3075         return 0;
3076 }
3077
3078 /*
3079   delete a public address from a node
3080  */
3081 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3082                       struct timeval timeout, 
3083                       uint32_t destnode,
3084                       struct ctdb_control_ip_iface *pub)
3085 {
3086         TDB_DATA data;
3087         int32_t res;
3088         int ret;
3089
3090         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3091         data.dptr  = (unsigned char *)pub;
3092
3093         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3094                            NULL, &res, &timeout, NULL);
3095         if (ret != 0 || res != 0) {
3096                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3097                 return -1;
3098         }
3099
3100         return 0;
3101 }
3102
3103 /*
3104   kill a tcp connection
3105  */
3106 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3107                       struct timeval timeout, 
3108                       uint32_t destnode,
3109                       struct ctdb_control_killtcp *killtcp)
3110 {
3111         TDB_DATA data;
3112         int32_t res;
3113         int ret;
3114
3115         data.dsize = sizeof(struct ctdb_control_killtcp);
3116         data.dptr  = (unsigned char *)killtcp;
3117
3118         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3119                            NULL, &res, &timeout, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128 /*
3129   send a gratious arp
3130  */
3131 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3132                       struct timeval timeout, 
3133                       uint32_t destnode,
3134                       ctdb_sock_addr *addr,
3135                       const char *ifname)
3136 {
3137         TDB_DATA data;
3138         int32_t res;
3139         int ret, len;
3140         struct ctdb_control_gratious_arp *gratious_arp;
3141         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3142
3143
3144         len = strlen(ifname)+1;
3145         gratious_arp = talloc_size(tmp_ctx, 
3146                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3147         CTDB_NO_MEMORY(ctdb, gratious_arp);
3148
3149         gratious_arp->addr = *addr;
3150         gratious_arp->len = len;
3151         memcpy(&gratious_arp->iface[0], ifname, len);
3152
3153
3154         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3155         data.dptr  = (unsigned char *)gratious_arp;
3156
3157         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3158                            NULL, &res, &timeout, NULL);
3159         if (ret != 0 || res != 0) {
3160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3161                 talloc_free(tmp_ctx);
3162                 return -1;
3163         }
3164
3165         talloc_free(tmp_ctx);
3166         return 0;
3167 }
3168
3169 /*
3170   get a list of all tcp tickles that a node knows about for a particular vnn
3171  */
3172 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3173                               struct timeval timeout, uint32_t destnode, 
3174                               TALLOC_CTX *mem_ctx, 
3175                               ctdb_sock_addr *addr,
3176                               struct ctdb_control_tcp_tickle_list **list)
3177 {
3178         int ret;
3179         TDB_DATA data, outdata;
3180         int32_t status;
3181
3182         data.dptr = (uint8_t*)addr;
3183         data.dsize = sizeof(ctdb_sock_addr);
3184
3185         ret = ctdb_control(ctdb, destnode, 0, 
3186                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3187                            mem_ctx, &outdata, &status, NULL, NULL);
3188         if (ret != 0 || status != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3190                 return -1;
3191         }
3192
3193         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3194
3195         return status;
3196 }
3197
3198 /*
3199   register a server id
3200  */
3201 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3202                       struct timeval timeout, 
3203                       struct ctdb_server_id *id)
3204 {
3205         TDB_DATA data;
3206         int32_t res;
3207         int ret;
3208
3209         data.dsize = sizeof(struct ctdb_server_id);
3210         data.dptr  = (unsigned char *)id;
3211
3212         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3213                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3214                         0, data, NULL,
3215                         NULL, &res, &timeout, NULL);
3216         if (ret != 0 || res != 0) {
3217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3218                 return -1;
3219         }
3220
3221         return 0;
3222 }
3223
3224 /*
3225   unregister a server id
3226  */
3227 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3228                       struct timeval timeout, 
3229                       struct ctdb_server_id *id)
3230 {
3231         TDB_DATA data;
3232         int32_t res;
3233         int ret;
3234
3235         data.dsize = sizeof(struct ctdb_server_id);
3236         data.dptr  = (unsigned char *)id;
3237
3238         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3239                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3240                         0, data, NULL,
3241                         NULL, &res, &timeout, NULL);
3242         if (ret != 0 || res != 0) {
3243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3244                 return -1;
3245         }
3246
3247         return 0;
3248 }
3249
3250
3251 /*
3252   check if a server id exists
3253
3254   if a server id does exist, return *status == 1, otherwise *status == 0
3255  */
3256 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3257                       struct timeval timeout, 
3258                       uint32_t destnode,
3259                       struct ctdb_server_id *id,
3260                       uint32_t *status)
3261 {
3262         TDB_DATA data;
3263         int32_t res;
3264         int ret;
3265
3266         data.dsize = sizeof(struct ctdb_server_id);
3267         data.dptr  = (unsigned char *)id;
3268
3269         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3270                         0, data, NULL,
3271                         NULL, &res, &timeout, NULL);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3274                 return -1;
3275         }
3276
3277         if (res) {
3278                 *status = 1;
3279         } else {
3280                 *status = 0;
3281         }
3282
3283         return 0;
3284 }
3285
3286 /*
3287    get the list of server ids that are registered on a node
3288 */
3289 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3290                 TALLOC_CTX *mem_ctx,
3291                 struct timeval timeout, uint32_t destnode, 
3292                 struct ctdb_server_id_list **svid_list)
3293 {
3294         int ret;
3295         TDB_DATA outdata;
3296         int32_t res;
3297
3298         ret = ctdb_control(ctdb, destnode, 0, 
3299                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3300                            mem_ctx, &outdata, &res, &timeout, NULL);
3301         if (ret != 0 || res != 0) {
3302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3303                 return -1;
3304         }
3305
3306         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3307                     
3308         return 0;
3309 }
3310
3311 /*
3312   initialise the ctdb daemon for client applications
3313
3314   NOTE: In current code the daemon does not fork. This is for testing purposes only
3315   and to simplify the code.
3316 */
3317 struct ctdb_context *ctdb_init(struct event_context *ev)
3318 {
3319         int ret;
3320         struct ctdb_context *ctdb;
3321
3322         ctdb = talloc_zero(ev, struct ctdb_context);
3323         if (ctdb == NULL) {
3324                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3325                 return NULL;
3326         }
3327         ctdb->ev  = ev;
3328         ctdb->idr = idr_init(ctdb);
3329         /* Wrap early to exercise code. */
3330         ctdb->lastid = INT_MAX-200;
3331         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3332
3333         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3336                 talloc_free(ctdb);
3337                 return NULL;
3338         }
3339
3340         ctdb->statistics.statistics_start_time = timeval_current();
3341
3342         return ctdb;
3343 }
3344
3345
3346 /*
3347   set some ctdb flags
3348 */
3349 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3350 {
3351         ctdb->flags |= flags;
3352 }
3353
3354 /*
3355   setup the local socket name
3356 */
3357 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3358 {
3359         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3360         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3361
3362         return 0;
3363 }
3364
3365 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3366 {
3367         return ctdb->daemon.name;
3368 }
3369
3370 /*
3371   return the pnn of this node
3372 */
3373 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3374 {
3375         return ctdb->pnn;
3376 }
3377
3378
3379 /*
3380   get the uptime of a remote node
3381  */
3382 struct ctdb_client_control_state *
3383 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3384 {
3385         return ctdb_control_send(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3387                            mem_ctx, &timeout, NULL);
3388 }
3389
3390 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3391 {
3392         int ret;
3393         int32_t res;
3394         TDB_DATA outdata;
3395
3396         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3397         if (ret != 0 || res != 0) {
3398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3399                 return -1;
3400         }
3401
3402         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3403
3404         return 0;
3405 }
3406
3407 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3408 {
3409         struct ctdb_client_control_state *state;
3410
3411         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3412         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3413 }
3414
3415 /*
3416   send a control to execute the "recovered" event script on a node
3417  */
3418 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3419 {
3420         int ret;
3421         int32_t status;
3422
3423         ret = ctdb_control(ctdb, destnode, 0, 
3424                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3425                            NULL, NULL, &status, &timeout, NULL);
3426         if (ret != 0 || status != 0) {
3427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3428                 return -1;
3429         }
3430
3431         return 0;
3432 }
3433
3434 /* 
3435   callback for the async helpers used when sending the same control
3436   to multiple nodes in parallell.
3437 */
3438 static void async_callback(struct ctdb_client_control_state *state)
3439 {
3440         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3441         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res = -1;
3445         uint32_t destnode = state->c->hdr.destnode;
3446
3447         /* one more node has responded with recmode data */
3448         data->count--;
3449
3450         /* if we failed to push the db, then return an error and let
3451            the main loop try again.
3452         */
3453         if (state->state != CTDB_CONTROL_DONE) {
3454                 if ( !data->dont_log_errors) {
3455                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3456                 }
3457                 data->fail_count++;
3458                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3459                         res = -ETIME;
3460                 } else {
3461                         res = -1;
3462                 }
3463                 if (data->fail_callback) {
3464                         data->fail_callback(ctdb, destnode, res, outdata,
3465                                         data->callback_data);
3466                 }
3467                 return;
3468         }
3469         
3470         state->async.fn = NULL;
3471
3472         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3473         if ((ret != 0) || (res != 0)) {
3474                 if ( !data->dont_log_errors) {
3475                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3476                 }
3477                 data->fail_count++;
3478                 if (data->fail_callback) {
3479                         data->fail_callback(ctdb, destnode, res, outdata,
3480                                         data->callback_data);
3481                 }
3482         }
3483         if ((ret == 0) && (data->callback != NULL)) {
3484                 data->callback(ctdb, destnode, res, outdata,
3485                                         data->callback_data);
3486         }
3487 }
3488
3489
3490 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3491 {
3492         /* set up the callback functions */
3493         state->async.fn = async_callback;
3494         state->async.private_data = data;
3495         
3496         /* one more control to wait for to complete */
3497         data->count++;
3498 }
3499
3500
3501 /* wait for up to the maximum number of seconds allowed
3502    or until all nodes we expect a response from has replied
3503 */
3504 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3505 {
3506         while (data->count > 0) {
3507                 event_loop_once(ctdb->ev);
3508         }
3509         if (data->fail_count != 0) {
3510                 if (!data->dont_log_errors) {
3511                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3512                                  data->fail_count));
3513                 }
3514                 return -1;
3515         }
3516         return 0;
3517 }
3518
3519
3520 /* 
3521    perform a simple control on the listed nodes
3522    The control cannot return data
3523  */
3524 int ctdb_client_async_control(struct ctdb_context *ctdb,
3525                                 enum ctdb_controls opcode,
3526                                 uint32_t *nodes,
3527                                 uint64_t srvid,
3528                                 struct timeval timeout,
3529                                 bool dont_log_errors,
3530                                 TDB_DATA data,
3531                                 client_async_callback client_callback,
3532                                 client_async_callback fail_callback,
3533                                 void *callback_data)
3534 {
3535         struct client_async_data *async_data;
3536         struct ctdb_client_control_state *state;
3537         int j, num_nodes;
3538
3539         async_data = talloc_zero(ctdb, struct client_async_data);
3540         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3541         async_data->dont_log_errors = dont_log_errors;
3542         async_data->callback = client_callback;
3543         async_data->fail_callback = fail_callback;
3544         async_data->callback_data = callback_data;
3545         async_data->opcode        = opcode;
3546
3547         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3548
3549         /* loop over all nodes and send an async control to each of them */
3550         for (j=0; j<num_nodes; j++) {
3551                 uint32_t pnn = nodes[j];
3552
3553                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3554                                           0, data, async_data, &timeout, NULL);
3555                 if (state == NULL) {
3556                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3557                         talloc_free(async_data);
3558                         return -1;
3559                 }
3560                 
3561                 ctdb_client_async_add(async_data, state);
3562         }
3563
3564         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3565                 talloc_free(async_data);
3566                 return -1;
3567         }
3568
3569         talloc_free(async_data);
3570         return 0;
3571 }
3572
3573 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3574                                 struct ctdb_vnn_map *vnn_map,
3575                                 TALLOC_CTX *mem_ctx,
3576                                 bool include_self)
3577 {
3578         int i, j, num_nodes;
3579         uint32_t *nodes;
3580
3581         for (i=num_nodes=0;i<vnn_map->size;i++) {
3582                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3583                         continue;
3584                 }
3585                 num_nodes++;
3586         } 
3587
3588         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3589         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3590
3591         for (i=j=0;i<vnn_map->size;i++) {
3592                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3593                         continue;
3594                 }
3595                 nodes[j++] = vnn_map->map[i];
3596         } 
3597
3598         return nodes;
3599 }
3600
3601 /* Get list of nodes not including those with flags specified by mask.
3602  * If exclude_pnn is not -1 then exclude that pnn from the list.
3603  */
3604 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3605                         struct ctdb_node_map *node_map,
3606                         TALLOC_CTX *mem_ctx,
3607                         uint32_t mask,
3608                         int exclude_pnn)
3609 {
3610         int i, j, num_nodes;
3611         uint32_t *nodes;
3612
3613         for (i=num_nodes=0;i<node_map->num;i++) {
3614                 if (node_map->nodes[i].flags & mask) {
3615                         continue;
3616                 }
3617                 if (node_map->nodes[i].pnn == exclude_pnn) {
3618                         continue;
3619                 }
3620                 num_nodes++;
3621         } 
3622
3623         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3624         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3625
3626         for (i=j=0;i<node_map->num;i++) {
3627                 if (node_map->nodes[i].flags & mask) {
3628                         continue;
3629                 }
3630                 if (node_map->nodes[i].pnn == exclude_pnn) {
3631                         continue;
3632                 }
3633                 nodes[j++] = node_map->nodes[i].pnn;
3634         } 
3635
3636         return nodes;
3637 }
3638
3639 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3640                                 struct ctdb_node_map *node_map,
3641                                 TALLOC_CTX *mem_ctx,
3642                                 bool include_self)
3643 {
3644         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3645                              include_self ? -1 : ctdb->pnn);
3646 }
3647
3648 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3649                                 struct ctdb_node_map *node_map,
3650                                 TALLOC_CTX *mem_ctx,
3651                                 bool include_self)
3652 {
3653         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3654                              include_self ? -1 : ctdb->pnn);
3655 }
3656
3657 /* 
3658   this is used to test if a pnn lock exists and if it exists will return
3659   the number of connections that pnn has reported or -1 if that recovery
3660   daemon is not running.
3661 */
3662 int
3663 ctdb_read_pnn_lock(int fd, int32_t pnn)
3664 {
3665         struct flock lock;
3666         char c;
3667
3668         lock.l_type = F_WRLCK;
3669         lock.l_whence = SEEK_SET;
3670         lock.l_start = pnn;
3671         lock.l_len = 1;
3672         lock.l_pid = 0;
3673
3674         if (fcntl(fd, F_GETLK, &lock) != 0) {
3675                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3676                 return -1;
3677         }
3678
3679         if (lock.l_type == F_UNLCK) {
3680                 return -1;
3681         }
3682
3683         if (pread(fd, &c, 1, pnn) == -1) {
3684                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3685                 return -1;
3686         }
3687
3688         return c;
3689 }
3690
3691 /*
3692   get capabilities of a remote node
3693  */
3694 struct ctdb_client_control_state *
3695 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3696 {
3697         return ctdb_control_send(ctdb, destnode, 0, 
3698                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3699                            mem_ctx, &timeout, NULL);
3700 }
3701
3702 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3703 {
3704         int ret;
3705         int32_t res;
3706         TDB_DATA outdata;
3707
3708         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3709         if ( (ret != 0) || (res != 0) ) {
3710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3711                 return -1;
3712         }
3713
3714         if (capabilities) {
3715                 *capabilities = *((uint32_t *)outdata.dptr);
3716         }
3717
3718         return 0;
3719 }
3720
3721 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3722 {
3723         struct ctdb_client_control_state *state;
3724         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3725         int ret;
3726
3727         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3728         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3729         talloc_free(tmp_ctx);
3730         return ret;
3731 }
3732
3733 struct server_id {
3734         uint64_t pid;
3735         uint32_t task_id;
3736         uint32_t vnn;
3737         uint64_t unique_id;
3738 };
3739
3740 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3741 {
3742         struct server_id id;
3743
3744         id.pid = getpid();
3745         id.task_id = reqid;
3746         id.vnn = ctdb_get_pnn(ctdb);
3747         id.unique_id = id.vnn;
3748         id.unique_id = (id.unique_id << 32) | reqid;
3749
3750         return id;
3751 }
3752
3753 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3754 {
3755         if (id1->pid != id2->pid) {
3756                 return false;
3757         }
3758
3759         if (id1->task_id != id2->task_id) {
3760                 return false;
3761         }
3762
3763         if (id1->vnn != id2->vnn) {
3764                 return false;
3765         }
3766
3767         if (id1->unique_id != id2->unique_id) {
3768                 return false;
3769         }
3770
3771         return true;
3772 }
3773
3774 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3775 {
3776         struct ctdb_server_id sid;
3777         int ret;
3778         uint32_t result;
3779
3780         sid.type = SERVER_TYPE_SAMBA;
3781         sid.pnn = id->vnn;
3782         sid.server_id = id->pid;
3783
3784         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3785                                         id->vnn, &sid, &result);
3786         if (ret != 0) {
3787                 /* If control times out, assume server_id exists. */
3788                 return true;
3789         }
3790
3791         if (result) {
3792                 return true;
3793         }
3794
3795         return false;
3796 }
3797
3798
3799 enum g_lock_type {
3800         G_LOCK_READ = 0,
3801         G_LOCK_WRITE = 1,
3802 };
3803
3804 struct g_lock_rec {
3805         enum g_lock_type type;
3806         struct server_id id;
3807 };
3808
3809 struct g_lock_recs {
3810         unsigned int num;
3811         struct g_lock_rec *lock;
3812 };
3813
3814 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3815                          struct g_lock_recs **locks)
3816 {
3817         struct g_lock_recs *recs;
3818
3819         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3820         if (recs == NULL) {
3821                 return false;
3822         }
3823
3824         if (data.dsize == 0) {
3825                 goto done;
3826         }
3827
3828         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3829                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3830                                   data.dsize));
3831                 talloc_free(recs);
3832                 return false;
3833         }
3834
3835         recs->num = data.dsize / sizeof(struct g_lock_rec);
3836         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3837         if (recs->lock == NULL) {
3838                 talloc_free(recs);
3839                 return false;
3840         }
3841
3842 done:
3843         if (locks != NULL) {
3844                 *locks = recs;
3845         }
3846
3847         return true;
3848 }
3849
3850
3851 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3852                         struct ctdb_db_context *ctdb_db,
3853                         const char *keyname, uint32_t reqid)
3854 {
3855         TDB_DATA key, data;
3856         struct ctdb_record_handle *h;
3857         struct g_lock_recs *locks;
3858         struct server_id id;
3859         int i;
3860
3861         key.dptr = (uint8_t *)discard_const(keyname);
3862         key.dsize = strlen(keyname) + 1;
3863         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3864         if (h == NULL) {
3865                 return false;
3866         }
3867
3868         if (!g_lock_parse(h, data, &locks)) {
3869                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3870                 talloc_free(data.dptr);
3871                 talloc_free(h);
3872                 return false;
3873         }
3874
3875         talloc_free(data.dptr);
3876
3877         id = server_id_get(ctdb_db->ctdb, reqid);
3878
3879         i = 0;
3880         while (i < locks->num) {
3881                 if (server_id_equal(&locks->lock[i].id, &id)) {
3882                         /* Internal error */
3883                         talloc_free(h);
3884                         return false;
3885                 }
3886
3887                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3888                         if (i < locks->num-1) {
3889                                 locks->lock[i] = locks->lock[locks->num-1];
3890                         }
3891                         locks->num--;
3892                         continue;
3893                 }
3894
3895                 /* This entry is locked. */
3896                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3897                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3898                                    (unsigned long long)id.pid,
3899                                    id.task_id, id.vnn,
3900                                    (unsigned long long)id.unique_id));
3901                 talloc_free(h);
3902                 return false;
3903         }
3904
3905         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3906                                      locks->num+1);
3907         if (locks->lock == NULL) {
3908                 talloc_free(h);
3909                 return false;
3910         }
3911
3912         locks->lock[locks->num].type = G_LOCK_WRITE;
3913         locks->lock[locks->num].id = id;
3914         locks->num++;
3915
3916         data.dptr = (uint8_t *)locks->lock;
3917         data.dsize = locks->num * sizeof(struct g_lock_rec);
3918
3919         if (ctdb_record_store(h, data) != 0) {
3920                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3921                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3922                                   (unsigned long long)id.pid,
3923                                   id.task_id, id.vnn,
3924                                   (unsigned long long)id.unique_id));
3925                 talloc_free(h);
3926                 return false;
3927         }
3928
3929         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3930                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3931                            (unsigned long long)id.pid,
3932                            id.task_id, id.vnn,
3933                            (unsigned long long)id.unique_id));
3934
3935         talloc_free(h);
3936         return true;
3937 }
3938
3939 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3940                           struct ctdb_db_context *ctdb_db,
3941                           const char *keyname, uint32_t reqid)
3942 {
3943         TDB_DATA key, data;
3944         struct ctdb_record_handle *h;
3945         struct g_lock_recs *locks;
3946         struct server_id id;
3947         int i;
3948         bool found = false;
3949
3950         key.dptr = (uint8_t *)discard_const(keyname);
3951         key.dsize = strlen(keyname) + 1;
3952         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3953         if (h == NULL) {
3954                 return false;
3955         }
3956
3957         if (!g_lock_parse(h, data, &locks)) {
3958                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3959                 talloc_free(data.dptr);
3960                 talloc_free(h);
3961                 return false;
3962         }
3963
3964         talloc_free(data.dptr);
3965
3966         id = server_id_get(ctdb_db->ctdb, reqid);
3967
3968         for (i=0; i<locks->num; i++) {
3969                 if (server_id_equal(&locks->lock[i].id, &id)) {
3970                         if (i < locks->num-1) {
3971                                 locks->lock[i] = locks->lock[locks->num-1];
3972                         }
3973                         locks->num--;
3974                         found = true;
3975                         break;
3976                 }
3977         }
3978
3979         if (!found) {
3980                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3981                 talloc_free(h);
3982                 return false;
3983         }
3984
3985         data.dptr = (uint8_t *)locks->lock;
3986         data.dsize = locks->num * sizeof(struct g_lock_rec);
3987
3988         if (ctdb_record_store(h, data) != 0) {
3989                 talloc_free(h);
3990                 return false;
3991         }
3992
3993         talloc_free(h);
3994         return true;
3995 }
3996
3997
3998 struct ctdb_transaction_handle {
3999         struct ctdb_db_context *ctdb_db;
4000         struct ctdb_db_context *g_lock_db;
4001         char *lock_name;
4002         uint32_t reqid;
4003         /*
4004          * we store reads and writes done under a transaction:
4005          * - one list stores both reads and writes (m_all)
4006          * - the other just writes (m_write)
4007          */
4008         struct ctdb_marshall_buffer *m_all;
4009         struct ctdb_marshall_buffer *m_write;
4010 };
4011
4012 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4013 {
4014         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4015         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4016         return 0;
4017 }
4018
4019
4020 /**
4021  * start a transaction on a database
4022  */
4023 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4024                                                        TALLOC_CTX *mem_ctx)
4025 {
4026         struct ctdb_transaction_handle *h;
4027         struct ctdb_server_id id;
4028
4029         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4030         if (h == NULL) {
4031                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4032                 return NULL;
4033         }
4034
4035         h->ctdb_db = ctdb_db;
4036         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4037                                        (unsigned int)ctdb_db->db_id);
4038         if (h->lock_name == NULL) {
4039                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4040                 talloc_free(h);
4041                 return NULL;
4042         }
4043
4044         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4045                                    "g_lock.tdb", false, 0);
4046         if (!h->g_lock_db) {
4047                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4048                 talloc_free(h);
4049                 return NULL;
4050         }
4051
4052         id.type = SERVER_TYPE_SAMBA;
4053         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4054         id.server_id = getpid();
4055
4056         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4057                                          &id) != 0) {
4058                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4059                 talloc_free(h);
4060                 return NULL;
4061         }
4062
4063         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4064
4065         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4066                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4067                 talloc_free(h);
4068                 return NULL;
4069         }
4070
4071         talloc_set_destructor(h, ctdb_transaction_destructor);
4072         return h;
4073 }
4074
4075 /**
4076  * fetch a record inside a transaction
4077  */
4078 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4079                            TALLOC_CTX *mem_ctx,
4080                            TDB_DATA key, TDB_DATA *data)
4081 {
4082         struct ctdb_ltdb_header header;
4083         int ret;
4084
4085         ZERO_STRUCT(header);
4086
4087         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4088         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4089                 /* record doesn't exist yet */
4090                 *data = tdb_null;
4091                 ret = 0;
4092         }
4093
4094         if (ret != 0) {
4095                 return ret;
4096         }
4097
4098         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4099         if (h->m_all == NULL) {
4100                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4101                 return -1;
4102         }
4103
4104         return 0;
4105 }
4106
4107 /**
4108  * stores a record inside a transaction
4109  */
4110 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4111                            TDB_DATA key, TDB_DATA data)
4112 {
4113         TALLOC_CTX *tmp_ctx = talloc_new(h);
4114         struct ctdb_ltdb_header header;
4115         TDB_DATA olddata;
4116         int ret;
4117
4118         /* we need the header so we can update the RSN */
4119         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4120         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4121                 /* the record doesn't exist - create one with us as dmaster.
4122                    This is only safe because we are in a transaction and this
4123                    is a persistent database */
4124                 ZERO_STRUCT(header);
4125         } else if (ret != 0) {
4126                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4127                 talloc_free(tmp_ctx);
4128                 return ret;
4129         }
4130
4131         if (data.dsize == olddata.dsize &&
4132             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4133             header.rsn != 0) {
4134                 /* save writing the same data */
4135                 talloc_free(tmp_ctx);
4136                 return 0;
4137         }
4138
4139         header.dmaster = h->ctdb_db->ctdb->pnn;
4140         header.rsn++;
4141
4142         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4143         if (h->m_all == NULL) {
4144                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4145                 talloc_free(tmp_ctx);
4146                 return -1;
4147         }
4148
4149         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4150         if (h->m_write == NULL) {
4151                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4152                 talloc_free(tmp_ctx);
4153                 return -1;
4154         }
4155
4156         talloc_free(tmp_ctx);
4157         return 0;
4158 }
4159
4160 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4161 {
4162         const char *keyname = CTDB_DB_SEQNUM_KEY;
4163         TDB_DATA key, data;
4164         struct ctdb_ltdb_header header;
4165         int ret;
4166
4167         key.dptr = (uint8_t *)discard_const(keyname);
4168         key.dsize = strlen(keyname) + 1;
4169
4170         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4171         if (ret != 0) {
4172                 *seqnum = 0;
4173                 return 0;
4174         }
4175
4176         if (data.dsize != sizeof(*seqnum)) {
4177                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4178                                   data.dsize));
4179                 talloc_free(data.dptr);
4180                 return -1;
4181         }
4182
4183         *seqnum = *(uint64_t *)data.dptr;
4184         talloc_free(data.dptr);
4185
4186         return 0;
4187 }
4188
4189
4190 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4191                                 uint64_t seqnum)
4192 {
4193         const char *keyname = CTDB_DB_SEQNUM_KEY;
4194         TDB_DATA key, data;
4195
4196         key.dptr = (uint8_t *)discard_const(keyname);
4197         key.dsize = strlen(keyname) + 1;
4198
4199         data.dptr = (uint8_t *)&seqnum;
4200         data.dsize = sizeof(seqnum);
4201
4202         return ctdb_transaction_store(h, key, data);
4203 }
4204
4205
4206 /**
4207  * commit a transaction
4208  */
4209 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4210 {
4211         int ret;
4212         uint64_t old_seqnum, new_seqnum;
4213         int32_t status;
4214         struct timeval timeout;
4215
4216         if (h->m_write == NULL) {
4217                 /* no changes were made */
4218                 talloc_free(h);
4219                 return 0;
4220         }
4221
4222         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4223         if (ret != 0) {
4224                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4225                 ret = -1;
4226                 goto done;
4227         }
4228
4229         new_seqnum = old_seqnum + 1;
4230         ret = ctdb_store_db_seqnum(h, new_seqnum);
4231         if (ret != 0) {
4232                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4233                 ret = -1;
4234                 goto done;
4235         }
4236
4237 again:
4238         timeout = timeval_current_ofs(3,0);
4239         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4240                            h->ctdb_db->db_id,
4241                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4242                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4243                            &status, &timeout, NULL);
4244         if (ret != 0 || status != 0) {
4245                 /*
4246                  * TRANS3_COMMIT control will only fail if recovery has been
4247                  * triggered.  Check if the database has been updated or not.
4248                  */
4249                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4250                 if (ret != 0) {
4251                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4252                         goto done;
4253                 }
4254
4255                 if (new_seqnum == old_seqnum) {
4256                         /* Database not yet updated, try again */
4257                         goto again;
4258                 }
4259
4260                 if (new_seqnum != (old_seqnum + 1)) {
4261                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4262                                           (long long unsigned)new_seqnum,
4263                                           (long long unsigned)old_seqnum));
4264                         ret = -1;
4265                         goto done;
4266                 }
4267         }
4268
4269         ret = 0;
4270
4271 done:
4272         talloc_free(h);
4273         return ret;
4274 }
4275
4276 /**
4277  * cancel a transaction
4278  */
4279 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4280 {
4281         talloc_free(h);
4282         return 0;
4283 }
4284
4285 #if 0
4286 /**
4287  * check whether a transaction is active on a given db on a given node
4288  */
4289 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
4290                                      uint32_t destnode,
4291                                      uint32_t db_id)
4292 {
4293         int32_t status;
4294         int ret;
4295         TDB_DATA indata;
4296
4297         indata.dptr = (uint8_t *)&db_id;
4298         indata.dsize = sizeof(db_id);
4299
4300         ret = ctdb_control(ctdb, destnode, 0,
4301                            CTDB_CONTROL_TRANS2_ACTIVE,
4302                            0, indata, NULL, NULL, &status,
4303                            NULL, NULL);
4304
4305         if (ret != 0) {
4306                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
4307                 return -1;
4308         }
4309
4310         return status;
4311 }
4312
4313
4314 struct ctdb_transaction_handle {
4315         struct ctdb_db_context *ctdb_db;
4316         bool in_replay;
4317         /*
4318          * we store the reads and writes done under a transaction:
4319          * - one list stores both reads and writes (m_all),
4320          * - the other just writes (m_write)
4321          */
4322         struct ctdb_marshall_buffer *m_all;
4323         struct ctdb_marshall_buffer *m_write;
4324 };
4325
4326 /* start a transaction on a database */
4327 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4328 {
4329         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4330         return 0;
4331 }
4332
4333 /* start a transaction on a database */
4334 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
4335 {
4336         struct ctdb_record_handle *rh;
4337         TDB_DATA key;
4338         TDB_DATA data;
4339         struct ctdb_ltdb_header header;
4340         TALLOC_CTX *tmp_ctx;
4341         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
4342         int ret;
4343         struct ctdb_db_context *ctdb_db = h->ctdb_db;
4344         pid_t pid;
4345         int32_t status;
4346
4347         key.dptr = discard_const(keyname);
4348         key.dsize = strlen(keyname);
4349
4350         if (!ctdb_db->persistent) {
4351                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
4352                 return -1;
4353         }
4354
4355 again:
4356         tmp_ctx = talloc_new(h);
4357
4358         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
4359         if (rh == NULL) {
4360                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
4361                 talloc_free(tmp_ctx);
4362                 return -1;
4363         }
4364
4365         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
4366                                               CTDB_CURRENT_NODE,
4367                                               ctdb_db->db_id);
4368         if (status == 1) {
4369                 unsigned long int usec = (1000 + random()) % 100000;
4370                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
4371                                     "on db_id[0x%08x]. waiting for %lu "
4372                                     "microseconds\n",
4373                                     ctdb_db->db_id, usec));
4374                 talloc_free(tmp_ctx);
4375                 usleep(usec);
4376                 goto again;
4377         }
4378
4379         /*
4380          * store the pid in the database:
4381          * it is not enough that the node is dmaster...
4382          */
4383         pid = getpid();
4384         data.dptr = (unsigned char *)&pid;
4385         data.dsize = sizeof(pid_t);
4386         rh->header.rsn++;
4387         rh->header.dmaster = ctdb_db->ctdb->pnn;
4388         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
4389         if (ret != 0) {
4390                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
4391                                   "transaction record\n"));
4392                 talloc_free(tmp_ctx);
4393                 return -1;
4394         }
4395
4396         talloc_free(rh);
4397
4398         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
4399         if (ret != 0) {
4400                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
4401                 talloc_free(tmp_ctx);
4402                 return -1;
4403         }
4404
4405         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
4406         if (ret != 0) {
4407                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
4408                                  "lock record inside transaction\n"));
4409                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4410                 talloc_free(tmp_ctx);
4411                 goto again;
4412         }
4413
4414         if (header.dmaster != ctdb_db->ctdb->pnn) {
4415                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
4416                                    "transaction lock record\n"));
4417                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4418                 talloc_free(tmp_ctx);
4419                 goto again;
4420         }
4421
4422         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
4423                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
4424                                     "the transaction lock record\n"));
4425                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4426                 talloc_free(tmp_ctx);
4427                 goto again;
4428         }
4429
4430         talloc_free(tmp_ctx);
4431
4432         return 0;
4433 }
4434
4435
4436 /* start a transaction on a database */
4437 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4438                                                        TALLOC_CTX *mem_ctx)
4439 {
4440         struct ctdb_transaction_handle *h;
4441         int ret;
4442
4443         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4444         if (h == NULL) {
4445                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
4446                 return NULL;
4447         }
4448
4449         h->ctdb_db = ctdb_db;
4450
4451         ret = ctdb_transaction_fetch_start(h);
4452         if (ret != 0) {
4453                 talloc_free(h);
4454                 return NULL;
4455         }
4456
4457         talloc_set_destructor(h, ctdb_transaction_destructor);
4458
4459         return h;
4460 }
4461
4462
4463
4464 /*
4465   fetch a record inside a transaction
4466  */
4467 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
4468                            TALLOC_CTX *mem_ctx, 
4469                            TDB_DATA key, TDB_DATA *data)
4470 {
4471         struct ctdb_ltdb_header header;
4472         int ret;
4473
4474         ZERO_STRUCT(header);
4475
4476         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4477         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4478                 /* record doesn't exist yet */
4479                 *data = tdb_null;
4480                 ret = 0;
4481         }
4482         
4483         if (ret != 0) {
4484                 return ret;
4485         }
4486
4487         if (!h->in_replay) {
4488                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4489                 if (h->m_all == NULL) {
4490                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4491                         return -1;
4492                 }
4493         }
4494
4495         return 0;
4496 }
4497
4498 /*
4499   stores a record inside a transaction
4500  */
4501 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
4502                            TDB_DATA key, TDB_DATA data)
4503 {
4504         TALLOC_CTX *tmp_ctx = talloc_new(h);
4505         struct ctdb_ltdb_header header;
4506         TDB_DATA olddata;
4507         int ret;
4508
4509         ZERO_STRUCT(header);
4510
4511         /* we need the header so we can update the RSN */
4512         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4513         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4514                 /* the record doesn't exist - create one with us as dmaster.
4515                    This is only safe because we are in a transaction and this
4516                    is a persistent database */
4517                 ZERO_STRUCT(header);
4518         } else if (ret != 0) {
4519                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4520                 talloc_free(tmp_ctx);
4521                 return ret;
4522         }
4523
4524         if (data.dsize == olddata.dsize &&
4525             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
4526                 /* save writing the same data */
4527                 talloc_free(tmp_ctx);
4528                 return 0;
4529         }
4530
4531         header.dmaster = h->ctdb_db->ctdb->pnn;
4532         header.rsn++;
4533
4534         if (!h->in_replay) {
4535                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4536                 if (h->m_all == NULL) {
4537                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4538                         talloc_free(tmp_ctx);
4539                         return -1;
4540                 }
4541         }               
4542
4543         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4544         if (h->m_write == NULL) {
4545                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4546                 talloc_free(tmp_ctx);
4547                 return -1;
4548         }
4549         
4550         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
4551
4552         talloc_free(tmp_ctx);
4553         
4554         return ret;
4555 }
4556
4557 /*
4558   replay a transaction
4559  */
4560 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
4561 {
4562         int ret, i;
4563         struct ctdb_rec_data *rec = NULL;
4564
4565         h->in_replay = true;
4566         talloc_free(h->m_write);
4567         h->m_write = NULL;
4568
4569         ret = ctdb_transaction_fetch_start(h);
4570         if (ret != 0) {
4571                 return ret;
4572         }
4573
4574         for (i=0;i<h->m_all->count;i++) {
4575                 TDB_DATA key, data;
4576
4577                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
4578                 if (rec == NULL) {
4579                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
4580                         goto failed;
4581                 }
4582
4583                 if (rec->reqid == 0) {
4584                         /* its a store */
4585                         if (ctdb_transaction_store(h, key, data) != 0) {
4586                                 goto failed;
4587                         }
4588                 } else {
4589                         TDB_DATA data2;
4590                         TALLOC_CTX *tmp_ctx = talloc_new(h);
4591
4592                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
4593                                 talloc_free(tmp_ctx);
4594                                 goto failed;
4595                         }
4596                         if (data2.dsize != data.dsize ||
4597                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
4598                                 /* the record has changed on us - we have to give up */
4599                                 talloc_free(tmp_ctx);
4600                                 goto failed;
4601                         }
4602                         talloc_free(tmp_ctx);
4603                 }
4604         }
4605         
4606         return 0;
4607
4608 failed:
4609         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4610         return -1;
4611 }
4612
4613
4614 /*
4615   commit a transaction
4616  */
4617 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4618 {
4619         int ret, retries=0;
4620         int32_t status;
4621         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
4622         struct timeval timeout;
4623         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
4624
4625         talloc_set_destructor(h, NULL);
4626
4627         /* our commit strategy is quite complex.
4628
4629            - we first try to commit the changes to all other nodes
4630
4631            - if that works, then we commit locally and we are done
4632
4633            - if a commit on another node fails, then we need to cancel
4634              the transaction, then restart the transaction (thus
4635              opening a window of time for a pending recovery to
4636              complete), then replay the transaction, checking all the
4637              reads and writes (checking that reads give the same data,
4638              and writes succeed). Then we retry the transaction to the
4639              other nodes
4640         */
4641
4642 again:
4643         if (h->m_write == NULL) {
4644                 /* no changes were made */
4645                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4646                 talloc_free(h);
4647                 return 0;
4648         }
4649
4650         /* tell ctdbd to commit to the other nodes */
4651         timeout = timeval_current_ofs(1, 0);
4652         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4653                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4654                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4655                            &timeout, NULL);
4656         if (ret != 0 || status != 0) {
4657                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4658                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4659                                      ", retrying after 1 second...\n",
4660                                      (retries==0)?"":"retry "));
4661                 sleep(1);
4662
4663                 if (ret != 0) {
4664                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4665                 } else {
4666                         /* work out what error code we will give if we 
4667                            have to fail the operation */
4668                         switch ((enum ctdb_trans2_commit_error)status) {
4669                         case CTDB_TRANS2_COMMIT_SUCCESS:
4670                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4671                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4672                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4673                                 break;
4674                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4675                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4676                                 break;
4677                         }
4678                 }
4679
4680                 if (++retries == 100) {
4681                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4682                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4683                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4684                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4685                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4686                         talloc_free(h);
4687                         return -1;
4688                 }               
4689
4690                 if (ctdb_replay_transaction(h) != 0) {
4691                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4692                                           "transaction on db 0x%08x, "
4693                                           "failure control =%u\n",
4694                                           h->ctdb_db->db_id,
4695                                           (unsigned)failure_control));
4696                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4697                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4698                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4699                         talloc_free(h);
4700                         return -1;
4701                 }
4702                 goto again;
4703         } else {
4704                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4705         }
4706
4707         /* do the real commit locally */
4708         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4709         if (ret != 0) {
4710                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4711                                   "on db id 0x%08x locally, "
4712                                   "failure_control=%u\n",
4713                                   h->ctdb_db->db_id,
4714                                   (unsigned)failure_control));
4715                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4716                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4717                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4718                 talloc_free(h);
4719                 return ret;
4720         }
4721
4722         /* tell ctdbd that we are finished with our local commit */
4723         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4724                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4725                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4726         talloc_free(h);
4727         return 0;
4728 }
4729 #endif
4730
4731 /*
4732   recovery daemon ping to main daemon
4733  */
4734 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4735 {
4736         int ret;
4737         int32_t res;
4738
4739         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4740                            ctdb, NULL, &res, NULL, NULL);
4741         if (ret != 0 || res != 0) {
4742                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4743                 return -1;
4744         }
4745
4746         return 0;
4747 }
4748
4749 /* When forking the main daemon and the child process needs to connect
4750  * back to the daemon as a client process, this function can be used
4751  * to change the ctdb context from daemon into client mode.  The child
4752  * process must be created using ctdb_fork() and not fork() -
4753  * ctdb_fork() does some necessary housekeeping.
4754  */
4755 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4756 {
4757         int ret;
4758         va_list ap;
4759
4760         /* Add extra information so we can identify this in the logs */
4761         va_start(ap, fmt);
4762         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4763         va_end(ap);
4764
4765         /* get a new event context */
4766         ctdb->ev = event_context_init(ctdb);
4767         tevent_loop_allow_nesting(ctdb->ev);
4768
4769         /* Connect to main CTDB daemon */
4770         ret = ctdb_socket_connect(ctdb);
4771         if (ret != 0) {
4772                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4773                 return -1;
4774         }
4775
4776         ctdb->can_send_controls = true;
4777
4778         return 0;
4779 }
4780
4781 /*
4782   get the status of running the monitor eventscripts: NULL means never run.
4783  */
4784 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4785                 struct timeval timeout, uint32_t destnode, 
4786                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4787                 struct ctdb_scripts_wire **scripts)
4788 {
4789         int ret;
4790         TDB_DATA outdata, indata;
4791         int32_t res;
4792         uint32_t uinttype = type;
4793
4794         indata.dptr = (uint8_t *)&uinttype;
4795         indata.dsize = sizeof(uinttype);
4796
4797         ret = ctdb_control(ctdb, destnode, 0, 
4798                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4799                            mem_ctx, &outdata, &res, &timeout, NULL);
4800         if (ret != 0 || res != 0) {
4801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4802                 return -1;
4803         }
4804
4805         if (outdata.dsize == 0) {
4806                 *scripts = NULL;
4807         } else {
4808                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4809                 talloc_free(outdata.dptr);
4810         }
4811                     
4812         return 0;
4813 }
4814
4815 /*
4816   tell the main daemon how long it took to lock the reclock file
4817  */
4818 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4819 {
4820         int ret;
4821         int32_t res;
4822         TDB_DATA data;
4823
4824         data.dptr = (uint8_t *)&latency;
4825         data.dsize = sizeof(latency);
4826
4827         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4828                            ctdb, NULL, &res, NULL, NULL);
4829         if (ret != 0 || res != 0) {
4830                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4831                 return -1;
4832         }
4833
4834         return 0;
4835 }
4836
4837 /*
4838   get the name of the reclock file
4839  */
4840 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4841                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4842                          const char **name)
4843 {
4844         int ret;
4845         int32_t res;
4846         TDB_DATA data;
4847
4848         ret = ctdb_control(ctdb, destnode, 0, 
4849                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4850                            mem_ctx, &data, &res, &timeout, NULL);
4851         if (ret != 0 || res != 0) {
4852                 return -1;
4853         }
4854
4855         if (data.dsize == 0) {
4856                 *name = NULL;
4857         } else {
4858                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4859         }
4860         talloc_free(data.dptr);
4861
4862         return 0;
4863 }
4864
4865 /*
4866   set the reclock filename for a node
4867  */
4868 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4869 {
4870         int ret;
4871         TDB_DATA data;
4872         int32_t res;
4873
4874         if (reclock == NULL) {
4875                 data.dsize = 0;
4876                 data.dptr  = NULL;
4877         } else {
4878                 data.dsize = strlen(reclock) + 1;
4879                 data.dptr  = discard_const(reclock);
4880         }
4881
4882         ret = ctdb_control(ctdb, destnode, 0, 
4883                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4884                            NULL, NULL, &res, &timeout, NULL);
4885         if (ret != 0 || res != 0) {
4886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4887                 return -1;
4888         }
4889
4890         return 0;
4891 }
4892
4893 /*
4894   stop a node
4895  */
4896 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4897 {
4898         int ret;
4899         int32_t res;
4900
4901         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4902                            ctdb, NULL, &res, &timeout, NULL);
4903         if (ret != 0 || res != 0) {
4904                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4905                 return -1;
4906         }
4907
4908         return 0;
4909 }
4910
4911 /*
4912   continue a node
4913  */
4914 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4915 {
4916         int ret;
4917
4918         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4919                            ctdb, NULL, NULL, &timeout, NULL);
4920         if (ret != 0) {
4921                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4922                 return -1;
4923         }
4924
4925         return 0;
4926 }
4927
4928 /*
4929   set the natgw state for a node
4930  */
4931 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4932 {
4933         int ret;
4934         TDB_DATA data;
4935         int32_t res;
4936
4937         data.dsize = sizeof(natgwstate);
4938         data.dptr  = (uint8_t *)&natgwstate;
4939
4940         ret = ctdb_control(ctdb, destnode, 0, 
4941                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4942                            NULL, NULL, &res, &timeout, NULL);
4943         if (ret != 0 || res != 0) {
4944                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4945                 return -1;
4946         }
4947
4948         return 0;
4949 }
4950
4951 /*
4952   set the lmaster role for a node
4953  */
4954 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4955 {
4956         int ret;
4957         TDB_DATA data;
4958         int32_t res;
4959
4960         data.dsize = sizeof(lmasterrole);
4961         data.dptr  = (uint8_t *)&lmasterrole;
4962
4963         ret = ctdb_control(ctdb, destnode, 0, 
4964                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4965                            NULL, NULL, &res, &timeout, NULL);
4966         if (ret != 0 || res != 0) {
4967                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4968                 return -1;
4969         }
4970
4971         return 0;
4972 }
4973
4974 /*
4975   set the recmaster role for a node
4976  */
4977 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4978 {
4979         int ret;
4980         TDB_DATA data;
4981         int32_t res;
4982
4983         data.dsize = sizeof(recmasterrole);
4984         data.dptr  = (uint8_t *)&recmasterrole;
4985
4986         ret = ctdb_control(ctdb, destnode, 0, 
4987                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4988                            NULL, NULL, &res, &timeout, NULL);
4989         if (ret != 0 || res != 0) {
4990                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4991                 return -1;
4992         }
4993
4994         return 0;
4995 }
4996
4997 /* enable an eventscript
4998  */
4999 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
5000 {
5001         int ret;
5002         TDB_DATA data;
5003         int32_t res;
5004
5005         data.dsize = strlen(script) + 1;
5006         data.dptr  = discard_const(script);
5007
5008         ret = ctdb_control(ctdb, destnode, 0, 
5009                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
5010                            NULL, NULL, &res, &timeout, NULL);
5011         if (ret != 0 || res != 0) {
5012                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
5013                 return -1;
5014         }
5015
5016         return 0;
5017 }
5018
5019 /* disable an eventscript
5020  */
5021 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
5022 {
5023         int ret;
5024         TDB_DATA data;
5025         int32_t res;
5026
5027         data.dsize = strlen(script) + 1;
5028         data.dptr  = discard_const(script);
5029
5030         ret = ctdb_control(ctdb, destnode, 0, 
5031                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
5032                            NULL, NULL, &res, &timeout, NULL);
5033         if (ret != 0 || res != 0) {
5034                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
5035                 return -1;
5036         }
5037
5038         return 0;
5039 }
5040
5041
5042 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
5043 {
5044         int ret;
5045         TDB_DATA data;
5046         int32_t res;
5047
5048         data.dsize = sizeof(*bantime);
5049         data.dptr  = (uint8_t *)bantime;
5050
5051         ret = ctdb_control(ctdb, destnode, 0, 
5052                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
5053                            NULL, NULL, &res, &timeout, NULL);
5054         if (ret != 0 || res != 0) {
5055                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
5056                 return -1;
5057         }
5058
5059         return 0;
5060 }
5061
5062
5063 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
5064 {
5065         int ret;
5066         TDB_DATA outdata;
5067         int32_t res;
5068         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
5069
5070         ret = ctdb_control(ctdb, destnode, 0, 
5071                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
5072                            tmp_ctx, &outdata, &res, &timeout, NULL);
5073         if (ret != 0 || res != 0) {
5074                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
5075                 talloc_free(tmp_ctx);
5076                 return -1;
5077         }
5078
5079         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
5080         talloc_free(tmp_ctx);
5081
5082         return 0;
5083 }
5084
5085
5086 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
5087 {
5088         int ret;
5089         int32_t res;
5090         TDB_DATA data;
5091         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
5092
5093         data.dptr = (uint8_t*)db_prio;
5094         data.dsize = sizeof(*db_prio);
5095
5096         ret = ctdb_control(ctdb, destnode, 0, 
5097                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
5098                            tmp_ctx, NULL, &res, &timeout, NULL);
5099         if (ret != 0 || res != 0) {
5100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
5101                 talloc_free(tmp_ctx);
5102                 return -1;
5103         }
5104
5105         talloc_free(tmp_ctx);
5106
5107         return 0;
5108 }
5109
5110 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
5111 {
5112         int ret;
5113         int32_t res;
5114         TDB_DATA data;
5115         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
5116
5117         data.dptr = (uint8_t*)&db_id;
5118         data.dsize = sizeof(db_id);
5119
5120         ret = ctdb_control(ctdb, destnode, 0, 
5121                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
5122                            tmp_ctx, NULL, &res, &timeout, NULL);
5123         if (ret != 0 || res < 0) {
5124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
5125                 talloc_free(tmp_ctx);
5126                 return -1;
5127         }
5128
5129         if (priority) {
5130                 *priority = res;
5131         }
5132
5133         talloc_free(tmp_ctx);
5134
5135         return 0;
5136 }
5137
5138 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
5139 {
5140         int ret;
5141         TDB_DATA outdata;
5142         int32_t res;
5143
5144         ret = ctdb_control(ctdb, destnode, 0, 
5145                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
5146                            mem_ctx, &outdata, &res, &timeout, NULL);
5147         if (ret != 0 || res != 0 || outdata.dsize == 0) {
5148                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
5149                 return -1;
5150         }
5151
5152         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
5153         talloc_free(outdata.dptr);
5154                     
5155         return 0;
5156 }
5157
5158 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
5159 {
5160         if (h == NULL) {
5161                 return NULL;
5162         }
5163
5164         return &h->header;
5165 }
5166
5167
5168 struct ctdb_client_control_state *
5169 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
5170 {
5171         struct ctdb_client_control_state *handle;
5172         struct ctdb_marshall_buffer *m;
5173         struct ctdb_rec_data *rec;
5174         TDB_DATA outdata;
5175
5176         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
5177         if (m == NULL) {
5178                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
5179                 return NULL;
5180         }
5181
5182         m->db_id = ctdb_db->db_id;
5183
5184         rec = ctdb_marshall_record(m, 0, key, header, data);
5185         if (rec == NULL) {
5186                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
5187                 talloc_free(m);
5188                 return NULL;
5189         }
5190         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
5191         if (m == NULL) {
5192                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
5193                 talloc_free(m);
5194                 return NULL;
5195         }
5196         m->count++;
5197         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
5198
5199
5200         outdata.dptr = (uint8_t *)m;
5201         outdata.dsize = talloc_get_size(m);
5202
5203         handle = ctdb_control_send(ctdb, destnode, 0, 
5204                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
5205                            mem_ctx, &timeout, NULL);
5206         talloc_free(m);
5207         return handle;
5208 }
5209
5210 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
5211 {
5212         int ret;
5213         int32_t res;
5214
5215         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
5216         if ( (ret != 0) || (res != 0) ){
5217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
5218                 return -1;
5219         }
5220
5221         return 0;
5222 }
5223
5224 int
5225 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
5226 {
5227         struct ctdb_client_control_state *state;
5228
5229         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
5230         return ctdb_ctrl_updaterecord_recv(ctdb, state);
5231 }
5232
5233
5234
5235
5236
5237
5238 /*
5239   set a database to be readonly
5240  */
5241 struct ctdb_client_control_state *
5242 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
5243 {
5244         TDB_DATA data;
5245
5246         data.dptr = (uint8_t *)&dbid;
5247         data.dsize = sizeof(dbid);
5248
5249         return ctdb_control_send(ctdb, destnode, 0, 
5250                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
5251                            ctdb, NULL, NULL);
5252 }
5253
5254 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
5255 {
5256         int ret;
5257         int32_t res;
5258
5259         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
5260         if (ret != 0 || res != 0) {
5261           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
5262                 return -1;
5263         }
5264
5265         return 0;
5266 }
5267
5268 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
5269 {
5270         struct ctdb_client_control_state *state;
5271
5272         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
5273         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
5274 }
5275
5276 /*
5277   set a database to be sticky
5278  */
5279 struct ctdb_client_control_state *
5280 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
5281 {
5282         TDB_DATA data;
5283
5284         data.dptr = (uint8_t *)&dbid;
5285         data.dsize = sizeof(dbid);
5286
5287         return ctdb_control_send(ctdb, destnode, 0, 
5288                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
5289                            ctdb, NULL, NULL);
5290 }
5291
5292 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
5293 {
5294         int ret;
5295         int32_t res;
5296
5297         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
5298         if (ret != 0 || res != 0) {
5299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
5300                 return -1;
5301         }
5302
5303         return 0;
5304 }
5305
5306 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
5307 {
5308         struct ctdb_client_control_state *state;
5309
5310         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
5311         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
5312 }