client: Add ctdb_ctrl_getdbseqnum() function
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   num*sizeof(uint8_t), outdata.dsize));
538                 talloc_free(outdata.dptr);
539                 return -1;
540         }
541
542         for (i=0; i<num; i++) {
543                 result[i] = outdata.dptr[i];
544         }
545
546         talloc_free(outdata.dptr);
547         return 0;
548 }
549
550 /*
551   send a message - from client context
552  */
553 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
554                       uint64_t srvid, TDB_DATA data)
555 {
556         struct ctdb_req_message *r;
557         int len, res;
558
559         len = offsetof(struct ctdb_req_message, data) + data.dsize;
560         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
561                                len, struct ctdb_req_message);
562         CTDB_NO_MEMORY(ctdb, r);
563
564         r->hdr.destnode  = pnn;
565         r->srvid         = srvid;
566         r->datalen       = data.dsize;
567         memcpy(&r->data[0], data.dptr, data.dsize);
568         
569         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
570         talloc_free(r);
571         return res;
572 }
573
574
575 /*
576   cancel a ctdb_fetch_lock operation, releasing the lock
577  */
578 static int fetch_lock_destructor(struct ctdb_record_handle *h)
579 {
580         ctdb_ltdb_unlock(h->ctdb_db, h->key);
581         return 0;
582 }
583
584 /*
585   force the migration of a record to this node
586  */
587 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
588 {
589         struct ctdb_call call;
590         ZERO_STRUCT(call);
591         call.call_id = CTDB_NULL_FUNC;
592         call.key = key;
593         call.flags = CTDB_IMMEDIATE_MIGRATION;
594         return ctdb_call(ctdb_db, &call);
595 }
596
597 /*
598   try to fetch a readonly copy of a record
599  */
600 static int
601 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
602 {
603         int ret;
604
605         struct ctdb_call call;
606         ZERO_STRUCT(call);
607
608         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
609         call.call_data.dptr = NULL;
610         call.call_data.dsize = 0;
611         call.key = key;
612         call.flags = CTDB_WANT_READONLY;
613         ret = ctdb_call(ctdb_db, &call);
614
615         if (ret != 0) {
616                 return -1;
617         }
618         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
619                 return -1;
620         }
621
622         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
623         if (*hdr == NULL) {
624                 talloc_free(call.reply_data.dptr);
625                 return -1;
626         }
627
628         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
629         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
630         if (data->dptr == NULL) {
631                 talloc_free(call.reply_data.dptr);
632                 talloc_free(hdr);
633                 return -1;
634         }
635
636         return 0;
637 }
638
639 /*
640   get a lock on a record, and return the records data. Blocks until it gets the lock
641  */
642 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
643                                            TDB_DATA key, TDB_DATA *data)
644 {
645         int ret;
646         struct ctdb_record_handle *h;
647
648         /*
649           procedure is as follows:
650
651           1) get the chain lock. 
652           2) check if we are dmaster
653           3) if we are the dmaster then return handle 
654           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
655              reply from ctdbd
656           5) when we get the reply, goto (1)
657          */
658
659         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
660         if (h == NULL) {
661                 return NULL;
662         }
663
664         h->ctdb_db = ctdb_db;
665         h->key     = key;
666         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
667         if (h->key.dptr == NULL) {
668                 talloc_free(h);
669                 return NULL;
670         }
671         h->data    = data;
672
673         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
674                  (const char *)key.dptr));
675
676 again:
677         /* step 1 - get the chain lock */
678         ret = ctdb_ltdb_lock(ctdb_db, key);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
681                 talloc_free(h);
682                 return NULL;
683         }
684
685         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
686
687         talloc_set_destructor(h, fetch_lock_destructor);
688
689         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
690
691         /* when torturing, ensure we test the remote path */
692         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
693             random() % 5 == 0) {
694                 h->header.dmaster = (uint32_t)-1;
695         }
696
697
698         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
699
700         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
701                 ctdb_ltdb_unlock(ctdb_db, key);
702                 ret = ctdb_client_force_migration(ctdb_db, key);
703                 if (ret != 0) {
704                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
705                         talloc_free(h);
706                         return NULL;
707                 }
708                 goto again;
709         }
710
711         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
712         return h;
713 }
714
715 /*
716   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
717  */
718 struct ctdb_record_handle *
719 ctdb_fetch_readonly_lock(
720         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
721         TDB_DATA key, TDB_DATA *data,
722         int read_only)
723 {
724         int ret;
725         struct ctdb_record_handle *h;
726         struct ctdb_ltdb_header *roheader = NULL;
727
728         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
729         if (h == NULL) {
730                 return NULL;
731         }
732
733         h->ctdb_db = ctdb_db;
734         h->key     = key;
735         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
736         if (h->key.dptr == NULL) {
737                 talloc_free(h);
738                 return NULL;
739         }
740         h->data    = data;
741
742         data->dptr = NULL;
743         data->dsize = 0;
744
745
746 again:
747         talloc_free(roheader);
748         roheader = NULL;
749
750         talloc_free(data->dptr);
751         data->dptr = NULL;
752         data->dsize = 0;
753
754         /* Lock the record/chain */
755         ret = ctdb_ltdb_lock(ctdb_db, key);
756         if (ret != 0) {
757                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
758                 talloc_free(h);
759                 return NULL;
760         }
761
762         talloc_set_destructor(h, fetch_lock_destructor);
763
764         /* Check if record exists yet in the TDB */
765         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
766         if (ret != 0) {
767                 ctdb_ltdb_unlock(ctdb_db, key);
768                 ret = ctdb_client_force_migration(ctdb_db, key);
769                 if (ret != 0) {
770                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
771                         talloc_free(h);
772                         return NULL;
773                 }
774                 goto again;
775         }
776
777         /* if this is a request for read/write and we have delegations
778            we have to revoke all delegations first
779         */
780         if ((read_only == 0) 
781         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
782         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
783                 ctdb_ltdb_unlock(ctdb_db, key);
784                 ret = ctdb_client_force_migration(ctdb_db, key);
785                 if (ret != 0) {
786                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
787                         talloc_free(h);
788                         return NULL;
789                 }
790                 goto again;
791         }
792
793         /* if we are dmaster, just return the handle */
794         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
795                 return h;
796         }
797
798         if (read_only != 0) {
799                 TDB_DATA rodata = {NULL, 0};
800
801                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
802                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
803                         return h;
804                 }
805
806                 ctdb_ltdb_unlock(ctdb_db, key);
807                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
808                 if (ret != 0) {
809                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
810                         ret = ctdb_client_force_migration(ctdb_db, key);
811                         if (ret != 0) {
812                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
813                                 talloc_free(h);
814                                 return NULL;
815                         }
816
817                         goto again;
818                 }
819
820                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
821                         ret = ctdb_client_force_migration(ctdb_db, key);
822                         if (ret != 0) {
823                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
824                                 talloc_free(h);
825                                 return NULL;
826                         }
827
828                         goto again;
829                 }
830
831                 ret = ctdb_ltdb_lock(ctdb_db, key);
832                 if (ret != 0) {
833                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
834                         talloc_free(h);
835                         return NULL;
836                 }
837
838                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
839                 if (ret != 0) {
840                         ctdb_ltdb_unlock(ctdb_db, key);
841
842                         ret = ctdb_client_force_migration(ctdb_db, key);
843                         if (ret != 0) {
844                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
845                                 talloc_free(h);
846                                 return NULL;
847                         }
848
849                         goto again;
850                 }
851
852                 return h;
853         }
854
855         /* we are not dmaster and this was not a request for a readonly lock
856          * so unlock the record, migrate it and try again
857          */
858         ctdb_ltdb_unlock(ctdb_db, key);
859         ret = ctdb_client_force_migration(ctdb_db, key);
860         if (ret != 0) {
861                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
862                 talloc_free(h);
863                 return NULL;
864         }
865         goto again;
866 }
867
868 /*
869   store some data to the record that was locked with ctdb_fetch_lock()
870 */
871 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
872 {
873         if (h->ctdb_db->persistent) {
874                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
875                 return -1;
876         }
877
878         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
879 }
880
881 /*
882   non-locking fetch of a record
883  */
884 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
885                TDB_DATA key, TDB_DATA *data)
886 {
887         struct ctdb_call call;
888         int ret;
889
890         call.call_id = CTDB_FETCH_FUNC;
891         call.call_data.dptr = NULL;
892         call.call_data.dsize = 0;
893         call.key = key;
894
895         ret = ctdb_call(ctdb_db, &call);
896
897         if (ret == 0) {
898                 *data = call.reply_data;
899                 talloc_steal(mem_ctx, data->dptr);
900         }
901
902         return ret;
903 }
904
905
906
907 /*
908    called when a control completes or timesout to invoke the callback
909    function the user provided
910 */
911 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
912         struct timeval t, void *private_data)
913 {
914         struct ctdb_client_control_state *state;
915         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
916         int ret;
917
918         state = talloc_get_type(private_data, struct ctdb_client_control_state);
919         talloc_steal(tmp_ctx, state);
920
921         ret = ctdb_control_recv(state->ctdb, state, state,
922                         NULL, 
923                         NULL, 
924                         NULL);
925         if (ret != 0) {
926                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
927         }
928
929         talloc_free(tmp_ctx);
930 }
931
932 /*
933   called when a CTDB_REPLY_CONTROL packet comes in in the client
934
935   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
936   contains any reply data from the control
937 */
938 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
939                                       struct ctdb_req_header *hdr)
940 {
941         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
942         struct ctdb_client_control_state *state;
943
944         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
945         if (state == NULL) {
946                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
947                 return;
948         }
949
950         if (hdr->reqid != state->reqid) {
951                 /* we found a record  but it was the wrong one */
952                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
953                 return;
954         }
955
956         state->outdata.dptr = c->data;
957         state->outdata.dsize = c->datalen;
958         state->status = c->status;
959         if (c->errorlen) {
960                 state->errormsg = talloc_strndup(state, 
961                                                  (char *)&c->data[c->datalen], 
962                                                  c->errorlen);
963         }
964
965         /* state->outdata now uses resources from c so we dont want c
966            to just dissappear from under us while state is still alive
967         */
968         talloc_steal(state, c);
969
970         state->state = CTDB_CONTROL_DONE;
971
972         /* if we had a callback registered for this control, pull the response
973            and call the callback.
974         */
975         if (state->async.fn) {
976                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
977         }
978 }
979
980
981 /*
982   destroy a ctdb_control in client
983 */
984 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
985 {
986         ctdb_reqid_remove(state->ctdb, state->reqid);
987         return 0;
988 }
989
990
991 /* time out handler for ctdb_control */
992 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
993         struct timeval t, void *private_data)
994 {
995         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
996
997         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
998                          "dstnode:%u\n", state->reqid, state->c->opcode,
999                          state->c->hdr.destnode));
1000
1001         state->state = CTDB_CONTROL_TIMEOUT;
1002
1003         /* if we had a callback registered for this control, pull the response
1004            and call the callback.
1005         */
1006         if (state->async.fn) {
1007                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1008         }
1009 }
1010
1011 /* async version of send control request */
1012 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1013                 uint32_t destnode, uint64_t srvid, 
1014                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1015                 TALLOC_CTX *mem_ctx,
1016                 struct timeval *timeout,
1017                 char **errormsg)
1018 {
1019         struct ctdb_client_control_state *state;
1020         size_t len;
1021         struct ctdb_req_control *c;
1022         int ret;
1023
1024         if (errormsg) {
1025                 *errormsg = NULL;
1026         }
1027
1028         /* if the domain socket is not yet open, open it */
1029         if (ctdb->daemon.sd==-1) {
1030                 ctdb_socket_connect(ctdb);
1031         }
1032
1033         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1034         CTDB_NO_MEMORY_NULL(ctdb, state);
1035
1036         state->ctdb       = ctdb;
1037         state->reqid      = ctdb_reqid_new(ctdb, state);
1038         state->state      = CTDB_CONTROL_WAIT;
1039         state->errormsg   = NULL;
1040
1041         talloc_set_destructor(state, ctdb_client_control_destructor);
1042
1043         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1044         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1045                                len, struct ctdb_req_control);
1046         state->c            = c;        
1047         CTDB_NO_MEMORY_NULL(ctdb, c);
1048         c->hdr.reqid        = state->reqid;
1049         c->hdr.destnode     = destnode;
1050         c->opcode           = opcode;
1051         c->client_id        = 0;
1052         c->flags            = flags;
1053         c->srvid            = srvid;
1054         c->datalen          = data.dsize;
1055         if (data.dsize) {
1056                 memcpy(&c->data[0], data.dptr, data.dsize);
1057         }
1058
1059         /* timeout */
1060         if (timeout && !timeval_is_zero(timeout)) {
1061                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1062         }
1063
1064         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1065         if (ret != 0) {
1066                 talloc_free(state);
1067                 return NULL;
1068         }
1069
1070         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1071                 talloc_free(state);
1072                 return NULL;
1073         }
1074
1075         return state;
1076 }
1077
1078
1079 /* async version of receive control reply */
1080 int ctdb_control_recv(struct ctdb_context *ctdb, 
1081                 struct ctdb_client_control_state *state, 
1082                 TALLOC_CTX *mem_ctx,
1083                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1084 {
1085         TALLOC_CTX *tmp_ctx;
1086
1087         if (status != NULL) {
1088                 *status = -1;
1089         }
1090         if (errormsg != NULL) {
1091                 *errormsg = NULL;
1092         }
1093
1094         if (state == NULL) {
1095                 return -1;
1096         }
1097
1098         /* prevent double free of state */
1099         tmp_ctx = talloc_new(ctdb);
1100         talloc_steal(tmp_ctx, state);
1101
1102         /* loop one event at a time until we either timeout or the control
1103            completes.
1104         */
1105         while (state->state == CTDB_CONTROL_WAIT) {
1106                 event_loop_once(ctdb->ev);
1107         }
1108
1109         if (state->state != CTDB_CONTROL_DONE) {
1110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1111                 if (state->async.fn) {
1112                         state->async.fn(state);
1113                 }
1114                 talloc_free(tmp_ctx);
1115                 return -1;
1116         }
1117
1118         if (state->errormsg) {
1119                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1120                 if (errormsg) {
1121                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1122                 }
1123                 if (state->async.fn) {
1124                         state->async.fn(state);
1125                 }
1126                 talloc_free(tmp_ctx);
1127                 return -1;
1128         }
1129
1130         if (outdata) {
1131                 *outdata = state->outdata;
1132                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1133         }
1134
1135         if (status) {
1136                 *status = state->status;
1137         }
1138
1139         if (state->async.fn) {
1140                 state->async.fn(state);
1141         }
1142
1143         talloc_free(tmp_ctx);
1144         return 0;
1145 }
1146
1147
1148
1149 /*
1150   send a ctdb control message
1151   timeout specifies how long we should wait for a reply.
1152   if timeout is NULL we wait indefinitely
1153  */
1154 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1155                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1156                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1157                  struct timeval *timeout,
1158                  char **errormsg)
1159 {
1160         struct ctdb_client_control_state *state;
1161
1162         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1163                         flags, data, mem_ctx,
1164                         timeout, errormsg);
1165
1166         /* FIXME: Error conditions in ctdb_control_send return NULL without
1167          * setting errormsg.  So, there is no way to distinguish between sucess
1168          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1169         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1170                 if (status != NULL) {
1171                         *status = 0;
1172                 }
1173                 return 0;
1174         }
1175
1176         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1177                         errormsg);
1178 }
1179
1180
1181
1182
1183 /*
1184   a process exists call. Returns 0 if process exists, -1 otherwise
1185  */
1186 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1187 {
1188         int ret;
1189         TDB_DATA data;
1190         int32_t status;
1191
1192         data.dptr = (uint8_t*)&pid;
1193         data.dsize = sizeof(pid);
1194
1195         ret = ctdb_control(ctdb, destnode, 0, 
1196                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1197                            NULL, NULL, &status, NULL, NULL);
1198         if (ret != 0) {
1199                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1200                 return -1;
1201         }
1202
1203         return status;
1204 }
1205
1206 /*
1207   get remote statistics
1208  */
1209 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1210 {
1211         int ret;
1212         TDB_DATA data;
1213         int32_t res;
1214
1215         ret = ctdb_control(ctdb, destnode, 0, 
1216                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1217                            ctdb, &data, &res, NULL, NULL);
1218         if (ret != 0 || res != 0) {
1219                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1220                 return -1;
1221         }
1222
1223         if (data.dsize != sizeof(struct ctdb_statistics)) {
1224                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1225                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1226                       return -1;
1227         }
1228
1229         *status = *(struct ctdb_statistics *)data.dptr;
1230         talloc_free(data.dptr);
1231                         
1232         return 0;
1233 }
1234
1235 /*
1236  * get db statistics
1237  */
1238 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1239                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1240 {
1241         int ret;
1242         TDB_DATA indata, outdata;
1243         int32_t res;
1244         struct ctdb_db_statistics *wire, *s;
1245         char *ptr;
1246         int i;
1247
1248         indata.dptr = (uint8_t *)&dbid;
1249         indata.dsize = sizeof(dbid);
1250
1251         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1252                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1253         if (ret != 0 || res != 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1255                 return -1;
1256         }
1257
1258         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1259                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1260                                  outdata.dsize, sizeof(struct ctdb_statistics)));
1261                 return -1;
1262         }
1263
1264         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1265         if (s == NULL) {
1266                 talloc_free(outdata.dptr);
1267                 CTDB_NO_MEMORY(ctdb, s);
1268         }
1269
1270         wire = (struct ctdb_db_statistics *)outdata.dptr;
1271         *s = *wire;
1272         ptr = &wire->hot_keys_wire[0];
1273         for (i=0; i<wire->num_hot_keys; i++) {
1274                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1275                 if (s->hot_keys[i].key.dptr == NULL) {
1276                         talloc_free(outdata.dptr);
1277                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1278                 }
1279
1280                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1281                 ptr += wire->hot_keys[i].key.dsize;
1282         }
1283
1284         talloc_free(outdata.dptr);
1285         *dbstat = s;
1286         return 0;
1287 }
1288
1289 /*
1290   shutdown a remote ctdb node
1291  */
1292 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1293 {
1294         struct ctdb_client_control_state *state;
1295
1296         state = ctdb_control_send(ctdb, destnode, 0, 
1297                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1298                            NULL, &timeout, NULL);
1299         if (state == NULL) {
1300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1301                 return -1;
1302         }
1303
1304         return 0;
1305 }
1306
1307 /*
1308   get vnn map from a remote node
1309  */
1310 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1311 {
1312         int ret;
1313         TDB_DATA outdata;
1314         int32_t res;
1315         struct ctdb_vnn_map_wire *map;
1316
1317         ret = ctdb_control(ctdb, destnode, 0, 
1318                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1319                            mem_ctx, &outdata, &res, &timeout, NULL);
1320         if (ret != 0 || res != 0) {
1321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1322                 return -1;
1323         }
1324         
1325         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1326         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1327             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1328                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1329                 return -1;
1330         }
1331
1332         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1333         CTDB_NO_MEMORY(ctdb, *vnnmap);
1334         (*vnnmap)->generation = map->generation;
1335         (*vnnmap)->size       = map->size;
1336         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1337
1338         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1339         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1340         talloc_free(outdata.dptr);
1341                     
1342         return 0;
1343 }
1344
1345
1346 /*
1347   get the recovery mode of a remote node
1348  */
1349 struct ctdb_client_control_state *
1350 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1351 {
1352         return ctdb_control_send(ctdb, destnode, 0, 
1353                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1354                            mem_ctx, &timeout, NULL);
1355 }
1356
1357 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1358 {
1359         int ret;
1360         int32_t res;
1361
1362         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1363         if (ret != 0) {
1364                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1365                 return -1;
1366         }
1367
1368         if (recmode) {
1369                 *recmode = (uint32_t)res;
1370         }
1371
1372         return 0;
1373 }
1374
1375 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1376 {
1377         struct ctdb_client_control_state *state;
1378
1379         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1380         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1381 }
1382
1383
1384
1385
1386 /*
1387   set the recovery mode of a remote node
1388  */
1389 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1390 {
1391         int ret;
1392         TDB_DATA data;
1393         int32_t res;
1394
1395         data.dsize = sizeof(uint32_t);
1396         data.dptr = (unsigned char *)&recmode;
1397
1398         ret = ctdb_control(ctdb, destnode, 0, 
1399                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1400                            NULL, NULL, &res, &timeout, NULL);
1401         if (ret != 0 || res != 0) {
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1403                 return -1;
1404         }
1405
1406         return 0;
1407 }
1408
1409
1410
1411 /*
1412   get the recovery master of a remote node
1413  */
1414 struct ctdb_client_control_state *
1415 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1416                         struct timeval timeout, uint32_t destnode)
1417 {
1418         return ctdb_control_send(ctdb, destnode, 0, 
1419                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1420                            mem_ctx, &timeout, NULL);
1421 }
1422
1423 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1424 {
1425         int ret;
1426         int32_t res;
1427
1428         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1431                 return -1;
1432         }
1433
1434         if (recmaster) {
1435                 *recmaster = (uint32_t)res;
1436         }
1437
1438         return 0;
1439 }
1440
1441 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1442 {
1443         struct ctdb_client_control_state *state;
1444
1445         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1446         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1447 }
1448
1449
1450 /*
1451   set the recovery master of a remote node
1452  */
1453 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1454 {
1455         int ret;
1456         TDB_DATA data;
1457         int32_t res;
1458
1459         ZERO_STRUCT(data);
1460         data.dsize = sizeof(uint32_t);
1461         data.dptr = (unsigned char *)&recmaster;
1462
1463         ret = ctdb_control(ctdb, destnode, 0, 
1464                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1465                            NULL, NULL, &res, &timeout, NULL);
1466         if (ret != 0 || res != 0) {
1467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1468                 return -1;
1469         }
1470
1471         return 0;
1472 }
1473
1474
1475 /*
1476   get a list of databases off a remote node
1477  */
1478 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1479                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1480 {
1481         int ret;
1482         TDB_DATA outdata;
1483         int32_t res;
1484
1485         ret = ctdb_control(ctdb, destnode, 0, 
1486                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1487                            mem_ctx, &outdata, &res, &timeout, NULL);
1488         if (ret != 0 || res != 0) {
1489                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1490                 return -1;
1491         }
1492
1493         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1494         talloc_free(outdata.dptr);
1495                     
1496         return 0;
1497 }
1498
1499 /*
1500   get a list of nodes (vnn and flags ) from a remote node
1501  */
1502 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1503                 struct timeval timeout, uint32_t destnode, 
1504                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1505 {
1506         int ret;
1507         TDB_DATA outdata;
1508         int32_t res;
1509
1510         ret = ctdb_control(ctdb, destnode, 0, 
1511                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1512                            mem_ctx, &outdata, &res, &timeout, NULL);
1513         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1515                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1516         }
1517         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1519                 return -1;
1520         }
1521
1522         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1523         talloc_free(outdata.dptr);
1524                     
1525         return 0;
1526 }
1527
1528 /*
1529   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1530  */
1531 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1532                 struct timeval timeout, uint32_t destnode, 
1533                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1534 {
1535         int ret, i, len;
1536         TDB_DATA outdata;
1537         struct ctdb_node_mapv4 *nodemapv4;
1538         int32_t res;
1539
1540         ret = ctdb_control(ctdb, destnode, 0, 
1541                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1542                            mem_ctx, &outdata, &res, &timeout, NULL);
1543         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1545                 return -1;
1546         }
1547
1548         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1549
1550         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1551         (*nodemap) = talloc_zero_size(mem_ctx, len);
1552         CTDB_NO_MEMORY(ctdb, (*nodemap));
1553
1554         (*nodemap)->num = nodemapv4->num;
1555         for (i=0; i<nodemapv4->num; i++) {
1556                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1557                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1558                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1559                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1560         }
1561                 
1562         talloc_free(outdata.dptr);
1563                     
1564         return 0;
1565 }
1566
1567 /*
1568   drop the transport, reload the nodes file and restart the transport
1569  */
1570 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1571                     struct timeval timeout, uint32_t destnode)
1572 {
1573         int ret;
1574         int32_t res;
1575
1576         ret = ctdb_control(ctdb, destnode, 0, 
1577                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1578                            NULL, NULL, &res, &timeout, NULL);
1579         if (ret != 0 || res != 0) {
1580                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1581                 return -1;
1582         }
1583
1584         return 0;
1585 }
1586
1587
1588 /*
1589   set vnn map on a node
1590  */
1591 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1592                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1593 {
1594         int ret;
1595         TDB_DATA data;
1596         int32_t res;
1597         struct ctdb_vnn_map_wire *map;
1598         size_t len;
1599
1600         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1601         map = talloc_size(mem_ctx, len);
1602         CTDB_NO_MEMORY(ctdb, map);
1603
1604         map->generation = vnnmap->generation;
1605         map->size = vnnmap->size;
1606         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1607         
1608         data.dsize = len;
1609         data.dptr  = (uint8_t *)map;
1610
1611         ret = ctdb_control(ctdb, destnode, 0, 
1612                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1613                            NULL, NULL, &res, &timeout, NULL);
1614         if (ret != 0 || res != 0) {
1615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1616                 return -1;
1617         }
1618
1619         talloc_free(map);
1620
1621         return 0;
1622 }
1623
1624
1625 /*
1626   async send for pull database
1627  */
1628 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1629         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1630         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1631 {
1632         TDB_DATA indata;
1633         struct ctdb_control_pulldb *pull;
1634         struct ctdb_client_control_state *state;
1635
1636         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1637         CTDB_NO_MEMORY_NULL(ctdb, pull);
1638
1639         pull->db_id   = dbid;
1640         pull->lmaster = lmaster;
1641
1642         indata.dsize = sizeof(struct ctdb_control_pulldb);
1643         indata.dptr  = (unsigned char *)pull;
1644
1645         state = ctdb_control_send(ctdb, destnode, 0, 
1646                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1647                                   mem_ctx, &timeout, NULL);
1648         talloc_free(pull);
1649
1650         return state;
1651 }
1652
1653 /*
1654   async recv for pull database
1655  */
1656 int ctdb_ctrl_pulldb_recv(
1657         struct ctdb_context *ctdb, 
1658         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1659         TDB_DATA *outdata)
1660 {
1661         int ret;
1662         int32_t res;
1663
1664         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1665         if ( (ret != 0) || (res != 0) ){
1666                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1667                 return -1;
1668         }
1669
1670         return 0;
1671 }
1672
1673 /*
1674   pull all keys and records for a specific database on a node
1675  */
1676 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1677                 uint32_t dbid, uint32_t lmaster, 
1678                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1679                 TDB_DATA *outdata)
1680 {
1681         struct ctdb_client_control_state *state;
1682
1683         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1684                                       timeout);
1685         
1686         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1687 }
1688
1689
1690 /*
1691   change dmaster for all keys in the database to the new value
1692  */
1693 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1694                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1695 {
1696         int ret;
1697         TDB_DATA indata;
1698         int32_t res;
1699
1700         indata.dsize = 2*sizeof(uint32_t);
1701         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1702
1703         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1704         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1705
1706         ret = ctdb_control(ctdb, destnode, 0, 
1707                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1708                            NULL, NULL, &res, &timeout, NULL);
1709         if (ret != 0 || res != 0) {
1710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1711                 return -1;
1712         }
1713
1714         return 0;
1715 }
1716
1717 /*
1718   ping a node, return number of clients connected
1719  */
1720 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1721 {
1722         int ret;
1723         int32_t res;
1724
1725         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1726                            tdb_null, NULL, NULL, &res, NULL, NULL);
1727         if (ret != 0) {
1728                 return -1;
1729         }
1730         return res;
1731 }
1732
1733 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1734                            struct timeval timeout, 
1735                            uint32_t destnode,
1736                            uint32_t *runstate)
1737 {
1738         TDB_DATA outdata;
1739         int32_t res;
1740         int ret;
1741
1742         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1743                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1744         if (ret != 0 || res != 0) {
1745                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1746                 return ret != 0 ? ret : res;
1747         }
1748
1749         if (outdata.dsize != sizeof(uint32_t)) {
1750                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1751                 talloc_free(outdata.dptr);
1752                 return -1;
1753         }
1754
1755         if (runstate != NULL) {
1756                 *runstate = *(uint32_t *)outdata.dptr;
1757         }
1758         talloc_free(outdata.dptr);
1759
1760         return 0;
1761 }
1762
1763 /*
1764   find the real path to a ltdb 
1765  */
1766 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1767                    const char **path)
1768 {
1769         int ret;
1770         int32_t res;
1771         TDB_DATA data;
1772
1773         data.dptr = (uint8_t *)&dbid;
1774         data.dsize = sizeof(dbid);
1775
1776         ret = ctdb_control(ctdb, destnode, 0, 
1777                            CTDB_CONTROL_GETDBPATH, 0, data, 
1778                            mem_ctx, &data, &res, &timeout, NULL);
1779         if (ret != 0 || res != 0) {
1780                 return -1;
1781         }
1782
1783         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1784         if ((*path) == NULL) {
1785                 return -1;
1786         }
1787
1788         talloc_free(data.dptr);
1789
1790         return 0;
1791 }
1792
1793 /*
1794   find the name of a db 
1795  */
1796 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1797                    const char **name)
1798 {
1799         int ret;
1800         int32_t res;
1801         TDB_DATA data;
1802
1803         data.dptr = (uint8_t *)&dbid;
1804         data.dsize = sizeof(dbid);
1805
1806         ret = ctdb_control(ctdb, destnode, 0, 
1807                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1808                            mem_ctx, &data, &res, &timeout, NULL);
1809         if (ret != 0 || res != 0) {
1810                 return -1;
1811         }
1812
1813         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1814         if ((*name) == NULL) {
1815                 return -1;
1816         }
1817
1818         talloc_free(data.dptr);
1819
1820         return 0;
1821 }
1822
1823 /*
1824   get the health status of a db
1825  */
1826 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1827                           struct timeval timeout,
1828                           uint32_t destnode,
1829                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1830                           const char **reason)
1831 {
1832         int ret;
1833         int32_t res;
1834         TDB_DATA data;
1835
1836         data.dptr = (uint8_t *)&dbid;
1837         data.dsize = sizeof(dbid);
1838
1839         ret = ctdb_control(ctdb, destnode, 0,
1840                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1841                            mem_ctx, &data, &res, &timeout, NULL);
1842         if (ret != 0 || res != 0) {
1843                 return -1;
1844         }
1845
1846         if (data.dsize == 0) {
1847                 (*reason) = NULL;
1848                 return 0;
1849         }
1850
1851         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1852         if ((*reason) == NULL) {
1853                 return -1;
1854         }
1855
1856         talloc_free(data.dptr);
1857
1858         return 0;
1859 }
1860
1861 /*
1862  * get db sequence number
1863  */
1864 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1865                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1866 {
1867         int ret;
1868         int32_t res;
1869         TDB_DATA data, outdata;
1870
1871         data.dptr = (uint8_t *)&dbid;
1872         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1873
1874         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1875                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1876         if (ret != 0 || res != 0) {
1877                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1878                 return -1;
1879         }
1880
1881         if (outdata.dsize != sizeof(uint64_t)) {
1882                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1883                 talloc_free(outdata.dptr);
1884                 return -1;
1885         }
1886
1887         if (seqnum != NULL) {
1888                 *seqnum = *(uint64_t *)outdata.dptr;
1889         }
1890         talloc_free(outdata.dptr);
1891
1892         return 0;
1893 }
1894
1895 /*
1896   create a database
1897  */
1898 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1899                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1900 {
1901         int ret;
1902         int32_t res;
1903         TDB_DATA data;
1904         uint64_t tdb_flags = 0;
1905
1906         data.dptr = discard_const(name);
1907         data.dsize = strlen(name)+1;
1908
1909         /* Make sure that volatile databases use jenkins hash */
1910         if (!persistent) {
1911                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1912         }
1913
1914         ret = ctdb_control(ctdb, destnode, tdb_flags,
1915                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1916                            0, data, 
1917                            mem_ctx, &data, &res, &timeout, NULL);
1918
1919         if (ret != 0 || res != 0) {
1920                 return -1;
1921         }
1922
1923         return 0;
1924 }
1925
1926 /*
1927   get debug level on a node
1928  */
1929 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1930 {
1931         int ret;
1932         int32_t res;
1933         TDB_DATA data;
1934
1935         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1936                            ctdb, &data, &res, NULL, NULL);
1937         if (ret != 0 || res != 0) {
1938                 return -1;
1939         }
1940         if (data.dsize != sizeof(int32_t)) {
1941                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1942                          (unsigned)data.dsize));
1943                 return -1;
1944         }
1945         *level = *(int32_t *)data.dptr;
1946         talloc_free(data.dptr);
1947         return 0;
1948 }
1949
1950 /*
1951   set debug level on a node
1952  */
1953 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1954 {
1955         int ret;
1956         int32_t res;
1957         TDB_DATA data;
1958
1959         data.dptr = (uint8_t *)&level;
1960         data.dsize = sizeof(level);
1961
1962         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1963                            NULL, NULL, &res, NULL, NULL);
1964         if (ret != 0 || res != 0) {
1965                 return -1;
1966         }
1967         return 0;
1968 }
1969
1970
1971 /*
1972   get a list of connected nodes
1973  */
1974 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1975                                 struct timeval timeout,
1976                                 TALLOC_CTX *mem_ctx,
1977                                 uint32_t *num_nodes)
1978 {
1979         struct ctdb_node_map *map=NULL;
1980         int ret, i;
1981         uint32_t *nodes;
1982
1983         *num_nodes = 0;
1984
1985         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1986         if (ret != 0) {
1987                 return NULL;
1988         }
1989
1990         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1991         if (nodes == NULL) {
1992                 return NULL;
1993         }
1994
1995         for (i=0;i<map->num;i++) {
1996                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1997                         nodes[*num_nodes] = map->nodes[i].pnn;
1998                         (*num_nodes)++;
1999                 }
2000         }
2001
2002         return nodes;
2003 }
2004
2005
2006 /*
2007   reset remote status
2008  */
2009 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2010 {
2011         int ret;
2012         int32_t res;
2013
2014         ret = ctdb_control(ctdb, destnode, 0, 
2015                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2016                            NULL, NULL, &res, NULL, NULL);
2017         if (ret != 0 || res != 0) {
2018                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2019                 return -1;
2020         }
2021         return 0;
2022 }
2023
2024 /*
2025   attach to a specific database - client call
2026 */
2027 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2028                                     struct timeval timeout,
2029                                     const char *name,
2030                                     bool persistent,
2031                                     uint32_t tdb_flags)
2032 {
2033         struct ctdb_db_context *ctdb_db;
2034         TDB_DATA data;
2035         int ret;
2036         int32_t res;
2037
2038         ctdb_db = ctdb_db_handle(ctdb, name);
2039         if (ctdb_db) {
2040                 return ctdb_db;
2041         }
2042
2043         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2044         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2045
2046         ctdb_db->ctdb = ctdb;
2047         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2048         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2049
2050         data.dptr = discard_const(name);
2051         data.dsize = strlen(name)+1;
2052
2053         /* CTDB has switched to using jenkins hash for volatile databases.
2054          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2055          * always set it.
2056          */
2057         if (!persistent) {
2058                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2059         }
2060
2061         /* tell ctdb daemon to attach */
2062         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2063                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2064                            0, data, ctdb_db, &data, &res, NULL, NULL);
2065         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2066                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2067                 talloc_free(ctdb_db);
2068                 return NULL;
2069         }
2070         
2071         ctdb_db->db_id = *(uint32_t *)data.dptr;
2072         talloc_free(data.dptr);
2073
2074         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2075         if (ret != 0) {
2076                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2077                 talloc_free(ctdb_db);
2078                 return NULL;
2079         }
2080
2081         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2082         if (ctdb->valgrinding) {
2083                 tdb_flags |= TDB_NOMMAP;
2084         }
2085         tdb_flags |= TDB_DISALLOW_NESTING;
2086
2087         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2088         if (ctdb_db->ltdb == NULL) {
2089                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2090                 talloc_free(ctdb_db);
2091                 return NULL;
2092         }
2093
2094         ctdb_db->persistent = persistent;
2095
2096         DLIST_ADD(ctdb->db_list, ctdb_db);
2097
2098         /* add well known functions */
2099         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2100         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2101         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2102
2103         return ctdb_db;
2104 }
2105
2106
2107 /*
2108   setup a call for a database
2109  */
2110 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2111 {
2112         struct ctdb_registered_call *call;
2113
2114 #if 0
2115         TDB_DATA data;
2116         int32_t status;
2117         struct ctdb_control_set_call c;
2118         int ret;
2119
2120         /* this is no longer valid with the separate daemon architecture */
2121         c.db_id = ctdb_db->db_id;
2122         c.fn    = fn;
2123         c.id    = id;
2124
2125         data.dptr = (uint8_t *)&c;
2126         data.dsize = sizeof(c);
2127
2128         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2129                            data, NULL, NULL, &status, NULL, NULL);
2130         if (ret != 0 || status != 0) {
2131                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2132                 return -1;
2133         }
2134 #endif
2135
2136         /* also register locally */
2137         call = talloc(ctdb_db, struct ctdb_registered_call);
2138         call->fn = fn;
2139         call->id = id;
2140
2141         DLIST_ADD(ctdb_db->calls, call);        
2142         return 0;
2143 }
2144
2145
2146 struct traverse_state {
2147         bool done;
2148         uint32_t count;
2149         ctdb_traverse_func fn;
2150         void *private_data;
2151         bool listemptyrecords;
2152 };
2153
2154 /*
2155   called on each key during a ctdb_traverse
2156  */
2157 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2158 {
2159         struct traverse_state *state = (struct traverse_state *)p;
2160         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2161         TDB_DATA key;
2162
2163         if (data.dsize < sizeof(uint32_t) ||
2164             d->length != data.dsize) {
2165                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2166                 state->done = true;
2167                 return;
2168         }
2169
2170         key.dsize = d->keylen;
2171         key.dptr  = &d->data[0];
2172         data.dsize = d->datalen;
2173         data.dptr = &d->data[d->keylen];
2174
2175         if (key.dsize == 0 && data.dsize == 0) {
2176                 /* end of traverse */
2177                 state->done = true;
2178                 return;
2179         }
2180
2181         if (!state->listemptyrecords &&
2182             data.dsize == sizeof(struct ctdb_ltdb_header))
2183         {
2184                 /* empty records are deleted records in ctdb */
2185                 return;
2186         }
2187
2188         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2189                 state->done = true;
2190         }
2191
2192         state->count++;
2193 }
2194
2195 /**
2196  * start a cluster wide traverse, calling the supplied fn on each record
2197  * return the number of records traversed, or -1 on error
2198  *
2199  * Extendet variant with a flag to signal whether empty records should
2200  * be listed.
2201  */
2202 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2203                              ctdb_traverse_func fn,
2204                              bool withemptyrecords,
2205                              void *private_data)
2206 {
2207         TDB_DATA data;
2208         struct ctdb_traverse_start_ext t;
2209         int32_t status;
2210         int ret;
2211         uint64_t srvid = (getpid() | 0xFLL<<60);
2212         struct traverse_state state;
2213
2214         state.done = false;
2215         state.count = 0;
2216         state.private_data = private_data;
2217         state.fn = fn;
2218         state.listemptyrecords = withemptyrecords;
2219
2220         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2221         if (ret != 0) {
2222                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2223                 return -1;
2224         }
2225
2226         t.db_id = ctdb_db->db_id;
2227         t.srvid = srvid;
2228         t.reqid = 0;
2229         t.withemptyrecords = withemptyrecords;
2230
2231         data.dptr = (uint8_t *)&t;
2232         data.dsize = sizeof(t);
2233
2234         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2235                            data, NULL, NULL, &status, NULL, NULL);
2236         if (ret != 0 || status != 0) {
2237                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2238                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2239                 return -1;
2240         }
2241
2242         while (!state.done) {
2243                 event_loop_once(ctdb_db->ctdb->ev);
2244         }
2245
2246         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2247         if (ret != 0) {
2248                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2249                 return -1;
2250         }
2251
2252         return state.count;
2253 }
2254
2255 /**
2256  * start a cluster wide traverse, calling the supplied fn on each record
2257  * return the number of records traversed, or -1 on error
2258  *
2259  * Standard version which does not list the empty records:
2260  * These are considered deleted.
2261  */
2262 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2263 {
2264         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2265 }
2266
2267 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2268 /*
2269   called on each key during a catdb
2270  */
2271 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2272 {
2273         int i;
2274         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2275         FILE *f = c->f;
2276         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2277
2278         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2279         for (i=0;i<key.dsize;i++) {
2280                 if (ISASCII(key.dptr[i])) {
2281                         fprintf(f, "%c", key.dptr[i]);
2282                 } else {
2283                         fprintf(f, "\\%02X", key.dptr[i]);
2284                 }
2285         }
2286         fprintf(f, "\"\n");
2287
2288         fprintf(f, "dmaster: %u\n", h->dmaster);
2289         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2290
2291         if (c->printlmaster && ctdb->vnn_map != NULL) {
2292                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2293         }
2294
2295         if (c->printhash) {
2296                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2297         }
2298
2299         if (c->printrecordflags) {
2300                 fprintf(f, "flags: 0x%08x", h->flags);
2301                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2302                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2303                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2304                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2305                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2306                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2307                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2308                 fprintf(f, "\n");
2309         }
2310
2311         if (c->printdatasize) {
2312                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2313         } else {
2314                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2315                 for (i=sizeof(*h);i<data.dsize;i++) {
2316                         if (ISASCII(data.dptr[i])) {
2317                                 fprintf(f, "%c", data.dptr[i]);
2318                         } else {
2319                                 fprintf(f, "\\%02X", data.dptr[i]);
2320                         }
2321                 }
2322                 fprintf(f, "\"\n");
2323         }
2324
2325         fprintf(f, "\n");
2326
2327         return 0;
2328 }
2329
2330 /*
2331   convenience function to list all keys to stdout
2332  */
2333 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2334                  struct ctdb_dump_db_context *ctx)
2335 {
2336         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2337                                  ctx->printemptyrecords, ctx);
2338 }
2339
2340 /*
2341   get the pid of a ctdb daemon
2342  */
2343 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2344 {
2345         int ret;
2346         int32_t res;
2347
2348         ret = ctdb_control(ctdb, destnode, 0, 
2349                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2350                            NULL, NULL, &res, &timeout, NULL);
2351         if (ret != 0) {
2352                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2353                 return -1;
2354         }
2355
2356         *pid = res;
2357
2358         return 0;
2359 }
2360
2361
2362 /*
2363   async freeze send control
2364  */
2365 struct ctdb_client_control_state *
2366 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2367 {
2368         return ctdb_control_send(ctdb, destnode, priority, 
2369                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2370                            mem_ctx, &timeout, NULL);
2371 }
2372
2373 /* 
2374    async freeze recv control
2375 */
2376 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2377 {
2378         int ret;
2379         int32_t res;
2380
2381         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2382         if ( (ret != 0) || (res != 0) ){
2383                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2384                 return -1;
2385         }
2386
2387         return 0;
2388 }
2389
2390 /*
2391   freeze databases of a certain priority
2392  */
2393 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2394 {
2395         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2396         struct ctdb_client_control_state *state;
2397         int ret;
2398
2399         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2400         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2401         talloc_free(tmp_ctx);
2402
2403         return ret;
2404 }
2405
2406 /* Freeze all databases */
2407 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2408 {
2409         int i;
2410
2411         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2412                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2413                         return -1;
2414                 }
2415         }
2416         return 0;
2417 }
2418
2419 /*
2420   thaw databases of a certain priority
2421  */
2422 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2423 {
2424         int ret;
2425         int32_t res;
2426
2427         ret = ctdb_control(ctdb, destnode, priority, 
2428                            CTDB_CONTROL_THAW, 0, tdb_null, 
2429                            NULL, NULL, &res, &timeout, NULL);
2430         if (ret != 0 || res != 0) {
2431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2432                 return -1;
2433         }
2434
2435         return 0;
2436 }
2437
2438 /* thaw all databases */
2439 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2440 {
2441         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2442 }
2443
2444 /*
2445   get pnn of a node, or -1
2446  */
2447 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2448 {
2449         int ret;
2450         int32_t res;
2451
2452         ret = ctdb_control(ctdb, destnode, 0, 
2453                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2454                            NULL, NULL, &res, &timeout, NULL);
2455         if (ret != 0) {
2456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2457                 return -1;
2458         }
2459
2460         return res;
2461 }
2462
2463 /*
2464   get the monitoring mode of a remote node
2465  */
2466 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2467 {
2468         int ret;
2469         int32_t res;
2470
2471         ret = ctdb_control(ctdb, destnode, 0, 
2472                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2473                            NULL, NULL, &res, &timeout, NULL);
2474         if (ret != 0) {
2475                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2476                 return -1;
2477         }
2478
2479         *monmode = res;
2480
2481         return 0;
2482 }
2483
2484
2485 /*
2486  set the monitoring mode of a remote node to active
2487  */
2488 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2489 {
2490         int ret;
2491         
2492
2493         ret = ctdb_control(ctdb, destnode, 0, 
2494                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2495                            NULL, NULL,NULL, &timeout, NULL);
2496         if (ret != 0) {
2497                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2498                 return -1;
2499         }
2500
2501         
2502
2503         return 0;
2504 }
2505
2506 /*
2507   set the monitoring mode of a remote node to disable
2508  */
2509 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2510 {
2511         int ret;
2512         
2513
2514         ret = ctdb_control(ctdb, destnode, 0, 
2515                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2516                            NULL, NULL, NULL, &timeout, NULL);
2517         if (ret != 0) {
2518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2519                 return -1;
2520         }
2521
2522         
2523
2524         return 0;
2525 }
2526
2527
2528
2529 /* 
2530   sent to a node to make it take over an ip address
2531 */
2532 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2533                           uint32_t destnode, struct ctdb_public_ip *ip)
2534 {
2535         TDB_DATA data;
2536         struct ctdb_public_ipv4 ipv4;
2537         int ret;
2538         int32_t res;
2539
2540         if (ip->addr.sa.sa_family == AF_INET) {
2541                 ipv4.pnn = ip->pnn;
2542                 ipv4.sin = ip->addr.ip;
2543
2544                 data.dsize = sizeof(ipv4);
2545                 data.dptr  = (uint8_t *)&ipv4;
2546
2547                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2548                            NULL, &res, &timeout, NULL);
2549         } else {
2550                 data.dsize = sizeof(*ip);
2551                 data.dptr  = (uint8_t *)ip;
2552
2553                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2554                            NULL, &res, &timeout, NULL);
2555         }
2556
2557         if (ret != 0 || res != 0) {
2558                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2559                 return -1;
2560         }
2561
2562         return 0;       
2563 }
2564
2565
2566 /* 
2567   sent to a node to make it release an ip address
2568 */
2569 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2570                          uint32_t destnode, struct ctdb_public_ip *ip)
2571 {
2572         TDB_DATA data;
2573         struct ctdb_public_ipv4 ipv4;
2574         int ret;
2575         int32_t res;
2576
2577         if (ip->addr.sa.sa_family == AF_INET) {
2578                 ipv4.pnn = ip->pnn;
2579                 ipv4.sin = ip->addr.ip;
2580
2581                 data.dsize = sizeof(ipv4);
2582                 data.dptr  = (uint8_t *)&ipv4;
2583
2584                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2585                                    NULL, &res, &timeout, NULL);
2586         } else {
2587                 data.dsize = sizeof(*ip);
2588                 data.dptr  = (uint8_t *)ip;
2589
2590                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2591                                    NULL, &res, &timeout, NULL);
2592         }
2593
2594         if (ret != 0 || res != 0) {
2595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2596                 return -1;
2597         }
2598
2599         return 0;       
2600 }
2601
2602
2603 /*
2604   get a tunable
2605  */
2606 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2607                           struct timeval timeout, 
2608                           uint32_t destnode,
2609                           const char *name, uint32_t *value)
2610 {
2611         struct ctdb_control_get_tunable *t;
2612         TDB_DATA data, outdata;
2613         int32_t res;
2614         int ret;
2615
2616         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2617         data.dptr  = talloc_size(ctdb, data.dsize);
2618         CTDB_NO_MEMORY(ctdb, data.dptr);
2619
2620         t = (struct ctdb_control_get_tunable *)data.dptr;
2621         t->length = strlen(name)+1;
2622         memcpy(t->name, name, t->length);
2623
2624         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2625                            &outdata, &res, &timeout, NULL);
2626         talloc_free(data.dptr);
2627         if (ret != 0 || res != 0) {
2628                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2629                 return ret != 0 ? ret : res;
2630         }
2631
2632         if (outdata.dsize != sizeof(uint32_t)) {
2633                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2634                 talloc_free(outdata.dptr);
2635                 return -1;
2636         }
2637         
2638         *value = *(uint32_t *)outdata.dptr;
2639         talloc_free(outdata.dptr);
2640
2641         return 0;
2642 }
2643
2644 /*
2645   set a tunable
2646  */
2647 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2648                           struct timeval timeout, 
2649                           uint32_t destnode,
2650                           const char *name, uint32_t value)
2651 {
2652         struct ctdb_control_set_tunable *t;
2653         TDB_DATA data;
2654         int32_t res;
2655         int ret;
2656
2657         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2658         data.dptr  = talloc_size(ctdb, data.dsize);
2659         CTDB_NO_MEMORY(ctdb, data.dptr);
2660
2661         t = (struct ctdb_control_set_tunable *)data.dptr;
2662         t->length = strlen(name)+1;
2663         memcpy(t->name, name, t->length);
2664         t->value = value;
2665
2666         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2667                            NULL, &res, &timeout, NULL);
2668         talloc_free(data.dptr);
2669         if (ret != 0 || res != 0) {
2670                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2671                 return -1;
2672         }
2673
2674         return 0;
2675 }
2676
2677 /*
2678   list tunables
2679  */
2680 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2681                             struct timeval timeout, 
2682                             uint32_t destnode,
2683                             TALLOC_CTX *mem_ctx,
2684                             const char ***list, uint32_t *count)
2685 {
2686         TDB_DATA outdata;
2687         int32_t res;
2688         int ret;
2689         struct ctdb_control_list_tunable *t;
2690         char *p, *s, *ptr;
2691
2692         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2693                            mem_ctx, &outdata, &res, &timeout, NULL);
2694         if (ret != 0 || res != 0) {
2695                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2696                 return -1;
2697         }
2698
2699         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2700         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2701             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2702                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2703                 talloc_free(outdata.dptr);
2704                 return -1;              
2705         }
2706         
2707         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2708         CTDB_NO_MEMORY(ctdb, p);
2709
2710         talloc_free(outdata.dptr);
2711         
2712         (*list) = NULL;
2713         (*count) = 0;
2714
2715         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2716                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2717                 CTDB_NO_MEMORY(ctdb, *list);
2718                 (*list)[*count] = talloc_strdup(*list, s);
2719                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2720                 (*count)++;
2721         }
2722
2723         talloc_free(p);
2724
2725         return 0;
2726 }
2727
2728
2729 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2730                                    struct timeval timeout, uint32_t destnode,
2731                                    TALLOC_CTX *mem_ctx,
2732                                    uint32_t flags,
2733                                    struct ctdb_all_public_ips **ips)
2734 {
2735         int ret;
2736         TDB_DATA outdata;
2737         int32_t res;
2738
2739         ret = ctdb_control(ctdb, destnode, 0, 
2740                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2741                            mem_ctx, &outdata, &res, &timeout, NULL);
2742         if (ret == 0 && res == -1) {
2743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2744                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2745         }
2746         if (ret != 0 || res != 0) {
2747           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2748                 return -1;
2749         }
2750
2751         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2752         talloc_free(outdata.dptr);
2753                     
2754         return 0;
2755 }
2756
2757 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2758                              struct timeval timeout, uint32_t destnode,
2759                              TALLOC_CTX *mem_ctx,
2760                              struct ctdb_all_public_ips **ips)
2761 {
2762         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2763                                               destnode, mem_ctx,
2764                                               0, ips);
2765 }
2766
2767 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2768                         struct timeval timeout, uint32_t destnode, 
2769                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2770 {
2771         int ret, i, len;
2772         TDB_DATA outdata;
2773         int32_t res;
2774         struct ctdb_all_public_ipsv4 *ipsv4;
2775
2776         ret = ctdb_control(ctdb, destnode, 0, 
2777                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2778                            mem_ctx, &outdata, &res, &timeout, NULL);
2779         if (ret != 0 || res != 0) {
2780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2781                 return -1;
2782         }
2783
2784         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2785         len = offsetof(struct ctdb_all_public_ips, ips) +
2786                 ipsv4->num*sizeof(struct ctdb_public_ip);
2787         *ips = talloc_zero_size(mem_ctx, len);
2788         CTDB_NO_MEMORY(ctdb, *ips);
2789         (*ips)->num = ipsv4->num;
2790         for (i=0; i<ipsv4->num; i++) {
2791                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2792                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2793         }
2794
2795         talloc_free(outdata.dptr);
2796                     
2797         return 0;
2798 }
2799
2800 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2801                                  struct timeval timeout, uint32_t destnode,
2802                                  TALLOC_CTX *mem_ctx,
2803                                  const ctdb_sock_addr *addr,
2804                                  struct ctdb_control_public_ip_info **_info)
2805 {
2806         int ret;
2807         TDB_DATA indata;
2808         TDB_DATA outdata;
2809         int32_t res;
2810         struct ctdb_control_public_ip_info *info;
2811         uint32_t len;
2812         uint32_t i;
2813
2814         indata.dptr = discard_const_p(uint8_t, addr);
2815         indata.dsize = sizeof(*addr);
2816
2817         ret = ctdb_control(ctdb, destnode, 0,
2818                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2819                            mem_ctx, &outdata, &res, &timeout, NULL);
2820         if (ret != 0 || res != 0) {
2821                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2822                                 "failed ret:%d res:%d\n",
2823                                 ret, res));
2824                 return -1;
2825         }
2826
2827         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2828         if (len > outdata.dsize) {
2829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2830                                 "returned invalid data with size %u > %u\n",
2831                                 (unsigned int)outdata.dsize,
2832                                 (unsigned int)len));
2833                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2834                 return -1;
2835         }
2836
2837         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2838         len += info->num*sizeof(struct ctdb_control_iface_info);
2839
2840         if (len > outdata.dsize) {
2841                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2842                                 "returned invalid data with size %u > %u\n",
2843                                 (unsigned int)outdata.dsize,
2844                                 (unsigned int)len));
2845                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2846                 return -1;
2847         }
2848
2849         /* make sure we null terminate the returned strings */
2850         for (i=0; i < info->num; i++) {
2851                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2852         }
2853
2854         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2855                                                                 outdata.dptr,
2856                                                                 outdata.dsize);
2857         talloc_free(outdata.dptr);
2858         if (*_info == NULL) {
2859                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2860                                 "talloc_memdup size %u failed\n",
2861                                 (unsigned int)outdata.dsize));
2862                 return -1;
2863         }
2864
2865         return 0;
2866 }
2867
2868 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2869                          struct timeval timeout, uint32_t destnode,
2870                          TALLOC_CTX *mem_ctx,
2871                          struct ctdb_control_get_ifaces **_ifaces)
2872 {
2873         int ret;
2874         TDB_DATA outdata;
2875         int32_t res;
2876         struct ctdb_control_get_ifaces *ifaces;
2877         uint32_t len;
2878         uint32_t i;
2879
2880         ret = ctdb_control(ctdb, destnode, 0,
2881                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2882                            mem_ctx, &outdata, &res, &timeout, NULL);
2883         if (ret != 0 || res != 0) {
2884                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2885                                 "failed ret:%d res:%d\n",
2886                                 ret, res));
2887                 return -1;
2888         }
2889
2890         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2891         if (len > outdata.dsize) {
2892                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2893                                 "returned invalid data with size %u > %u\n",
2894                                 (unsigned int)outdata.dsize,
2895                                 (unsigned int)len));
2896                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2897                 return -1;
2898         }
2899
2900         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2901         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2902
2903         if (len > outdata.dsize) {
2904                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2905                                 "returned invalid data with size %u > %u\n",
2906                                 (unsigned int)outdata.dsize,
2907                                 (unsigned int)len));
2908                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2909                 return -1;
2910         }
2911
2912         /* make sure we null terminate the returned strings */
2913         for (i=0; i < ifaces->num; i++) {
2914                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2915         }
2916
2917         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2918                                                                   outdata.dptr,
2919                                                                   outdata.dsize);
2920         talloc_free(outdata.dptr);
2921         if (*_ifaces == NULL) {
2922                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2923                                 "talloc_memdup size %u failed\n",
2924                                 (unsigned int)outdata.dsize));
2925                 return -1;
2926         }
2927
2928         return 0;
2929 }
2930
2931 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2932                              struct timeval timeout, uint32_t destnode,
2933                              TALLOC_CTX *mem_ctx,
2934                              const struct ctdb_control_iface_info *info)
2935 {
2936         int ret;
2937         TDB_DATA indata;
2938         int32_t res;
2939
2940         indata.dptr = discard_const_p(uint8_t, info);
2941         indata.dsize = sizeof(*info);
2942
2943         ret = ctdb_control(ctdb, destnode, 0,
2944                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2945                            mem_ctx, NULL, &res, &timeout, NULL);
2946         if (ret != 0 || res != 0) {
2947                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2948                                 "failed ret:%d res:%d\n",
2949                                 ret, res));
2950                 return -1;
2951         }
2952
2953         return 0;
2954 }
2955
2956 /*
2957   set/clear the permanent disabled bit on a remote node
2958  */
2959 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2960                        uint32_t set, uint32_t clear)
2961 {
2962         int ret;
2963         TDB_DATA data;
2964         struct ctdb_node_map *nodemap=NULL;
2965         struct ctdb_node_flag_change c;
2966         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2967         uint32_t recmaster;
2968         uint32_t *nodes;
2969
2970
2971         /* find the recovery master */
2972         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2973         if (ret != 0) {
2974                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2975                 talloc_free(tmp_ctx);
2976                 return ret;
2977         }
2978
2979
2980         /* read the node flags from the recmaster */
2981         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2984                 talloc_free(tmp_ctx);
2985                 return -1;
2986         }
2987         if (destnode >= nodemap->num) {
2988                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2989                 talloc_free(tmp_ctx);
2990                 return -1;
2991         }
2992
2993         c.pnn       = destnode;
2994         c.old_flags = nodemap->nodes[destnode].flags;
2995         c.new_flags = c.old_flags;
2996         c.new_flags |= set;
2997         c.new_flags &= ~clear;
2998
2999         data.dsize = sizeof(c);
3000         data.dptr = (unsigned char *)&c;
3001
3002         /* send the flags update to all connected nodes */
3003         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3004
3005         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3006                                         nodes, 0,
3007                                         timeout, false, data,
3008                                         NULL, NULL,
3009                                         NULL) != 0) {
3010                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3011
3012                 talloc_free(tmp_ctx);
3013                 return -1;
3014         }
3015
3016         talloc_free(tmp_ctx);
3017         return 0;
3018 }
3019
3020
3021 /*
3022   get all tunables
3023  */
3024 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3025                                struct timeval timeout, 
3026                                uint32_t destnode,
3027                                struct ctdb_tunable *tunables)
3028 {
3029         TDB_DATA outdata;
3030         int ret;
3031         int32_t res;
3032
3033         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3034                            &outdata, &res, &timeout, NULL);
3035         if (ret != 0 || res != 0) {
3036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3037                 return -1;
3038         }
3039
3040         if (outdata.dsize != sizeof(*tunables)) {
3041                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3042                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3043                 return -1;              
3044         }
3045
3046         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3047         talloc_free(outdata.dptr);
3048         return 0;
3049 }
3050
3051 /*
3052   add a public address to a node
3053  */
3054 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3055                       struct timeval timeout, 
3056                       uint32_t destnode,
3057                       struct ctdb_control_ip_iface *pub)
3058 {
3059         TDB_DATA data;
3060         int32_t res;
3061         int ret;
3062
3063         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3064         data.dptr  = (unsigned char *)pub;
3065
3066         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3067                            NULL, &res, &timeout, NULL);
3068         if (ret != 0 || res != 0) {
3069                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3070                 return -1;
3071         }
3072
3073         return 0;
3074 }
3075
3076 /*
3077   delete a public address from a node
3078  */
3079 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3080                       struct timeval timeout, 
3081                       uint32_t destnode,
3082                       struct ctdb_control_ip_iface *pub)
3083 {
3084         TDB_DATA data;
3085         int32_t res;
3086         int ret;
3087
3088         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3089         data.dptr  = (unsigned char *)pub;
3090
3091         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3092                            NULL, &res, &timeout, NULL);
3093         if (ret != 0 || res != 0) {
3094                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3095                 return -1;
3096         }
3097
3098         return 0;
3099 }
3100
3101 /*
3102   kill a tcp connection
3103  */
3104 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3105                       struct timeval timeout, 
3106                       uint32_t destnode,
3107                       struct ctdb_control_killtcp *killtcp)
3108 {
3109         TDB_DATA data;
3110         int32_t res;
3111         int ret;
3112
3113         data.dsize = sizeof(struct ctdb_control_killtcp);
3114         data.dptr  = (unsigned char *)killtcp;
3115
3116         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3117                            NULL, &res, &timeout, NULL);
3118         if (ret != 0 || res != 0) {
3119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3120                 return -1;
3121         }
3122
3123         return 0;
3124 }
3125
3126 /*
3127   send a gratious arp
3128  */
3129 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3130                       struct timeval timeout, 
3131                       uint32_t destnode,
3132                       ctdb_sock_addr *addr,
3133                       const char *ifname)
3134 {
3135         TDB_DATA data;
3136         int32_t res;
3137         int ret, len;
3138         struct ctdb_control_gratious_arp *gratious_arp;
3139         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3140
3141
3142         len = strlen(ifname)+1;
3143         gratious_arp = talloc_size(tmp_ctx, 
3144                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3145         CTDB_NO_MEMORY(ctdb, gratious_arp);
3146
3147         gratious_arp->addr = *addr;
3148         gratious_arp->len = len;
3149         memcpy(&gratious_arp->iface[0], ifname, len);
3150
3151
3152         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3153         data.dptr  = (unsigned char *)gratious_arp;
3154
3155         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3156                            NULL, &res, &timeout, NULL);
3157         if (ret != 0 || res != 0) {
3158                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3159                 talloc_free(tmp_ctx);
3160                 return -1;
3161         }
3162
3163         talloc_free(tmp_ctx);
3164         return 0;
3165 }
3166
3167 /*
3168   get a list of all tcp tickles that a node knows about for a particular vnn
3169  */
3170 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3171                               struct timeval timeout, uint32_t destnode, 
3172                               TALLOC_CTX *mem_ctx, 
3173                               ctdb_sock_addr *addr,
3174                               struct ctdb_control_tcp_tickle_list **list)
3175 {
3176         int ret;
3177         TDB_DATA data, outdata;
3178         int32_t status;
3179
3180         data.dptr = (uint8_t*)addr;
3181         data.dsize = sizeof(ctdb_sock_addr);
3182
3183         ret = ctdb_control(ctdb, destnode, 0, 
3184                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3185                            mem_ctx, &outdata, &status, NULL, NULL);
3186         if (ret != 0 || status != 0) {
3187                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3188                 return -1;
3189         }
3190
3191         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3192
3193         return status;
3194 }
3195
3196 /*
3197   register a server id
3198  */
3199 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3200                       struct timeval timeout, 
3201                       struct ctdb_server_id *id)
3202 {
3203         TDB_DATA data;
3204         int32_t res;
3205         int ret;
3206
3207         data.dsize = sizeof(struct ctdb_server_id);
3208         data.dptr  = (unsigned char *)id;
3209
3210         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3211                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3212                         0, data, NULL,
3213                         NULL, &res, &timeout, NULL);
3214         if (ret != 0 || res != 0) {
3215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3216                 return -1;
3217         }
3218
3219         return 0;
3220 }
3221
3222 /*
3223   unregister a server id
3224  */
3225 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3226                       struct timeval timeout, 
3227                       struct ctdb_server_id *id)
3228 {
3229         TDB_DATA data;
3230         int32_t res;
3231         int ret;
3232
3233         data.dsize = sizeof(struct ctdb_server_id);
3234         data.dptr  = (unsigned char *)id;
3235
3236         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3237                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3238                         0, data, NULL,
3239                         NULL, &res, &timeout, NULL);
3240         if (ret != 0 || res != 0) {
3241                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3242                 return -1;
3243         }
3244
3245         return 0;
3246 }
3247
3248
3249 /*
3250   check if a server id exists
3251
3252   if a server id does exist, return *status == 1, otherwise *status == 0
3253  */
3254 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3255                       struct timeval timeout, 
3256                       uint32_t destnode,
3257                       struct ctdb_server_id *id,
3258                       uint32_t *status)
3259 {
3260         TDB_DATA data;
3261         int32_t res;
3262         int ret;
3263
3264         data.dsize = sizeof(struct ctdb_server_id);
3265         data.dptr  = (unsigned char *)id;
3266
3267         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3268                         0, data, NULL,
3269                         NULL, &res, &timeout, NULL);
3270         if (ret != 0) {
3271                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3272                 return -1;
3273         }
3274
3275         if (res) {
3276                 *status = 1;
3277         } else {
3278                 *status = 0;
3279         }
3280
3281         return 0;
3282 }
3283
3284 /*
3285    get the list of server ids that are registered on a node
3286 */
3287 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3288                 TALLOC_CTX *mem_ctx,
3289                 struct timeval timeout, uint32_t destnode, 
3290                 struct ctdb_server_id_list **svid_list)
3291 {
3292         int ret;
3293         TDB_DATA outdata;
3294         int32_t res;
3295
3296         ret = ctdb_control(ctdb, destnode, 0, 
3297                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3298                            mem_ctx, &outdata, &res, &timeout, NULL);
3299         if (ret != 0 || res != 0) {
3300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3301                 return -1;
3302         }
3303
3304         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3305                     
3306         return 0;
3307 }
3308
3309 /*
3310   initialise the ctdb daemon for client applications
3311
3312   NOTE: In current code the daemon does not fork. This is for testing purposes only
3313   and to simplify the code.
3314 */
3315 struct ctdb_context *ctdb_init(struct event_context *ev)
3316 {
3317         int ret;
3318         struct ctdb_context *ctdb;
3319
3320         ctdb = talloc_zero(ev, struct ctdb_context);
3321         if (ctdb == NULL) {
3322                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3323                 return NULL;
3324         }
3325         ctdb->ev  = ev;
3326         ctdb->idr = idr_init(ctdb);
3327         /* Wrap early to exercise code. */
3328         ctdb->lastid = INT_MAX-200;
3329         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3330
3331         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3332         if (ret != 0) {
3333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3334                 talloc_free(ctdb);
3335                 return NULL;
3336         }
3337
3338         ctdb->statistics.statistics_start_time = timeval_current();
3339
3340         return ctdb;
3341 }
3342
3343
3344 /*
3345   set some ctdb flags
3346 */
3347 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3348 {
3349         ctdb->flags |= flags;
3350 }
3351
3352 /*
3353   setup the local socket name
3354 */
3355 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3356 {
3357         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3358         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3359
3360         return 0;
3361 }
3362
3363 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3364 {
3365         return ctdb->daemon.name;
3366 }
3367
3368 /*
3369   return the pnn of this node
3370 */
3371 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3372 {
3373         return ctdb->pnn;
3374 }
3375
3376
3377 /*
3378   get the uptime of a remote node
3379  */
3380 struct ctdb_client_control_state *
3381 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3382 {
3383         return ctdb_control_send(ctdb, destnode, 0, 
3384                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3385                            mem_ctx, &timeout, NULL);
3386 }
3387
3388 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3389 {
3390         int ret;
3391         int32_t res;
3392         TDB_DATA outdata;
3393
3394         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3395         if (ret != 0 || res != 0) {
3396                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3397                 return -1;
3398         }
3399
3400         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3401
3402         return 0;
3403 }
3404
3405 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3406 {
3407         struct ctdb_client_control_state *state;
3408
3409         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3410         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3411 }
3412
3413 /*
3414   send a control to execute the "recovered" event script on a node
3415  */
3416 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3417 {
3418         int ret;
3419         int32_t status;
3420
3421         ret = ctdb_control(ctdb, destnode, 0, 
3422                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3423                            NULL, NULL, &status, &timeout, NULL);
3424         if (ret != 0 || status != 0) {
3425                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3426                 return -1;
3427         }
3428
3429         return 0;
3430 }
3431
3432 /* 
3433   callback for the async helpers used when sending the same control
3434   to multiple nodes in parallell.
3435 */
3436 static void async_callback(struct ctdb_client_control_state *state)
3437 {
3438         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3439         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3440         int ret;
3441         TDB_DATA outdata;
3442         int32_t res = -1;
3443         uint32_t destnode = state->c->hdr.destnode;
3444
3445         /* one more node has responded with recmode data */
3446         data->count--;
3447
3448         /* if we failed to push the db, then return an error and let
3449            the main loop try again.
3450         */
3451         if (state->state != CTDB_CONTROL_DONE) {
3452                 if ( !data->dont_log_errors) {
3453                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3454                 }
3455                 data->fail_count++;
3456                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3457                         res = -ETIME;
3458                 } else {
3459                         res = -1;
3460                 }
3461                 if (data->fail_callback) {
3462                         data->fail_callback(ctdb, destnode, res, outdata,
3463                                         data->callback_data);
3464                 }
3465                 return;
3466         }
3467         
3468         state->async.fn = NULL;
3469
3470         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3471         if ((ret != 0) || (res != 0)) {
3472                 if ( !data->dont_log_errors) {
3473                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3474                 }
3475                 data->fail_count++;
3476                 if (data->fail_callback) {
3477                         data->fail_callback(ctdb, destnode, res, outdata,
3478                                         data->callback_data);
3479                 }
3480         }
3481         if ((ret == 0) && (data->callback != NULL)) {
3482                 data->callback(ctdb, destnode, res, outdata,
3483                                         data->callback_data);
3484         }
3485 }
3486
3487
3488 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3489 {
3490         /* set up the callback functions */
3491         state->async.fn = async_callback;
3492         state->async.private_data = data;
3493         
3494         /* one more control to wait for to complete */
3495         data->count++;
3496 }
3497
3498
3499 /* wait for up to the maximum number of seconds allowed
3500    or until all nodes we expect a response from has replied
3501 */
3502 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3503 {
3504         while (data->count > 0) {
3505                 event_loop_once(ctdb->ev);
3506         }
3507         if (data->fail_count != 0) {
3508                 if (!data->dont_log_errors) {
3509                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3510                                  data->fail_count));
3511                 }
3512                 return -1;
3513         }
3514         return 0;
3515 }
3516
3517
3518 /* 
3519    perform a simple control on the listed nodes
3520    The control cannot return data
3521  */
3522 int ctdb_client_async_control(struct ctdb_context *ctdb,
3523                                 enum ctdb_controls opcode,
3524                                 uint32_t *nodes,
3525                                 uint64_t srvid,
3526                                 struct timeval timeout,
3527                                 bool dont_log_errors,
3528                                 TDB_DATA data,
3529                                 client_async_callback client_callback,
3530                                 client_async_callback fail_callback,
3531                                 void *callback_data)
3532 {
3533         struct client_async_data *async_data;
3534         struct ctdb_client_control_state *state;
3535         int j, num_nodes;
3536
3537         async_data = talloc_zero(ctdb, struct client_async_data);
3538         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3539         async_data->dont_log_errors = dont_log_errors;
3540         async_data->callback = client_callback;
3541         async_data->fail_callback = fail_callback;
3542         async_data->callback_data = callback_data;
3543         async_data->opcode        = opcode;
3544
3545         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3546
3547         /* loop over all nodes and send an async control to each of them */
3548         for (j=0; j<num_nodes; j++) {
3549                 uint32_t pnn = nodes[j];
3550
3551                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3552                                           0, data, async_data, &timeout, NULL);
3553                 if (state == NULL) {
3554                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3555                         talloc_free(async_data);
3556                         return -1;
3557                 }
3558                 
3559                 ctdb_client_async_add(async_data, state);
3560         }
3561
3562         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3563                 talloc_free(async_data);
3564                 return -1;
3565         }
3566
3567         talloc_free(async_data);
3568         return 0;
3569 }
3570
3571 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3572                                 struct ctdb_vnn_map *vnn_map,
3573                                 TALLOC_CTX *mem_ctx,
3574                                 bool include_self)
3575 {
3576         int i, j, num_nodes;
3577         uint32_t *nodes;
3578
3579         for (i=num_nodes=0;i<vnn_map->size;i++) {
3580                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3581                         continue;
3582                 }
3583                 num_nodes++;
3584         } 
3585
3586         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3587         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3588
3589         for (i=j=0;i<vnn_map->size;i++) {
3590                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3591                         continue;
3592                 }
3593                 nodes[j++] = vnn_map->map[i];
3594         } 
3595
3596         return nodes;
3597 }
3598
3599 /* Get list of nodes not including those with flags specified by mask.
3600  * If exclude_pnn is not -1 then exclude that pnn from the list.
3601  */
3602 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3603                         struct ctdb_node_map *node_map,
3604                         TALLOC_CTX *mem_ctx,
3605                         uint32_t mask,
3606                         int exclude_pnn)
3607 {
3608         int i, j, num_nodes;
3609         uint32_t *nodes;
3610
3611         for (i=num_nodes=0;i<node_map->num;i++) {
3612                 if (node_map->nodes[i].flags & mask) {
3613                         continue;
3614                 }
3615                 if (node_map->nodes[i].pnn == exclude_pnn) {
3616                         continue;
3617                 }
3618                 num_nodes++;
3619         } 
3620
3621         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3622         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3623
3624         for (i=j=0;i<node_map->num;i++) {
3625                 if (node_map->nodes[i].flags & mask) {
3626                         continue;
3627                 }
3628                 if (node_map->nodes[i].pnn == exclude_pnn) {
3629                         continue;
3630                 }
3631                 nodes[j++] = node_map->nodes[i].pnn;
3632         } 
3633
3634         return nodes;
3635 }
3636
3637 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3638                                 struct ctdb_node_map *node_map,
3639                                 TALLOC_CTX *mem_ctx,
3640                                 bool include_self)
3641 {
3642         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3643                              include_self ? -1 : ctdb->pnn);
3644 }
3645
3646 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3647                                 struct ctdb_node_map *node_map,
3648                                 TALLOC_CTX *mem_ctx,
3649                                 bool include_self)
3650 {
3651         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3652                              include_self ? -1 : ctdb->pnn);
3653 }
3654
3655 /* 
3656   this is used to test if a pnn lock exists and if it exists will return
3657   the number of connections that pnn has reported or -1 if that recovery
3658   daemon is not running.
3659 */
3660 int
3661 ctdb_read_pnn_lock(int fd, int32_t pnn)
3662 {
3663         struct flock lock;
3664         char c;
3665
3666         lock.l_type = F_WRLCK;
3667         lock.l_whence = SEEK_SET;
3668         lock.l_start = pnn;
3669         lock.l_len = 1;
3670         lock.l_pid = 0;
3671
3672         if (fcntl(fd, F_GETLK, &lock) != 0) {
3673                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3674                 return -1;
3675         }
3676
3677         if (lock.l_type == F_UNLCK) {
3678                 return -1;
3679         }
3680
3681         if (pread(fd, &c, 1, pnn) == -1) {
3682                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3683                 return -1;
3684         }
3685
3686         return c;
3687 }
3688
3689 /*
3690   get capabilities of a remote node
3691  */
3692 struct ctdb_client_control_state *
3693 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3694 {
3695         return ctdb_control_send(ctdb, destnode, 0, 
3696                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3697                            mem_ctx, &timeout, NULL);
3698 }
3699
3700 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3701 {
3702         int ret;
3703         int32_t res;
3704         TDB_DATA outdata;
3705
3706         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3707         if ( (ret != 0) || (res != 0) ) {
3708                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3709                 return -1;
3710         }
3711
3712         if (capabilities) {
3713                 *capabilities = *((uint32_t *)outdata.dptr);
3714         }
3715
3716         return 0;
3717 }
3718
3719 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3720 {
3721         struct ctdb_client_control_state *state;
3722         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3723         int ret;
3724
3725         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3726         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3727         talloc_free(tmp_ctx);
3728         return ret;
3729 }
3730
3731 /**
3732  * check whether a transaction is active on a given db on a given node
3733  */
3734 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3735                                      uint32_t destnode,
3736                                      uint32_t db_id)
3737 {
3738         int32_t status;
3739         int ret;
3740         TDB_DATA indata;
3741
3742         indata.dptr = (uint8_t *)&db_id;
3743         indata.dsize = sizeof(db_id);
3744
3745         ret = ctdb_control(ctdb, destnode, 0,
3746                            CTDB_CONTROL_TRANS2_ACTIVE,
3747                            0, indata, NULL, NULL, &status,
3748                            NULL, NULL);
3749
3750         if (ret != 0) {
3751                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3752                 return -1;
3753         }
3754
3755         return status;
3756 }
3757
3758
3759 struct ctdb_transaction_handle {
3760         struct ctdb_db_context *ctdb_db;
3761         bool in_replay;
3762         /*
3763          * we store the reads and writes done under a transaction:
3764          * - one list stores both reads and writes (m_all),
3765          * - the other just writes (m_write)
3766          */
3767         struct ctdb_marshall_buffer *m_all;
3768         struct ctdb_marshall_buffer *m_write;
3769 };
3770
3771 /* start a transaction on a database */
3772 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3773 {
3774         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3775         return 0;
3776 }
3777
3778 /* start a transaction on a database */
3779 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3780 {
3781         struct ctdb_record_handle *rh;
3782         TDB_DATA key;
3783         TDB_DATA data;
3784         struct ctdb_ltdb_header header;
3785         TALLOC_CTX *tmp_ctx;
3786         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3787         int ret;
3788         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3789         pid_t pid;
3790         int32_t status;
3791
3792         key.dptr = discard_const(keyname);
3793         key.dsize = strlen(keyname);
3794
3795         if (!ctdb_db->persistent) {
3796                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3797                 return -1;
3798         }
3799
3800 again:
3801         tmp_ctx = talloc_new(h);
3802
3803         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3804         if (rh == NULL) {
3805                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3806                 talloc_free(tmp_ctx);
3807                 return -1;
3808         }
3809
3810         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3811                                               CTDB_CURRENT_NODE,
3812                                               ctdb_db->db_id);
3813         if (status == 1) {
3814                 unsigned long int usec = (1000 + random()) % 100000;
3815                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3816                                     "on db_id[0x%08x]. waiting for %lu "
3817                                     "microseconds\n",
3818                                     ctdb_db->db_id, usec));
3819                 talloc_free(tmp_ctx);
3820                 usleep(usec);
3821                 goto again;
3822         }
3823
3824         /*
3825          * store the pid in the database:
3826          * it is not enough that the node is dmaster...
3827          */
3828         pid = getpid();
3829         data.dptr = (unsigned char *)&pid;
3830         data.dsize = sizeof(pid_t);
3831         rh->header.rsn++;
3832         rh->header.dmaster = ctdb_db->ctdb->pnn;
3833         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3834         if (ret != 0) {
3835                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3836                                   "transaction record\n"));
3837                 talloc_free(tmp_ctx);
3838                 return -1;
3839         }
3840
3841         talloc_free(rh);
3842
3843         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3844         if (ret != 0) {
3845                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3846                 talloc_free(tmp_ctx);
3847                 return -1;
3848         }
3849
3850         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3851         if (ret != 0) {
3852                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3853                                  "lock record inside transaction\n"));
3854                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3855                 talloc_free(tmp_ctx);
3856                 goto again;
3857         }
3858
3859         if (header.dmaster != ctdb_db->ctdb->pnn) {
3860                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3861                                    "transaction lock record\n"));
3862                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3863                 talloc_free(tmp_ctx);
3864                 goto again;
3865         }
3866
3867         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3868                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3869                                     "the transaction lock record\n"));
3870                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3871                 talloc_free(tmp_ctx);
3872                 goto again;
3873         }
3874
3875         talloc_free(tmp_ctx);
3876
3877         return 0;
3878 }
3879
3880
3881 /* start a transaction on a database */
3882 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3883                                                        TALLOC_CTX *mem_ctx)
3884 {
3885         struct ctdb_transaction_handle *h;
3886         int ret;
3887
3888         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3889         if (h == NULL) {
3890                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3891                 return NULL;
3892         }
3893
3894         h->ctdb_db = ctdb_db;
3895
3896         ret = ctdb_transaction_fetch_start(h);
3897         if (ret != 0) {
3898                 talloc_free(h);
3899                 return NULL;
3900         }
3901
3902         talloc_set_destructor(h, ctdb_transaction_destructor);
3903
3904         return h;
3905 }
3906
3907
3908
3909 /*
3910   fetch a record inside a transaction
3911  */
3912 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3913                            TALLOC_CTX *mem_ctx, 
3914                            TDB_DATA key, TDB_DATA *data)
3915 {
3916         struct ctdb_ltdb_header header;
3917         int ret;
3918
3919         ZERO_STRUCT(header);
3920
3921         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3922         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3923                 /* record doesn't exist yet */
3924                 *data = tdb_null;
3925                 ret = 0;
3926         }
3927         
3928         if (ret != 0) {
3929                 return ret;
3930         }
3931
3932         if (!h->in_replay) {
3933                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3934                 if (h->m_all == NULL) {
3935                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3936                         return -1;
3937                 }
3938         }
3939
3940         return 0;
3941 }
3942
3943 /*
3944   stores a record inside a transaction
3945  */
3946 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3947                            TDB_DATA key, TDB_DATA data)
3948 {
3949         TALLOC_CTX *tmp_ctx = talloc_new(h);
3950         struct ctdb_ltdb_header header;
3951         TDB_DATA olddata;
3952         int ret;
3953
3954         ZERO_STRUCT(header);
3955
3956         /* we need the header so we can update the RSN */
3957         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3958         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3959                 /* the record doesn't exist - create one with us as dmaster.
3960                    This is only safe because we are in a transaction and this
3961                    is a persistent database */
3962                 ZERO_STRUCT(header);
3963         } else if (ret != 0) {
3964                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3965                 talloc_free(tmp_ctx);
3966                 return ret;
3967         }
3968
3969         if (data.dsize == olddata.dsize &&
3970             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3971                 /* save writing the same data */
3972                 talloc_free(tmp_ctx);
3973                 return 0;
3974         }
3975
3976         header.dmaster = h->ctdb_db->ctdb->pnn;
3977         header.rsn++;
3978
3979         if (!h->in_replay) {
3980                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3981                 if (h->m_all == NULL) {
3982                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3983                         talloc_free(tmp_ctx);
3984                         return -1;
3985                 }
3986         }               
3987
3988         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3989         if (h->m_write == NULL) {
3990                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3991                 talloc_free(tmp_ctx);
3992                 return -1;
3993         }
3994         
3995         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3996
3997         talloc_free(tmp_ctx);
3998         
3999         return ret;
4000 }
4001
4002 /*
4003   replay a transaction
4004  */
4005 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
4006 {
4007         int ret, i;
4008         struct ctdb_rec_data *rec = NULL;
4009
4010         h->in_replay = true;
4011         talloc_free(h->m_write);
4012         h->m_write = NULL;
4013
4014         ret = ctdb_transaction_fetch_start(h);
4015         if (ret != 0) {
4016                 return ret;
4017         }
4018
4019         for (i=0;i<h->m_all->count;i++) {
4020                 TDB_DATA key, data;
4021
4022                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
4023                 if (rec == NULL) {
4024                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
4025                         goto failed;
4026                 }
4027
4028                 if (rec->reqid == 0) {
4029                         /* its a store */
4030                         if (ctdb_transaction_store(h, key, data) != 0) {
4031                                 goto failed;
4032                         }
4033                 } else {
4034                         TDB_DATA data2;
4035                         TALLOC_CTX *tmp_ctx = talloc_new(h);
4036
4037                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
4038                                 talloc_free(tmp_ctx);
4039                                 goto failed;
4040                         }
4041                         if (data2.dsize != data.dsize ||
4042                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
4043                                 /* the record has changed on us - we have to give up */
4044                                 talloc_free(tmp_ctx);
4045                                 goto failed;
4046                         }
4047                         talloc_free(tmp_ctx);
4048                 }
4049         }
4050         
4051         return 0;
4052
4053 failed:
4054         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4055         return -1;
4056 }
4057
4058
4059 /*
4060   commit a transaction
4061  */
4062 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4063 {
4064         int ret, retries=0;
4065         int32_t status;
4066         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
4067         struct timeval timeout;
4068         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
4069
4070         talloc_set_destructor(h, NULL);
4071
4072         /* our commit strategy is quite complex.
4073
4074            - we first try to commit the changes to all other nodes
4075
4076            - if that works, then we commit locally and we are done
4077
4078            - if a commit on another node fails, then we need to cancel
4079              the transaction, then restart the transaction (thus
4080              opening a window of time for a pending recovery to
4081              complete), then replay the transaction, checking all the
4082              reads and writes (checking that reads give the same data,
4083              and writes succeed). Then we retry the transaction to the
4084              other nodes
4085         */
4086
4087 again:
4088         if (h->m_write == NULL) {
4089                 /* no changes were made */
4090                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4091                 talloc_free(h);
4092                 return 0;
4093         }
4094
4095         /* tell ctdbd to commit to the other nodes */
4096         timeout = timeval_current_ofs(1, 0);
4097         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4098                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4099                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4100                            &timeout, NULL);
4101         if (ret != 0 || status != 0) {
4102                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4103                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4104                                      ", retrying after 1 second...\n",
4105                                      (retries==0)?"":"retry "));
4106                 sleep(1);
4107
4108                 if (ret != 0) {
4109                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4110                 } else {
4111                         /* work out what error code we will give if we 
4112                            have to fail the operation */
4113                         switch ((enum ctdb_trans2_commit_error)status) {
4114                         case CTDB_TRANS2_COMMIT_SUCCESS:
4115                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4116                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4117                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4118                                 break;
4119                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4120                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4121                                 break;
4122                         }
4123                 }
4124
4125                 if (++retries == 100) {
4126                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4127                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4128                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4129                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4130                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4131                         talloc_free(h);
4132                         return -1;
4133                 }               
4134
4135                 if (ctdb_replay_transaction(h) != 0) {
4136                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4137                                           "transaction on db 0x%08x, "
4138                                           "failure control =%u\n",
4139                                           h->ctdb_db->db_id,
4140                                           (unsigned)failure_control));
4141                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4142                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4143                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4144                         talloc_free(h);
4145                         return -1;
4146                 }
4147                 goto again;
4148         } else {
4149                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4150         }
4151
4152         /* do the real commit locally */
4153         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4154         if (ret != 0) {
4155                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4156                                   "on db id 0x%08x locally, "
4157                                   "failure_control=%u\n",
4158                                   h->ctdb_db->db_id,
4159                                   (unsigned)failure_control));
4160                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4161                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4162                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4163                 talloc_free(h);
4164                 return ret;
4165         }
4166
4167         /* tell ctdbd that we are finished with our local commit */
4168         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4169                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4170                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4171         talloc_free(h);
4172         return 0;
4173 }
4174
4175 /*
4176   recovery daemon ping to main daemon
4177  */
4178 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4179 {
4180         int ret;
4181         int32_t res;
4182
4183         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4184                            ctdb, NULL, &res, NULL, NULL);
4185         if (ret != 0 || res != 0) {
4186                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4187                 return -1;
4188         }
4189
4190         return 0;
4191 }
4192
4193 /* When forking the main daemon and the child process needs to connect
4194  * back to the daemon as a client process, this function can be used
4195  * to change the ctdb context from daemon into client mode.  The child
4196  * process must be created using ctdb_fork() and not fork() -
4197  * ctdb_fork() does some necessary housekeeping.
4198  */
4199 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4200 {
4201         int ret;
4202         va_list ap;
4203
4204         /* Add extra information so we can identify this in the logs */
4205         va_start(ap, fmt);
4206         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4207         va_end(ap);
4208
4209         /* get a new event context */
4210         ctdb->ev = event_context_init(ctdb);
4211         tevent_loop_allow_nesting(ctdb->ev);
4212
4213         /* Connect to main CTDB daemon */
4214         ret = ctdb_socket_connect(ctdb);
4215         if (ret != 0) {
4216                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4217                 return -1;
4218         }
4219
4220         ctdb->can_send_controls = true;
4221
4222         return 0;
4223 }
4224
4225 /*
4226   get the status of running the monitor eventscripts: NULL means never run.
4227  */
4228 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4229                 struct timeval timeout, uint32_t destnode, 
4230                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4231                 struct ctdb_scripts_wire **scripts)
4232 {
4233         int ret;
4234         TDB_DATA outdata, indata;
4235         int32_t res;
4236         uint32_t uinttype = type;
4237
4238         indata.dptr = (uint8_t *)&uinttype;
4239         indata.dsize = sizeof(uinttype);
4240
4241         ret = ctdb_control(ctdb, destnode, 0, 
4242                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4243                            mem_ctx, &outdata, &res, &timeout, NULL);
4244         if (ret != 0 || res != 0) {
4245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4246                 return -1;
4247         }
4248
4249         if (outdata.dsize == 0) {
4250                 *scripts = NULL;
4251         } else {
4252                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4253                 talloc_free(outdata.dptr);
4254         }
4255                     
4256         return 0;
4257 }
4258
4259 /*
4260   tell the main daemon how long it took to lock the reclock file
4261  */
4262 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4263 {
4264         int ret;
4265         int32_t res;
4266         TDB_DATA data;
4267
4268         data.dptr = (uint8_t *)&latency;
4269         data.dsize = sizeof(latency);
4270
4271         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4272                            ctdb, NULL, &res, NULL, NULL);
4273         if (ret != 0 || res != 0) {
4274                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4275                 return -1;
4276         }
4277
4278         return 0;
4279 }
4280
4281 /*
4282   get the name of the reclock file
4283  */
4284 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4285                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4286                          const char **name)
4287 {
4288         int ret;
4289         int32_t res;
4290         TDB_DATA data;
4291
4292         ret = ctdb_control(ctdb, destnode, 0, 
4293                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4294                            mem_ctx, &data, &res, &timeout, NULL);
4295         if (ret != 0 || res != 0) {
4296                 return -1;
4297         }
4298
4299         if (data.dsize == 0) {
4300                 *name = NULL;
4301         } else {
4302                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4303         }
4304         talloc_free(data.dptr);
4305
4306         return 0;
4307 }
4308
4309 /*
4310   set the reclock filename for a node
4311  */
4312 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4313 {
4314         int ret;
4315         TDB_DATA data;
4316         int32_t res;
4317
4318         if (reclock == NULL) {
4319                 data.dsize = 0;
4320                 data.dptr  = NULL;
4321         } else {
4322                 data.dsize = strlen(reclock) + 1;
4323                 data.dptr  = discard_const(reclock);
4324         }
4325
4326         ret = ctdb_control(ctdb, destnode, 0, 
4327                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4328                            NULL, NULL, &res, &timeout, NULL);
4329         if (ret != 0 || res != 0) {
4330                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4331                 return -1;
4332         }
4333
4334         return 0;
4335 }
4336
4337 /*
4338   stop a node
4339  */
4340 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4341 {
4342         int ret;
4343         int32_t res;
4344
4345         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4346                            ctdb, NULL, &res, &timeout, NULL);
4347         if (ret != 0 || res != 0) {
4348                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4349                 return -1;
4350         }
4351
4352         return 0;
4353 }
4354
4355 /*
4356   continue a node
4357  */
4358 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4359 {
4360         int ret;
4361
4362         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4363                            ctdb, NULL, NULL, &timeout, NULL);
4364         if (ret != 0) {
4365                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4366                 return -1;
4367         }
4368
4369         return 0;
4370 }
4371
4372 /*
4373   set the natgw state for a node
4374  */
4375 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4376 {
4377         int ret;
4378         TDB_DATA data;
4379         int32_t res;
4380
4381         data.dsize = sizeof(natgwstate);
4382         data.dptr  = (uint8_t *)&natgwstate;
4383
4384         ret = ctdb_control(ctdb, destnode, 0, 
4385                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4386                            NULL, NULL, &res, &timeout, NULL);
4387         if (ret != 0 || res != 0) {
4388                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4389                 return -1;
4390         }
4391
4392         return 0;
4393 }
4394
4395 /*
4396   set the lmaster role for a node
4397  */
4398 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4399 {
4400         int ret;
4401         TDB_DATA data;
4402         int32_t res;
4403
4404         data.dsize = sizeof(lmasterrole);
4405         data.dptr  = (uint8_t *)&lmasterrole;
4406
4407         ret = ctdb_control(ctdb, destnode, 0, 
4408                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4409                            NULL, NULL, &res, &timeout, NULL);
4410         if (ret != 0 || res != 0) {
4411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4412                 return -1;
4413         }
4414
4415         return 0;
4416 }
4417
4418 /*
4419   set the recmaster role for a node
4420  */
4421 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4422 {
4423         int ret;
4424         TDB_DATA data;
4425         int32_t res;
4426
4427         data.dsize = sizeof(recmasterrole);
4428         data.dptr  = (uint8_t *)&recmasterrole;
4429
4430         ret = ctdb_control(ctdb, destnode, 0, 
4431                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4432                            NULL, NULL, &res, &timeout, NULL);
4433         if (ret != 0 || res != 0) {
4434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4435                 return -1;
4436         }
4437
4438         return 0;
4439 }
4440
4441 /* enable an eventscript
4442  */
4443 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4444 {
4445         int ret;
4446         TDB_DATA data;
4447         int32_t res;
4448
4449         data.dsize = strlen(script) + 1;
4450         data.dptr  = discard_const(script);
4451
4452         ret = ctdb_control(ctdb, destnode, 0, 
4453                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4454                            NULL, NULL, &res, &timeout, NULL);
4455         if (ret != 0 || res != 0) {
4456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4457                 return -1;
4458         }
4459
4460         return 0;
4461 }
4462
4463 /* disable an eventscript
4464  */
4465 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4466 {
4467         int ret;
4468         TDB_DATA data;
4469         int32_t res;
4470
4471         data.dsize = strlen(script) + 1;
4472         data.dptr  = discard_const(script);
4473
4474         ret = ctdb_control(ctdb, destnode, 0, 
4475                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4476                            NULL, NULL, &res, &timeout, NULL);
4477         if (ret != 0 || res != 0) {
4478                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4479                 return -1;
4480         }
4481
4482         return 0;
4483 }
4484
4485
4486 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4487 {
4488         int ret;
4489         TDB_DATA data;
4490         int32_t res;
4491
4492         data.dsize = sizeof(*bantime);
4493         data.dptr  = (uint8_t *)bantime;
4494
4495         ret = ctdb_control(ctdb, destnode, 0, 
4496                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4497                            NULL, NULL, &res, &timeout, NULL);
4498         if (ret != 0 || res != 0) {
4499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4500                 return -1;
4501         }
4502
4503         return 0;
4504 }
4505
4506
4507 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4508 {
4509         int ret;
4510         TDB_DATA outdata;
4511         int32_t res;
4512         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4513
4514         ret = ctdb_control(ctdb, destnode, 0, 
4515                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4516                            tmp_ctx, &outdata, &res, &timeout, NULL);
4517         if (ret != 0 || res != 0) {
4518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4519                 talloc_free(tmp_ctx);
4520                 return -1;
4521         }
4522
4523         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4524         talloc_free(tmp_ctx);
4525
4526         return 0;
4527 }
4528
4529
4530 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4531 {
4532         int ret;
4533         int32_t res;
4534         TDB_DATA data;
4535         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4536
4537         data.dptr = (uint8_t*)db_prio;
4538         data.dsize = sizeof(*db_prio);
4539
4540         ret = ctdb_control(ctdb, destnode, 0, 
4541                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4542                            tmp_ctx, NULL, &res, &timeout, NULL);
4543         if (ret != 0 || res != 0) {
4544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4545                 talloc_free(tmp_ctx);
4546                 return -1;
4547         }
4548
4549         talloc_free(tmp_ctx);
4550
4551         return 0;
4552 }
4553
4554 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4555 {
4556         int ret;
4557         int32_t res;
4558         TDB_DATA data;
4559         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4560
4561         data.dptr = (uint8_t*)&db_id;
4562         data.dsize = sizeof(db_id);
4563
4564         ret = ctdb_control(ctdb, destnode, 0, 
4565                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4566                            tmp_ctx, NULL, &res, &timeout, NULL);
4567         if (ret != 0 || res < 0) {
4568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4569                 talloc_free(tmp_ctx);
4570                 return -1;
4571         }
4572
4573         if (priority) {
4574                 *priority = res;
4575         }
4576
4577         talloc_free(tmp_ctx);
4578
4579         return 0;
4580 }
4581
4582 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4583 {
4584         int ret;
4585         TDB_DATA outdata;
4586         int32_t res;
4587
4588         ret = ctdb_control(ctdb, destnode, 0, 
4589                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4590                            mem_ctx, &outdata, &res, &timeout, NULL);
4591         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4593                 return -1;
4594         }
4595
4596         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4597         talloc_free(outdata.dptr);
4598                     
4599         return 0;
4600 }
4601
4602 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4603 {
4604         if (h == NULL) {
4605                 return NULL;
4606         }
4607
4608         return &h->header;
4609 }
4610
4611
4612 struct ctdb_client_control_state *
4613 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4614 {
4615         struct ctdb_client_control_state *handle;
4616         struct ctdb_marshall_buffer *m;
4617         struct ctdb_rec_data *rec;
4618         TDB_DATA outdata;
4619
4620         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4621         if (m == NULL) {
4622                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4623                 return NULL;
4624         }
4625
4626         m->db_id = ctdb_db->db_id;
4627
4628         rec = ctdb_marshall_record(m, 0, key, header, data);
4629         if (rec == NULL) {
4630                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4631                 talloc_free(m);
4632                 return NULL;
4633         }
4634         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4635         if (m == NULL) {
4636                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4637                 talloc_free(m);
4638                 return NULL;
4639         }
4640         m->count++;
4641         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4642
4643
4644         outdata.dptr = (uint8_t *)m;
4645         outdata.dsize = talloc_get_size(m);
4646
4647         handle = ctdb_control_send(ctdb, destnode, 0, 
4648                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4649                            mem_ctx, &timeout, NULL);
4650         talloc_free(m);
4651         return handle;
4652 }
4653
4654 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4655 {
4656         int ret;
4657         int32_t res;
4658
4659         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4660         if ( (ret != 0) || (res != 0) ){
4661                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4662                 return -1;
4663         }
4664
4665         return 0;
4666 }
4667
4668 int
4669 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4670 {
4671         struct ctdb_client_control_state *state;
4672
4673         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4674         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4675 }
4676
4677
4678
4679
4680
4681
4682 /*
4683   set a database to be readonly
4684  */
4685 struct ctdb_client_control_state *
4686 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4687 {
4688         TDB_DATA data;
4689
4690         data.dptr = (uint8_t *)&dbid;
4691         data.dsize = sizeof(dbid);
4692
4693         return ctdb_control_send(ctdb, destnode, 0, 
4694                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4695                            ctdb, NULL, NULL);
4696 }
4697
4698 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4699 {
4700         int ret;
4701         int32_t res;
4702
4703         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4704         if (ret != 0 || res != 0) {
4705           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4706                 return -1;
4707         }
4708
4709         return 0;
4710 }
4711
4712 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4713 {
4714         struct ctdb_client_control_state *state;
4715
4716         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4717         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4718 }
4719
4720 /*
4721   set a database to be sticky
4722  */
4723 struct ctdb_client_control_state *
4724 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4725 {
4726         TDB_DATA data;
4727
4728         data.dptr = (uint8_t *)&dbid;
4729         data.dsize = sizeof(dbid);
4730
4731         return ctdb_control_send(ctdb, destnode, 0, 
4732                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4733                            ctdb, NULL, NULL);
4734 }
4735
4736 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4737 {
4738         int ret;
4739         int32_t res;
4740
4741         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4742         if (ret != 0 || res != 0) {
4743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4744                 return -1;
4745         }
4746
4747         return 0;
4748 }
4749
4750 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4751 {
4752         struct ctdb_client_control_state *state;
4753
4754         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4755         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4756 }