07b17d016a488110e835ba6c0df7f4c96e383043
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb_wrap/tdb_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_PROTOCOL;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, bool updatetdb)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88         c->header = header;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         /* we need to force the record to be written out if this was a remote access */
106         if (c->new_data == NULL) {
107                 c->new_data = &c->record_data;
108         }
109
110         if (c->new_data && updatetdb) {
111                 /* XXX check that we always have the lock here? */
112                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
113                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
114                         talloc_free(c);
115                         return -1;
116                 }
117         }
118
119         if (c->reply_data) {
120                 call->reply_data = *c->reply_data;
121
122                 talloc_steal(call, call->reply_data.dptr);
123                 talloc_set_name_const(call->reply_data.dptr, __location__);
124         } else {
125                 call->reply_data.dptr = NULL;
126                 call->reply_data.dsize = 0;
127         }
128         call->status = c->status;
129
130         talloc_free(c);
131
132         return 0;
133 }
134
135
136 /*
137   queue a packet for sending from client to daemon
138 */
139 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
140 {
141         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
142 }
143
144
145 /*
146   called when a CTDB_REPLY_CALL packet comes in in the client
147
148   This packet comes in response to a CTDB_REQ_CALL request packet. It
149   contains any reply data from the call
150 */
151 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 {
153         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
154         struct ctdb_client_call_state *state;
155
156         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
157         if (state == NULL) {
158                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
159                 return;
160         }
161
162         if (hdr->reqid != state->reqid) {
163                 /* we found a record  but it was the wrong one */
164                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
165                 return;
166         }
167
168         state->call->reply_data.dptr = c->data;
169         state->call->reply_data.dsize = c->datalen;
170         state->call->status = c->status;
171
172         talloc_steal(state, c);
173
174         state->state = CTDB_CALL_DONE;
175
176         if (state->async.fn) {
177                 state->async.fn(state);
178         }
179 }
180
181 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
182
183 /*
184   this is called in the client, when data comes in from the daemon
185  */
186 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
187 {
188         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
189         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
190         TALLOC_CTX *tmp_ctx;
191
192         /* place the packet as a child of a tmp_ctx. We then use
193            talloc_free() below to free it. If any of the calls want
194            to keep it, then they will steal it somewhere else, and the
195            talloc_free() will be a no-op */
196         tmp_ctx = talloc_new(ctdb);
197         talloc_steal(tmp_ctx, hdr);
198
199         if (cnt == 0) {
200                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
201                 exit(1);
202         }
203
204         if (cnt < sizeof(*hdr)) {
205                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
206                 goto done;
207         }
208         if (cnt != hdr->length) {
209                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
210                                (unsigned)hdr->length, (unsigned)cnt);
211                 goto done;
212         }
213
214         if (hdr->ctdb_magic != CTDB_MAGIC) {
215                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
216                 goto done;
217         }
218
219         if (hdr->ctdb_version != CTDB_PROTOCOL) {
220                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
221                 goto done;
222         }
223
224         switch (hdr->operation) {
225         case CTDB_REPLY_CALL:
226                 ctdb_client_reply_call(ctdb, hdr);
227                 break;
228
229         case CTDB_REQ_MESSAGE:
230                 ctdb_request_message(ctdb, hdr);
231                 break;
232
233         case CTDB_REPLY_CONTROL:
234                 ctdb_client_reply_control(ctdb, hdr);
235                 break;
236
237         default:
238                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
239         }
240
241 done:
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   connect to a unix domain socket
247 */
248 int ctdb_socket_connect(struct ctdb_context *ctdb)
249 {
250         struct sockaddr_un addr;
251
252         memset(&addr, 0, sizeof(addr));
253         addr.sun_family = AF_UNIX;
254         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
255
256         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
257         if (ctdb->daemon.sd == -1) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
259                 return -1;
260         }
261
262         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
263                 close(ctdb->daemon.sd);
264                 ctdb->daemon.sd = -1;
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
273                                               CTDB_DS_ALIGNMENT, 
274                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
275         return 0;
276 }
277
278
279 struct ctdb_record_handle {
280         struct ctdb_db_context *ctdb_db;
281         TDB_DATA key;
282         TDB_DATA *data;
283         struct ctdb_ltdb_header header;
284 };
285
286
287 /*
288   make a recv call to the local ctdb daemon - called from client context
289
290   This is called when the program wants to wait for a ctdb_call to complete and get the 
291   results. This call will block unless the call has already completed.
292 */
293 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
294 {
295         if (state == NULL) {
296                 return -1;
297         }
298
299         while (state->state < CTDB_CALL_DONE) {
300                 event_loop_once(state->ctdb_db->ctdb->ev);
301         }
302         if (state->state != CTDB_CALL_DONE) {
303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
304                 talloc_free(state);
305                 return -1;
306         }
307
308         if (state->call->reply_data.dsize) {
309                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
310                                                       state->call->reply_data.dptr,
311                                                       state->call->reply_data.dsize);
312                 call->reply_data.dsize = state->call->reply_data.dsize;
313         } else {
314                 call->reply_data.dptr = NULL;
315                 call->reply_data.dsize = 0;
316         }
317         call->status = state->call->status;
318         talloc_free(state);
319
320         return call->status;
321 }
322
323
324
325
326 /*
327   destroy a ctdb_call in client
328 */
329 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
330 {
331         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
332         return 0;
333 }
334
335 /*
336   construct an event driven local ctdb_call
337
338   this is used so that locally processed ctdb_call requests are processed
339   in an event driven manner
340 */
341 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
342                                                                   struct ctdb_call *call,
343                                                                   struct ctdb_ltdb_header *header,
344                                                                   TDB_DATA *data)
345 {
346         struct ctdb_client_call_state *state;
347         struct ctdb_context *ctdb = ctdb_db->ctdb;
348         int ret;
349
350         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
351         CTDB_NO_MEMORY_NULL(ctdb, state);
352         state->call = talloc_zero(state, struct ctdb_call);
353         CTDB_NO_MEMORY_NULL(ctdb, state->call);
354
355         talloc_steal(state, data->dptr);
356
357         state->state   = CTDB_CALL_DONE;
358         *(state->call) = *call;
359         state->ctdb_db = ctdb_db;
360
361         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
362         if (ret != 0) {
363                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
364         }
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
400                 ret = -1;
401         }
402
403         if (ret == 0 && header.dmaster == ctdb->pnn) {
404                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
405                 talloc_free(data.dptr);
406                 ctdb_ltdb_unlock(ctdb_db, call->key);
407                 return state;
408         }
409
410         ctdb_ltdb_unlock(ctdb_db, call->key);
411         talloc_free(data.dptr);
412
413         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414         if (state == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416                 return NULL;
417         }
418         state->call = talloc_zero(state, struct ctdb_call);
419         if (state->call == NULL) {
420                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421                 return NULL;
422         }
423
424         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
425         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426         if (c == NULL) {
427                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428                 return NULL;
429         }
430
431         state->reqid     = ctdb_reqid_new(ctdb, state);
432         state->ctdb_db = ctdb_db;
433         talloc_set_destructor(state, ctdb_client_call_destructor);
434
435         c->hdr.reqid     = state->reqid;
436         c->flags         = call->flags;
437         c->db_id         = ctdb_db->db_id;
438         c->callid        = call->call_id;
439         c->hopcount      = 0;
440         c->keylen        = call->key.dsize;
441         c->calldatalen   = call->call_data.dsize;
442         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
443         memcpy(&c->data[call->key.dsize], 
444                call->call_data.dptr, call->call_data.dsize);
445         *(state->call)              = *call;
446         state->call->call_data.dptr = &c->data[call->key.dsize];
447         state->call->key.dptr       = &c->data[0];
448
449         state->state  = CTDB_CALL_WAIT;
450
451
452         ctdb_client_queue_pkt(ctdb, &c->hdr);
453
454         return state;
455 }
456
457
458 /*
459   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 */
461 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 {
463         struct ctdb_client_call_state *state;
464
465         state = ctdb_call_send(ctdb_db, call);
466         return ctdb_call_recv(state, call);
467 }
468
469
470 /*
471   tell the daemon what messaging srvid we will use, and register the message
472   handler function in the client
473 */
474 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
475                              ctdb_msg_fn_t handler,
476                              void *private_data)
477 {
478         int res;
479         int32_t status;
480
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512 /*
513  * check server ids
514  */
515 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
516                                        uint8_t *result)
517 {
518         TDB_DATA indata, outdata;
519         int res;
520         int32_t status;
521         int i;
522
523         indata.dptr = (uint8_t *)ids;
524         indata.dsize = num * sizeof(*ids);
525
526         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
527                            indata, ctdb, &outdata, &status, NULL, NULL);
528         if (res != 0 || status != 0) {
529                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
530                 return -1;
531         }
532
533         if (outdata.dsize != num*sizeof(uint8_t)) {
534                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
535                                   (long unsigned int)num*sizeof(uint8_t),
536                                   outdata.dsize));
537                 talloc_free(outdata.dptr);
538                 return -1;
539         }
540
541         for (i=0; i<num; i++) {
542                 result[i] = outdata.dptr[i];
543         }
544
545         talloc_free(outdata.dptr);
546         return 0;
547 }
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         talloc_free(r);
570         return res;
571 }
572
573
574 /*
575   cancel a ctdb_fetch_lock operation, releasing the lock
576  */
577 static int fetch_lock_destructor(struct ctdb_record_handle *h)
578 {
579         ctdb_ltdb_unlock(h->ctdb_db, h->key);
580         return 0;
581 }
582
583 /*
584   force the migration of a record to this node
585  */
586 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
587 {
588         struct ctdb_call call;
589         ZERO_STRUCT(call);
590         call.call_id = CTDB_NULL_FUNC;
591         call.key = key;
592         call.flags = CTDB_IMMEDIATE_MIGRATION;
593         return ctdb_call(ctdb_db, &call);
594 }
595
596 /*
597   try to fetch a readonly copy of a record
598  */
599 static int
600 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
601 {
602         int ret;
603
604         struct ctdb_call call;
605         ZERO_STRUCT(call);
606
607         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
608         call.call_data.dptr = NULL;
609         call.call_data.dsize = 0;
610         call.key = key;
611         call.flags = CTDB_WANT_READONLY;
612         ret = ctdb_call(ctdb_db, &call);
613
614         if (ret != 0) {
615                 return -1;
616         }
617         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
618                 return -1;
619         }
620
621         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
622         if (*hdr == NULL) {
623                 talloc_free(call.reply_data.dptr);
624                 return -1;
625         }
626
627         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
628         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
629         if (data->dptr == NULL) {
630                 talloc_free(call.reply_data.dptr);
631                 talloc_free(hdr);
632                 return -1;
633         }
634
635         return 0;
636 }
637
638 /*
639   get a lock on a record, and return the records data. Blocks until it gets the lock
640  */
641 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
642                                            TDB_DATA key, TDB_DATA *data)
643 {
644         int ret;
645         struct ctdb_record_handle *h;
646
647         /*
648           procedure is as follows:
649
650           1) get the chain lock. 
651           2) check if we are dmaster
652           3) if we are the dmaster then return handle 
653           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
654              reply from ctdbd
655           5) when we get the reply, goto (1)
656          */
657
658         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
659         if (h == NULL) {
660                 return NULL;
661         }
662
663         h->ctdb_db = ctdb_db;
664         h->key     = key;
665         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
666         if (h->key.dptr == NULL) {
667                 talloc_free(h);
668                 return NULL;
669         }
670         h->data    = data;
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
673                  (const char *)key.dptr));
674
675 again:
676         /* step 1 - get the chain lock */
677         ret = ctdb_ltdb_lock(ctdb_db, key);
678         if (ret != 0) {
679                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
680                 talloc_free(h);
681                 return NULL;
682         }
683
684         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
685
686         talloc_set_destructor(h, fetch_lock_destructor);
687
688         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
689
690         /* when torturing, ensure we test the remote path */
691         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
692             random() % 5 == 0) {
693                 h->header.dmaster = (uint32_t)-1;
694         }
695
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
698
699         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
700                 ctdb_ltdb_unlock(ctdb_db, key);
701                 ret = ctdb_client_force_migration(ctdb_db, key);
702                 if (ret != 0) {
703                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
704                         talloc_free(h);
705                         return NULL;
706                 }
707                 goto again;
708         }
709
710         /* if this is a request for read/write and we have delegations
711            we have to revoke all delegations first
712         */
713         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
714             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
715                 ctdb_ltdb_unlock(ctdb_db, key);
716                 ret = ctdb_client_force_migration(ctdb_db, key);
717                 if (ret != 0) {
718                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
719                         talloc_free(h);
720                         return NULL;
721                 }
722                 goto again;
723         }
724
725         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
726         return h;
727 }
728
729 /*
730   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
731  */
732 struct ctdb_record_handle *
733 ctdb_fetch_readonly_lock(
734         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
735         TDB_DATA key, TDB_DATA *data,
736         int read_only)
737 {
738         int ret;
739         struct ctdb_record_handle *h;
740         struct ctdb_ltdb_header *roheader = NULL;
741
742         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
743         if (h == NULL) {
744                 return NULL;
745         }
746
747         h->ctdb_db = ctdb_db;
748         h->key     = key;
749         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
750         if (h->key.dptr == NULL) {
751                 talloc_free(h);
752                 return NULL;
753         }
754         h->data    = data;
755
756         data->dptr = NULL;
757         data->dsize = 0;
758
759
760 again:
761         talloc_free(roheader);
762         roheader = NULL;
763
764         talloc_free(data->dptr);
765         data->dptr = NULL;
766         data->dsize = 0;
767
768         /* Lock the record/chain */
769         ret = ctdb_ltdb_lock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
772                 talloc_free(h);
773                 return NULL;
774         }
775
776         talloc_set_destructor(h, fetch_lock_destructor);
777
778         /* Check if record exists yet in the TDB */
779         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
780         if (ret != 0) {
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_force_migration(ctdb_db, key);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
785                         talloc_free(h);
786                         return NULL;
787                 }
788                 goto again;
789         }
790
791         /* if this is a request for read/write and we have delegations
792            we have to revoke all delegations first
793         */
794         if ((read_only == 0) 
795         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
796         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                 ctdb_ltdb_unlock(ctdb_db, key);
798                 ret = ctdb_client_force_migration(ctdb_db, key);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
801                         talloc_free(h);
802                         return NULL;
803                 }
804                 goto again;
805         }
806
807         /* if we are dmaster, just return the handle */
808         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
809                 return h;
810         }
811
812         if (read_only != 0) {
813                 TDB_DATA rodata = {NULL, 0};
814
815                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
816                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
817                         return h;
818                 }
819
820                 ctdb_ltdb_unlock(ctdb_db, key);
821                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
822                 if (ret != 0) {
823                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
835                         ret = ctdb_client_force_migration(ctdb_db, key);
836                         if (ret != 0) {
837                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                                 talloc_free(h);
839                                 return NULL;
840                         }
841
842                         goto again;
843                 }
844
845                 ret = ctdb_ltdb_lock(ctdb_db, key);
846                 if (ret != 0) {
847                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
848                         talloc_free(h);
849                         return NULL;
850                 }
851
852                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
853                 if (ret != 0) {
854                         ctdb_ltdb_unlock(ctdb_db, key);
855
856                         ret = ctdb_client_force_migration(ctdb_db, key);
857                         if (ret != 0) {
858                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
859                                 talloc_free(h);
860                                 return NULL;
861                         }
862
863                         goto again;
864                 }
865
866                 return h;
867         }
868
869         /* we are not dmaster and this was not a request for a readonly lock
870          * so unlock the record, migrate it and try again
871          */
872         ctdb_ltdb_unlock(ctdb_db, key);
873         ret = ctdb_client_force_migration(ctdb_db, key);
874         if (ret != 0) {
875                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
876                 talloc_free(h);
877                 return NULL;
878         }
879         goto again;
880 }
881
882 /*
883   store some data to the record that was locked with ctdb_fetch_lock()
884 */
885 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
886 {
887         if (h->ctdb_db->persistent) {
888                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
889                 return -1;
890         }
891
892         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
893 }
894
895 /*
896   non-locking fetch of a record
897  */
898 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
899                TDB_DATA key, TDB_DATA *data)
900 {
901         struct ctdb_call call;
902         int ret;
903
904         call.call_id = CTDB_FETCH_FUNC;
905         call.call_data.dptr = NULL;
906         call.call_data.dsize = 0;
907         call.key = key;
908
909         ret = ctdb_call(ctdb_db, &call);
910
911         if (ret == 0) {
912                 *data = call.reply_data;
913                 talloc_steal(mem_ctx, data->dptr);
914         }
915
916         return ret;
917 }
918
919
920
921 /*
922    called when a control completes or timesout to invoke the callback
923    function the user provided
924 */
925 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
926         struct timeval t, void *private_data)
927 {
928         struct ctdb_client_control_state *state;
929         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
930         int ret;
931
932         state = talloc_get_type(private_data, struct ctdb_client_control_state);
933         talloc_steal(tmp_ctx, state);
934
935         ret = ctdb_control_recv(state->ctdb, state, state,
936                         NULL, 
937                         NULL, 
938                         NULL);
939         if (ret != 0) {
940                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
941         }
942
943         talloc_free(tmp_ctx);
944 }
945
946 /*
947   called when a CTDB_REPLY_CONTROL packet comes in in the client
948
949   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
950   contains any reply data from the control
951 */
952 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
953                                       struct ctdb_req_header *hdr)
954 {
955         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
956         struct ctdb_client_control_state *state;
957
958         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
959         if (state == NULL) {
960                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
961                 return;
962         }
963
964         if (hdr->reqid != state->reqid) {
965                 /* we found a record  but it was the wrong one */
966                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
967                 return;
968         }
969
970         state->outdata.dptr = c->data;
971         state->outdata.dsize = c->datalen;
972         state->status = c->status;
973         if (c->errorlen) {
974                 state->errormsg = talloc_strndup(state, 
975                                                  (char *)&c->data[c->datalen], 
976                                                  c->errorlen);
977         }
978
979         /* state->outdata now uses resources from c so we dont want c
980            to just dissappear from under us while state is still alive
981         */
982         talloc_steal(state, c);
983
984         state->state = CTDB_CONTROL_DONE;
985
986         /* if we had a callback registered for this control, pull the response
987            and call the callback.
988         */
989         if (state->async.fn) {
990                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
991         }
992 }
993
994
995 /*
996   destroy a ctdb_control in client
997 */
998 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
999 {
1000         ctdb_reqid_remove(state->ctdb, state->reqid);
1001         return 0;
1002 }
1003
1004
1005 /* time out handler for ctdb_control */
1006 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1007         struct timeval t, void *private_data)
1008 {
1009         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1010
1011         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1012                          "dstnode:%u\n", state->reqid, state->c->opcode,
1013                          state->c->hdr.destnode));
1014
1015         state->state = CTDB_CONTROL_TIMEOUT;
1016
1017         /* if we had a callback registered for this control, pull the response
1018            and call the callback.
1019         */
1020         if (state->async.fn) {
1021                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1022         }
1023 }
1024
1025 /* async version of send control request */
1026 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1027                 uint32_t destnode, uint64_t srvid, 
1028                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1029                 TALLOC_CTX *mem_ctx,
1030                 struct timeval *timeout,
1031                 char **errormsg)
1032 {
1033         struct ctdb_client_control_state *state;
1034         size_t len;
1035         struct ctdb_req_control *c;
1036         int ret;
1037
1038         if (errormsg) {
1039                 *errormsg = NULL;
1040         }
1041
1042         /* if the domain socket is not yet open, open it */
1043         if (ctdb->daemon.sd==-1) {
1044                 ctdb_socket_connect(ctdb);
1045         }
1046
1047         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1048         CTDB_NO_MEMORY_NULL(ctdb, state);
1049
1050         state->ctdb       = ctdb;
1051         state->reqid      = ctdb_reqid_new(ctdb, state);
1052         state->state      = CTDB_CONTROL_WAIT;
1053         state->errormsg   = NULL;
1054
1055         talloc_set_destructor(state, ctdb_client_control_destructor);
1056
1057         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1058         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1059                                len, struct ctdb_req_control);
1060         state->c            = c;        
1061         CTDB_NO_MEMORY_NULL(ctdb, c);
1062         c->hdr.reqid        = state->reqid;
1063         c->hdr.destnode     = destnode;
1064         c->opcode           = opcode;
1065         c->client_id        = 0;
1066         c->flags            = flags;
1067         c->srvid            = srvid;
1068         c->datalen          = data.dsize;
1069         if (data.dsize) {
1070                 memcpy(&c->data[0], data.dptr, data.dsize);
1071         }
1072
1073         /* timeout */
1074         if (timeout && !timeval_is_zero(timeout)) {
1075                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1076         }
1077
1078         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1079         if (ret != 0) {
1080                 talloc_free(state);
1081                 return NULL;
1082         }
1083
1084         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1085                 talloc_free(state);
1086                 return NULL;
1087         }
1088
1089         return state;
1090 }
1091
1092
1093 /* async version of receive control reply */
1094 int ctdb_control_recv(struct ctdb_context *ctdb, 
1095                 struct ctdb_client_control_state *state, 
1096                 TALLOC_CTX *mem_ctx,
1097                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1098 {
1099         TALLOC_CTX *tmp_ctx;
1100
1101         if (status != NULL) {
1102                 *status = -1;
1103         }
1104         if (errormsg != NULL) {
1105                 *errormsg = NULL;
1106         }
1107
1108         if (state == NULL) {
1109                 return -1;
1110         }
1111
1112         /* prevent double free of state */
1113         tmp_ctx = talloc_new(ctdb);
1114         talloc_steal(tmp_ctx, state);
1115
1116         /* loop one event at a time until we either timeout or the control
1117            completes.
1118         */
1119         while (state->state == CTDB_CONTROL_WAIT) {
1120                 event_loop_once(ctdb->ev);
1121         }
1122
1123         if (state->state != CTDB_CONTROL_DONE) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1125                 if (state->async.fn) {
1126                         state->async.fn(state);
1127                 }
1128                 talloc_free(tmp_ctx);
1129                 return -1;
1130         }
1131
1132         if (state->errormsg) {
1133                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1134                 if (errormsg) {
1135                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1136                 }
1137                 if (state->async.fn) {
1138                         state->async.fn(state);
1139                 }
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         if (outdata) {
1145                 *outdata = state->outdata;
1146                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1147         }
1148
1149         if (status) {
1150                 *status = state->status;
1151         }
1152
1153         if (state->async.fn) {
1154                 state->async.fn(state);
1155         }
1156
1157         talloc_free(tmp_ctx);
1158         return 0;
1159 }
1160
1161
1162
1163 /*
1164   send a ctdb control message
1165   timeout specifies how long we should wait for a reply.
1166   if timeout is NULL we wait indefinitely
1167  */
1168 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1169                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1170                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1171                  struct timeval *timeout,
1172                  char **errormsg)
1173 {
1174         struct ctdb_client_control_state *state;
1175
1176         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1177                         flags, data, mem_ctx,
1178                         timeout, errormsg);
1179
1180         /* FIXME: Error conditions in ctdb_control_send return NULL without
1181          * setting errormsg.  So, there is no way to distinguish between sucess
1182          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1183         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1184                 if (status != NULL) {
1185                         *status = 0;
1186                 }
1187                 return 0;
1188         }
1189
1190         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1191                         errormsg);
1192 }
1193
1194
1195
1196
1197 /*
1198   a process exists call. Returns 0 if process exists, -1 otherwise
1199  */
1200 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1201 {
1202         int ret;
1203         TDB_DATA data;
1204         int32_t status;
1205
1206         data.dptr = (uint8_t*)&pid;
1207         data.dsize = sizeof(pid);
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1211                            NULL, NULL, &status, NULL, NULL);
1212         if (ret != 0) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1214                 return -1;
1215         }
1216
1217         return status;
1218 }
1219
1220 /*
1221   get remote statistics
1222  */
1223 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1224 {
1225         int ret;
1226         TDB_DATA data;
1227         int32_t res;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1231                            ctdb, &data, &res, NULL, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1234                 return -1;
1235         }
1236
1237         if (data.dsize != sizeof(struct ctdb_statistics)) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1239                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1240                       return -1;
1241         }
1242
1243         *status = *(struct ctdb_statistics *)data.dptr;
1244         talloc_free(data.dptr);
1245                         
1246         return 0;
1247 }
1248
1249 /*
1250  * get db statistics
1251  */
1252 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1253                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1254 {
1255         int ret;
1256         TDB_DATA indata, outdata;
1257         int32_t res;
1258         struct ctdb_db_statistics *wire, *s;
1259         char *ptr;
1260         int i;
1261
1262         indata.dptr = (uint8_t *)&dbid;
1263         indata.dsize = sizeof(dbid);
1264
1265         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1266                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1267         if (ret != 0 || res != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1269                 return -1;
1270         }
1271
1272         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1273                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1274                                  outdata.dsize,
1275                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1276                 return -1;
1277         }
1278
1279         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1280         if (s == NULL) {
1281                 talloc_free(outdata.dptr);
1282                 CTDB_NO_MEMORY(ctdb, s);
1283         }
1284
1285         wire = (struct ctdb_db_statistics *)outdata.dptr;
1286         *s = *wire;
1287         ptr = &wire->hot_keys_wire[0];
1288         for (i=0; i<wire->num_hot_keys; i++) {
1289                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1290                 if (s->hot_keys[i].key.dptr == NULL) {
1291                         talloc_free(outdata.dptr);
1292                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1293                 }
1294
1295                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1296                 ptr += wire->hot_keys[i].key.dsize;
1297         }
1298
1299         talloc_free(outdata.dptr);
1300         *dbstat = s;
1301         return 0;
1302 }
1303
1304 /*
1305   shutdown a remote ctdb node
1306  */
1307 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_control_send(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1313                            NULL, &timeout, NULL);
1314         if (state == NULL) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322 /*
1323   get vnn map from a remote node
1324  */
1325 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1326 {
1327         int ret;
1328         TDB_DATA outdata;
1329         int32_t res;
1330         struct ctdb_vnn_map_wire *map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1334                            mem_ctx, &outdata, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1337                 return -1;
1338         }
1339         
1340         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1341         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1342             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1343                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1344                 return -1;
1345         }
1346
1347         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1348         CTDB_NO_MEMORY(ctdb, *vnnmap);
1349         (*vnnmap)->generation = map->generation;
1350         (*vnnmap)->size       = map->size;
1351         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1352
1353         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1354         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1355         talloc_free(outdata.dptr);
1356                     
1357         return 0;
1358 }
1359
1360
1361 /*
1362   get the recovery mode of a remote node
1363  */
1364 struct ctdb_client_control_state *
1365 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1366 {
1367         return ctdb_control_send(ctdb, destnode, 0, 
1368                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1369                            mem_ctx, &timeout, NULL);
1370 }
1371
1372 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1380                 return -1;
1381         }
1382
1383         if (recmode) {
1384                 *recmode = (uint32_t)res;
1385         }
1386
1387         return 0;
1388 }
1389
1390 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1391 {
1392         struct ctdb_client_control_state *state;
1393
1394         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1395         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1396 }
1397
1398
1399
1400
1401 /*
1402   set the recovery mode of a remote node
1403  */
1404 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1405 {
1406         int ret;
1407         TDB_DATA data;
1408         int32_t res;
1409
1410         data.dsize = sizeof(uint32_t);
1411         data.dptr = (unsigned char *)&recmode;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   get the recovery master of a remote node
1428  */
1429 struct ctdb_client_control_state *
1430 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1431                         struct timeval timeout, uint32_t destnode)
1432 {
1433         return ctdb_control_send(ctdb, destnode, 0, 
1434                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1435                            mem_ctx, &timeout, NULL);
1436 }
1437
1438 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1439 {
1440         int ret;
1441         int32_t res;
1442
1443         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1446                 return -1;
1447         }
1448
1449         if (recmaster) {
1450                 *recmaster = (uint32_t)res;
1451         }
1452
1453         return 0;
1454 }
1455
1456 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1457 {
1458         struct ctdb_client_control_state *state;
1459
1460         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1461         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1462 }
1463
1464
1465 /*
1466   set the recovery master of a remote node
1467  */
1468 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1469 {
1470         int ret;
1471         TDB_DATA data;
1472         int32_t res;
1473
1474         ZERO_STRUCT(data);
1475         data.dsize = sizeof(uint32_t);
1476         data.dptr = (unsigned char *)&recmaster;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1480                            NULL, NULL, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1483                 return -1;
1484         }
1485
1486         return 0;
1487 }
1488
1489
1490 /*
1491   get a list of databases off a remote node
1492  */
1493 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1494                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1495 {
1496         int ret;
1497         TDB_DATA outdata;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1502                            mem_ctx, &outdata, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1505                 return -1;
1506         }
1507
1508         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1509         talloc_free(outdata.dptr);
1510                     
1511         return 0;
1512 }
1513
1514 /*
1515   get a list of nodes (vnn and flags ) from a remote node
1516  */
1517 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1518                 struct timeval timeout, uint32_t destnode, 
1519                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0, 
1526                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1530                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1531         }
1532         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1533                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1534                 return -1;
1535         }
1536
1537         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1538         talloc_free(outdata.dptr);
1539                     
1540         return 0;
1541 }
1542
1543 /*
1544   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1545  */
1546 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1547                 struct timeval timeout, uint32_t destnode, 
1548                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1549 {
1550         int ret, i, len;
1551         TDB_DATA outdata;
1552         struct ctdb_node_mapv4 *nodemapv4;
1553         int32_t res;
1554
1555         ret = ctdb_control(ctdb, destnode, 0, 
1556                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1557                            mem_ctx, &outdata, &res, &timeout, NULL);
1558         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1560                 return -1;
1561         }
1562
1563         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1564
1565         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1566         (*nodemap) = talloc_zero_size(mem_ctx, len);
1567         CTDB_NO_MEMORY(ctdb, (*nodemap));
1568
1569         (*nodemap)->num = nodemapv4->num;
1570         for (i=0; i<nodemapv4->num; i++) {
1571                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1572                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1573                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1574                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1575         }
1576                 
1577         talloc_free(outdata.dptr);
1578                     
1579         return 0;
1580 }
1581
1582 /*
1583   drop the transport, reload the nodes file and restart the transport
1584  */
1585 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1586                     struct timeval timeout, uint32_t destnode)
1587 {
1588         int ret;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, 
1592                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1593                            NULL, NULL, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1596                 return -1;
1597         }
1598
1599         return 0;
1600 }
1601
1602
1603 /*
1604   set vnn map on a node
1605  */
1606 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1607                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1608 {
1609         int ret;
1610         TDB_DATA data;
1611         int32_t res;
1612         struct ctdb_vnn_map_wire *map;
1613         size_t len;
1614
1615         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1616         map = talloc_size(mem_ctx, len);
1617         CTDB_NO_MEMORY(ctdb, map);
1618
1619         map->generation = vnnmap->generation;
1620         map->size = vnnmap->size;
1621         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1622         
1623         data.dsize = len;
1624         data.dptr  = (uint8_t *)map;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1631                 return -1;
1632         }
1633
1634         talloc_free(map);
1635
1636         return 0;
1637 }
1638
1639
1640 /*
1641   async send for pull database
1642  */
1643 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1644         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1645         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1646 {
1647         TDB_DATA indata;
1648         struct ctdb_control_pulldb *pull;
1649         struct ctdb_client_control_state *state;
1650
1651         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1652         CTDB_NO_MEMORY_NULL(ctdb, pull);
1653
1654         pull->db_id   = dbid;
1655         pull->lmaster = lmaster;
1656
1657         indata.dsize = sizeof(struct ctdb_control_pulldb);
1658         indata.dptr  = (unsigned char *)pull;
1659
1660         state = ctdb_control_send(ctdb, destnode, 0, 
1661                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1662                                   mem_ctx, &timeout, NULL);
1663         talloc_free(pull);
1664
1665         return state;
1666 }
1667
1668 /*
1669   async recv for pull database
1670  */
1671 int ctdb_ctrl_pulldb_recv(
1672         struct ctdb_context *ctdb, 
1673         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1674         TDB_DATA *outdata)
1675 {
1676         int ret;
1677         int32_t res;
1678
1679         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1680         if ( (ret != 0) || (res != 0) ){
1681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1682                 return -1;
1683         }
1684
1685         return 0;
1686 }
1687
1688 /*
1689   pull all keys and records for a specific database on a node
1690  */
1691 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1692                 uint32_t dbid, uint32_t lmaster, 
1693                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1694                 TDB_DATA *outdata)
1695 {
1696         struct ctdb_client_control_state *state;
1697
1698         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1699                                       timeout);
1700         
1701         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1702 }
1703
1704
1705 /*
1706   change dmaster for all keys in the database to the new value
1707  */
1708 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1709                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1710 {
1711         int ret;
1712         TDB_DATA indata;
1713         int32_t res;
1714
1715         indata.dsize = 2*sizeof(uint32_t);
1716         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1717
1718         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1719         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1720
1721         ret = ctdb_control(ctdb, destnode, 0, 
1722                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1723                            NULL, NULL, &res, &timeout, NULL);
1724         if (ret != 0 || res != 0) {
1725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1726                 return -1;
1727         }
1728
1729         return 0;
1730 }
1731
1732 /*
1733   ping a node, return number of clients connected
1734  */
1735 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1736 {
1737         int ret;
1738         int32_t res;
1739
1740         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1741                            tdb_null, NULL, NULL, &res, NULL, NULL);
1742         if (ret != 0) {
1743                 return -1;
1744         }
1745         return res;
1746 }
1747
1748 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1749                            struct timeval timeout, 
1750                            uint32_t destnode,
1751                            uint32_t *runstate)
1752 {
1753         TDB_DATA outdata;
1754         int32_t res;
1755         int ret;
1756
1757         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1758                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1759         if (ret != 0 || res != 0) {
1760                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1761                 return ret != 0 ? ret : res;
1762         }
1763
1764         if (outdata.dsize != sizeof(uint32_t)) {
1765                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1766                 talloc_free(outdata.dptr);
1767                 return -1;
1768         }
1769
1770         if (runstate != NULL) {
1771                 *runstate = *(uint32_t *)outdata.dptr;
1772         }
1773         talloc_free(outdata.dptr);
1774
1775         return 0;
1776 }
1777
1778 /*
1779   find the real path to a ltdb 
1780  */
1781 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1782                    const char **path)
1783 {
1784         int ret;
1785         int32_t res;
1786         TDB_DATA data;
1787
1788         data.dptr = (uint8_t *)&dbid;
1789         data.dsize = sizeof(dbid);
1790
1791         ret = ctdb_control(ctdb, destnode, 0, 
1792                            CTDB_CONTROL_GETDBPATH, 0, data, 
1793                            mem_ctx, &data, &res, &timeout, NULL);
1794         if (ret != 0 || res != 0) {
1795                 return -1;
1796         }
1797
1798         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1799         if ((*path) == NULL) {
1800                 return -1;
1801         }
1802
1803         talloc_free(data.dptr);
1804
1805         return 0;
1806 }
1807
1808 /*
1809   find the name of a db 
1810  */
1811 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1812                    const char **name)
1813 {
1814         int ret;
1815         int32_t res;
1816         TDB_DATA data;
1817
1818         data.dptr = (uint8_t *)&dbid;
1819         data.dsize = sizeof(dbid);
1820
1821         ret = ctdb_control(ctdb, destnode, 0, 
1822                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1823                            mem_ctx, &data, &res, &timeout, NULL);
1824         if (ret != 0 || res != 0) {
1825                 return -1;
1826         }
1827
1828         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1829         if ((*name) == NULL) {
1830                 return -1;
1831         }
1832
1833         talloc_free(data.dptr);
1834
1835         return 0;
1836 }
1837
1838 /*
1839   get the health status of a db
1840  */
1841 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1842                           struct timeval timeout,
1843                           uint32_t destnode,
1844                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1845                           const char **reason)
1846 {
1847         int ret;
1848         int32_t res;
1849         TDB_DATA data;
1850
1851         data.dptr = (uint8_t *)&dbid;
1852         data.dsize = sizeof(dbid);
1853
1854         ret = ctdb_control(ctdb, destnode, 0,
1855                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1856                            mem_ctx, &data, &res, &timeout, NULL);
1857         if (ret != 0 || res != 0) {
1858                 return -1;
1859         }
1860
1861         if (data.dsize == 0) {
1862                 (*reason) = NULL;
1863                 return 0;
1864         }
1865
1866         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1867         if ((*reason) == NULL) {
1868                 return -1;
1869         }
1870
1871         talloc_free(data.dptr);
1872
1873         return 0;
1874 }
1875
1876 /*
1877  * get db sequence number
1878  */
1879 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1880                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1881 {
1882         int ret;
1883         int32_t res;
1884         TDB_DATA data, outdata;
1885
1886         data.dptr = (uint8_t *)&dbid;
1887         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1888
1889         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1890                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1891         if (ret != 0 || res != 0) {
1892                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1893                 return -1;
1894         }
1895
1896         if (outdata.dsize != sizeof(uint64_t)) {
1897                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1898                 talloc_free(outdata.dptr);
1899                 return -1;
1900         }
1901
1902         if (seqnum != NULL) {
1903                 *seqnum = *(uint64_t *)outdata.dptr;
1904         }
1905         talloc_free(outdata.dptr);
1906
1907         return 0;
1908 }
1909
1910 /*
1911   create a database
1912  */
1913 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1914                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1915 {
1916         int ret;
1917         int32_t res;
1918         TDB_DATA data;
1919         uint64_t tdb_flags = 0;
1920
1921         data.dptr = discard_const(name);
1922         data.dsize = strlen(name)+1;
1923
1924         /* Make sure that volatile databases use jenkins hash */
1925         if (!persistent) {
1926                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1927         }
1928
1929 #ifdef TDB_MUTEX_LOCKING
1930         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1931                 tdb_flags |= TDB_MUTEX_LOCKING;
1932         }
1933 #endif
1934
1935         ret = ctdb_control(ctdb, destnode, tdb_flags,
1936                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1937                            0, data, 
1938                            mem_ctx, &data, &res, &timeout, NULL);
1939
1940         if (ret != 0 || res != 0) {
1941                 return -1;
1942         }
1943
1944         return 0;
1945 }
1946
1947 /*
1948   get debug level on a node
1949  */
1950 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1951 {
1952         int ret;
1953         int32_t res;
1954         TDB_DATA data;
1955
1956         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1957                            ctdb, &data, &res, NULL, NULL);
1958         if (ret != 0 || res != 0) {
1959                 return -1;
1960         }
1961         if (data.dsize != sizeof(int32_t)) {
1962                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1963                          (unsigned)data.dsize));
1964                 return -1;
1965         }
1966         *level = *(int32_t *)data.dptr;
1967         talloc_free(data.dptr);
1968         return 0;
1969 }
1970
1971 /*
1972   set debug level on a node
1973  */
1974 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1975 {
1976         int ret;
1977         int32_t res;
1978         TDB_DATA data;
1979
1980         data.dptr = (uint8_t *)&level;
1981         data.dsize = sizeof(level);
1982
1983         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1984                            NULL, NULL, &res, NULL, NULL);
1985         if (ret != 0 || res != 0) {
1986                 return -1;
1987         }
1988         return 0;
1989 }
1990
1991
1992 /*
1993   get a list of connected nodes
1994  */
1995 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1996                                 struct timeval timeout,
1997                                 TALLOC_CTX *mem_ctx,
1998                                 uint32_t *num_nodes)
1999 {
2000         struct ctdb_node_map *map=NULL;
2001         int ret, i;
2002         uint32_t *nodes;
2003
2004         *num_nodes = 0;
2005
2006         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2007         if (ret != 0) {
2008                 return NULL;
2009         }
2010
2011         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2012         if (nodes == NULL) {
2013                 return NULL;
2014         }
2015
2016         for (i=0;i<map->num;i++) {
2017                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2018                         nodes[*num_nodes] = map->nodes[i].pnn;
2019                         (*num_nodes)++;
2020                 }
2021         }
2022
2023         return nodes;
2024 }
2025
2026
2027 /*
2028   reset remote status
2029  */
2030 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2031 {
2032         int ret;
2033         int32_t res;
2034
2035         ret = ctdb_control(ctdb, destnode, 0, 
2036                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2037                            NULL, NULL, &res, NULL, NULL);
2038         if (ret != 0 || res != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2040                 return -1;
2041         }
2042         return 0;
2043 }
2044
2045 /*
2046   attach to a specific database - client call
2047 */
2048 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2049                                     struct timeval timeout,
2050                                     const char *name,
2051                                     bool persistent,
2052                                     uint32_t tdb_flags)
2053 {
2054         struct ctdb_db_context *ctdb_db;
2055         TDB_DATA data;
2056         int ret;
2057         int32_t res;
2058
2059         ctdb_db = ctdb_db_handle(ctdb, name);
2060         if (ctdb_db) {
2061                 return ctdb_db;
2062         }
2063
2064         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2065         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2066
2067         ctdb_db->ctdb = ctdb;
2068         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2069         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2070
2071         data.dptr = discard_const(name);
2072         data.dsize = strlen(name)+1;
2073
2074         /* CTDB has switched to using jenkins hash for volatile databases.
2075          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2076          * always set it.
2077          */
2078         if (!persistent) {
2079                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2080         }
2081
2082 #ifdef TDB_MUTEX_LOCKING
2083         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
2084                 tdb_flags |= TDB_MUTEX_LOCKING;
2085         }
2086 #endif
2087
2088         /* tell ctdb daemon to attach */
2089         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2090                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2091                            0, data, ctdb_db, &data, &res, NULL, NULL);
2092         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2093                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2094                 talloc_free(ctdb_db);
2095                 return NULL;
2096         }
2097         
2098         ctdb_db->db_id = *(uint32_t *)data.dptr;
2099         talloc_free(data.dptr);
2100
2101         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2102         if (ret != 0) {
2103                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2104                 talloc_free(ctdb_db);
2105                 return NULL;
2106         }
2107
2108         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2109         if (ctdb->valgrinding) {
2110                 tdb_flags |= TDB_NOMMAP;
2111         }
2112         tdb_flags |= TDB_DISALLOW_NESTING;
2113
2114         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2115                                       O_RDWR, 0);
2116         if (ctdb_db->ltdb == NULL) {
2117                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2118                 talloc_free(ctdb_db);
2119                 return NULL;
2120         }
2121
2122         ctdb_db->persistent = persistent;
2123
2124         DLIST_ADD(ctdb->db_list, ctdb_db);
2125
2126         /* add well known functions */
2127         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2128         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2129         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2130
2131         return ctdb_db;
2132 }
2133
2134 /*
2135  * detach from a specific database - client call
2136  */
2137 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2138 {
2139         int ret;
2140         int32_t status;
2141         TDB_DATA data;
2142
2143         data.dsize = sizeof(db_id);
2144         data.dptr = (uint8_t *)&db_id;
2145
2146         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2147                            0, data, NULL, NULL, &status, NULL, NULL);
2148         if (ret != 0 || status != 0) {
2149                 return -1;
2150         }
2151         return 0;
2152 }
2153
2154 /*
2155   setup a call for a database
2156  */
2157 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2158 {
2159         struct ctdb_registered_call *call;
2160
2161 #if 0
2162         TDB_DATA data;
2163         int32_t status;
2164         struct ctdb_control_set_call c;
2165         int ret;
2166
2167         /* this is no longer valid with the separate daemon architecture */
2168         c.db_id = ctdb_db->db_id;
2169         c.fn    = fn;
2170         c.id    = id;
2171
2172         data.dptr = (uint8_t *)&c;
2173         data.dsize = sizeof(c);
2174
2175         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2176                            data, NULL, NULL, &status, NULL, NULL);
2177         if (ret != 0 || status != 0) {
2178                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2179                 return -1;
2180         }
2181 #endif
2182
2183         /* also register locally */
2184         call = talloc(ctdb_db, struct ctdb_registered_call);
2185         call->fn = fn;
2186         call->id = id;
2187
2188         DLIST_ADD(ctdb_db->calls, call);        
2189         return 0;
2190 }
2191
2192
2193 struct traverse_state {
2194         bool done;
2195         uint32_t count;
2196         ctdb_traverse_func fn;
2197         void *private_data;
2198         bool listemptyrecords;
2199 };
2200
2201 /*
2202   called on each key during a ctdb_traverse
2203  */
2204 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2205 {
2206         struct traverse_state *state = (struct traverse_state *)p;
2207         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2208         TDB_DATA key;
2209
2210         if (data.dsize < sizeof(uint32_t) ||
2211             d->length != data.dsize) {
2212                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2213                 state->done = true;
2214                 return;
2215         }
2216
2217         key.dsize = d->keylen;
2218         key.dptr  = &d->data[0];
2219         data.dsize = d->datalen;
2220         data.dptr = &d->data[d->keylen];
2221
2222         if (key.dsize == 0 && data.dsize == 0) {
2223                 /* end of traverse */
2224                 state->done = true;
2225                 return;
2226         }
2227
2228         if (!state->listemptyrecords &&
2229             data.dsize == sizeof(struct ctdb_ltdb_header))
2230         {
2231                 /* empty records are deleted records in ctdb */
2232                 return;
2233         }
2234
2235         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2236                 state->done = true;
2237         }
2238
2239         state->count++;
2240 }
2241
2242 /**
2243  * start a cluster wide traverse, calling the supplied fn on each record
2244  * return the number of records traversed, or -1 on error
2245  *
2246  * Extendet variant with a flag to signal whether empty records should
2247  * be listed.
2248  */
2249 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2250                              ctdb_traverse_func fn,
2251                              bool withemptyrecords,
2252                              void *private_data)
2253 {
2254         TDB_DATA data;
2255         struct ctdb_traverse_start_ext t;
2256         int32_t status;
2257         int ret;
2258         uint64_t srvid = (getpid() | 0xFLL<<60);
2259         struct traverse_state state;
2260
2261         state.done = false;
2262         state.count = 0;
2263         state.private_data = private_data;
2264         state.fn = fn;
2265         state.listemptyrecords = withemptyrecords;
2266
2267         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2268         if (ret != 0) {
2269                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2270                 return -1;
2271         }
2272
2273         t.db_id = ctdb_db->db_id;
2274         t.srvid = srvid;
2275         t.reqid = 0;
2276         t.withemptyrecords = withemptyrecords;
2277
2278         data.dptr = (uint8_t *)&t;
2279         data.dsize = sizeof(t);
2280
2281         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2282                            data, NULL, NULL, &status, NULL, NULL);
2283         if (ret != 0 || status != 0) {
2284                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2285                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2286                 return -1;
2287         }
2288
2289         while (!state.done) {
2290                 event_loop_once(ctdb_db->ctdb->ev);
2291         }
2292
2293         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2294         if (ret != 0) {
2295                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2296                 return -1;
2297         }
2298
2299         return state.count;
2300 }
2301
2302 /**
2303  * start a cluster wide traverse, calling the supplied fn on each record
2304  * return the number of records traversed, or -1 on error
2305  *
2306  * Standard version which does not list the empty records:
2307  * These are considered deleted.
2308  */
2309 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2310 {
2311         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2312 }
2313
2314 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2315 /*
2316   called on each key during a catdb
2317  */
2318 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2319 {
2320         int i;
2321         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2322         FILE *f = c->f;
2323         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2324
2325         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2326         for (i=0;i<key.dsize;i++) {
2327                 if (ISASCII(key.dptr[i])) {
2328                         fprintf(f, "%c", key.dptr[i]);
2329                 } else {
2330                         fprintf(f, "\\%02X", key.dptr[i]);
2331                 }
2332         }
2333         fprintf(f, "\"\n");
2334
2335         fprintf(f, "dmaster: %u\n", h->dmaster);
2336         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2337
2338         if (c->printlmaster && ctdb->vnn_map != NULL) {
2339                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2340         }
2341
2342         if (c->printhash) {
2343                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2344         }
2345
2346         if (c->printrecordflags) {
2347                 fprintf(f, "flags: 0x%08x", h->flags);
2348                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2349                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2350                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2351                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2352                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2353                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2354                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2355                 fprintf(f, "\n");
2356         }
2357
2358         if (c->printdatasize) {
2359                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2360         } else {
2361                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2362                 for (i=sizeof(*h);i<data.dsize;i++) {
2363                         if (ISASCII(data.dptr[i])) {
2364                                 fprintf(f, "%c", data.dptr[i]);
2365                         } else {
2366                                 fprintf(f, "\\%02X", data.dptr[i]);
2367                         }
2368                 }
2369                 fprintf(f, "\"\n");
2370         }
2371
2372         fprintf(f, "\n");
2373
2374         return 0;
2375 }
2376
2377 /*
2378   convenience function to list all keys to stdout
2379  */
2380 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2381                  struct ctdb_dump_db_context *ctx)
2382 {
2383         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2384                                  ctx->printemptyrecords, ctx);
2385 }
2386
2387 /*
2388   get the pid of a ctdb daemon
2389  */
2390 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2391 {
2392         int ret;
2393         int32_t res;
2394
2395         ret = ctdb_control(ctdb, destnode, 0, 
2396                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2397                            NULL, NULL, &res, &timeout, NULL);
2398         if (ret != 0) {
2399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2400                 return -1;
2401         }
2402
2403         *pid = res;
2404
2405         return 0;
2406 }
2407
2408
2409 /*
2410   async freeze send control
2411  */
2412 struct ctdb_client_control_state *
2413 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2414 {
2415         return ctdb_control_send(ctdb, destnode, priority, 
2416                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2417                            mem_ctx, &timeout, NULL);
2418 }
2419
2420 /* 
2421    async freeze recv control
2422 */
2423 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2424 {
2425         int ret;
2426         int32_t res;
2427
2428         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2429         if ( (ret != 0) || (res != 0) ){
2430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2431                 return -1;
2432         }
2433
2434         return 0;
2435 }
2436
2437 /*
2438   freeze databases of a certain priority
2439  */
2440 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2441 {
2442         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2443         struct ctdb_client_control_state *state;
2444         int ret;
2445
2446         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2447         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2448         talloc_free(tmp_ctx);
2449
2450         return ret;
2451 }
2452
2453 /* Freeze all databases */
2454 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2455 {
2456         int i;
2457
2458         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2459                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2460                         return -1;
2461                 }
2462         }
2463         return 0;
2464 }
2465
2466 /*
2467   thaw databases of a certain priority
2468  */
2469 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2470 {
2471         int ret;
2472         int32_t res;
2473
2474         ret = ctdb_control(ctdb, destnode, priority, 
2475                            CTDB_CONTROL_THAW, 0, tdb_null, 
2476                            NULL, NULL, &res, &timeout, NULL);
2477         if (ret != 0 || res != 0) {
2478                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2479                 return -1;
2480         }
2481
2482         return 0;
2483 }
2484
2485 /* thaw all databases */
2486 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2487 {
2488         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2489 }
2490
2491 /*
2492   get pnn of a node, or -1
2493  */
2494 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2495 {
2496         int ret;
2497         int32_t res;
2498
2499         ret = ctdb_control(ctdb, destnode, 0, 
2500                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2501                            NULL, NULL, &res, &timeout, NULL);
2502         if (ret != 0) {
2503                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2504                 return -1;
2505         }
2506
2507         return res;
2508 }
2509
2510 /*
2511   get the monitoring mode of a remote node
2512  */
2513 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2514 {
2515         int ret;
2516         int32_t res;
2517
2518         ret = ctdb_control(ctdb, destnode, 0, 
2519                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2520                            NULL, NULL, &res, &timeout, NULL);
2521         if (ret != 0) {
2522                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2523                 return -1;
2524         }
2525
2526         *monmode = res;
2527
2528         return 0;
2529 }
2530
2531
2532 /*
2533  set the monitoring mode of a remote node to active
2534  */
2535 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2536 {
2537         int ret;
2538         
2539
2540         ret = ctdb_control(ctdb, destnode, 0, 
2541                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2542                            NULL, NULL,NULL, &timeout, NULL);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2545                 return -1;
2546         }
2547
2548         
2549
2550         return 0;
2551 }
2552
2553 /*
2554   set the monitoring mode of a remote node to disable
2555  */
2556 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2557 {
2558         int ret;
2559         
2560
2561         ret = ctdb_control(ctdb, destnode, 0, 
2562                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2563                            NULL, NULL, NULL, &timeout, NULL);
2564         if (ret != 0) {
2565                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2566                 return -1;
2567         }
2568
2569         
2570
2571         return 0;
2572 }
2573
2574
2575
2576 /* 
2577   sent to a node to make it take over an ip address
2578 */
2579 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2580                           uint32_t destnode, struct ctdb_public_ip *ip)
2581 {
2582         TDB_DATA data;
2583         struct ctdb_public_ipv4 ipv4;
2584         int ret;
2585         int32_t res;
2586
2587         if (ip->addr.sa.sa_family == AF_INET) {
2588                 ipv4.pnn = ip->pnn;
2589                 ipv4.sin = ip->addr.ip;
2590
2591                 data.dsize = sizeof(ipv4);
2592                 data.dptr  = (uint8_t *)&ipv4;
2593
2594                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2595                            NULL, &res, &timeout, NULL);
2596         } else {
2597                 data.dsize = sizeof(*ip);
2598                 data.dptr  = (uint8_t *)ip;
2599
2600                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2601                            NULL, &res, &timeout, NULL);
2602         }
2603
2604         if (ret != 0 || res != 0) {
2605                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2606                 return -1;
2607         }
2608
2609         return 0;       
2610 }
2611
2612
2613 /* 
2614   sent to a node to make it release an ip address
2615 */
2616 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2617                          uint32_t destnode, struct ctdb_public_ip *ip)
2618 {
2619         TDB_DATA data;
2620         struct ctdb_public_ipv4 ipv4;
2621         int ret;
2622         int32_t res;
2623
2624         if (ip->addr.sa.sa_family == AF_INET) {
2625                 ipv4.pnn = ip->pnn;
2626                 ipv4.sin = ip->addr.ip;
2627
2628                 data.dsize = sizeof(ipv4);
2629                 data.dptr  = (uint8_t *)&ipv4;
2630
2631                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2632                                    NULL, &res, &timeout, NULL);
2633         } else {
2634                 data.dsize = sizeof(*ip);
2635                 data.dptr  = (uint8_t *)ip;
2636
2637                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2638                                    NULL, &res, &timeout, NULL);
2639         }
2640
2641         if (ret != 0 || res != 0) {
2642                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2643                 return -1;
2644         }
2645
2646         return 0;       
2647 }
2648
2649
2650 /*
2651   get a tunable
2652  */
2653 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2654                           struct timeval timeout, 
2655                           uint32_t destnode,
2656                           const char *name, uint32_t *value)
2657 {
2658         struct ctdb_control_get_tunable *t;
2659         TDB_DATA data, outdata;
2660         int32_t res;
2661         int ret;
2662
2663         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2664         data.dptr  = talloc_size(ctdb, data.dsize);
2665         CTDB_NO_MEMORY(ctdb, data.dptr);
2666
2667         t = (struct ctdb_control_get_tunable *)data.dptr;
2668         t->length = strlen(name)+1;
2669         memcpy(t->name, name, t->length);
2670
2671         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2672                            &outdata, &res, &timeout, NULL);
2673         talloc_free(data.dptr);
2674         if (ret != 0 || res != 0) {
2675                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2676                 return ret != 0 ? ret : res;
2677         }
2678
2679         if (outdata.dsize != sizeof(uint32_t)) {
2680                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2681                 talloc_free(outdata.dptr);
2682                 return -1;
2683         }
2684         
2685         *value = *(uint32_t *)outdata.dptr;
2686         talloc_free(outdata.dptr);
2687
2688         return 0;
2689 }
2690
2691 /*
2692   set a tunable
2693  */
2694 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2695                           struct timeval timeout, 
2696                           uint32_t destnode,
2697                           const char *name, uint32_t value)
2698 {
2699         struct ctdb_control_set_tunable *t;
2700         TDB_DATA data;
2701         int32_t res;
2702         int ret;
2703
2704         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2705         data.dptr  = talloc_size(ctdb, data.dsize);
2706         CTDB_NO_MEMORY(ctdb, data.dptr);
2707
2708         t = (struct ctdb_control_set_tunable *)data.dptr;
2709         t->length = strlen(name)+1;
2710         memcpy(t->name, name, t->length);
2711         t->value = value;
2712
2713         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2714                            NULL, &res, &timeout, NULL);
2715         talloc_free(data.dptr);
2716         if (ret != 0 || res != 0) {
2717                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2718                 return -1;
2719         }
2720
2721         return 0;
2722 }
2723
2724 /*
2725   list tunables
2726  */
2727 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2728                             struct timeval timeout, 
2729                             uint32_t destnode,
2730                             TALLOC_CTX *mem_ctx,
2731                             const char ***list, uint32_t *count)
2732 {
2733         TDB_DATA outdata;
2734         int32_t res;
2735         int ret;
2736         struct ctdb_control_list_tunable *t;
2737         char *p, *s, *ptr;
2738
2739         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2740                            mem_ctx, &outdata, &res, &timeout, NULL);
2741         if (ret != 0 || res != 0) {
2742                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2743                 return -1;
2744         }
2745
2746         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2747         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2748             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2749                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2750                 talloc_free(outdata.dptr);
2751                 return -1;              
2752         }
2753         
2754         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2755         CTDB_NO_MEMORY(ctdb, p);
2756
2757         talloc_free(outdata.dptr);
2758         
2759         (*list) = NULL;
2760         (*count) = 0;
2761
2762         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2763                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2764                 CTDB_NO_MEMORY(ctdb, *list);
2765                 (*list)[*count] = talloc_strdup(*list, s);
2766                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2767                 (*count)++;
2768         }
2769
2770         talloc_free(p);
2771
2772         return 0;
2773 }
2774
2775
2776 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2777                                    struct timeval timeout, uint32_t destnode,
2778                                    TALLOC_CTX *mem_ctx,
2779                                    uint32_t flags,
2780                                    struct ctdb_all_public_ips **ips)
2781 {
2782         int ret;
2783         TDB_DATA outdata;
2784         int32_t res;
2785
2786         ret = ctdb_control(ctdb, destnode, 0, 
2787                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2788                            mem_ctx, &outdata, &res, &timeout, NULL);
2789         if (ret == 0 && res == -1) {
2790                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2791                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2792         }
2793         if (ret != 0 || res != 0) {
2794           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2795                 return -1;
2796         }
2797
2798         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2799         talloc_free(outdata.dptr);
2800                     
2801         return 0;
2802 }
2803
2804 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2805                              struct timeval timeout, uint32_t destnode,
2806                              TALLOC_CTX *mem_ctx,
2807                              struct ctdb_all_public_ips **ips)
2808 {
2809         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2810                                               destnode, mem_ctx,
2811                                               0, ips);
2812 }
2813
2814 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2815                         struct timeval timeout, uint32_t destnode, 
2816                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2817 {
2818         int ret, i, len;
2819         TDB_DATA outdata;
2820         int32_t res;
2821         struct ctdb_all_public_ipsv4 *ipsv4;
2822
2823         ret = ctdb_control(ctdb, destnode, 0, 
2824                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2825                            mem_ctx, &outdata, &res, &timeout, NULL);
2826         if (ret != 0 || res != 0) {
2827                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2828                 return -1;
2829         }
2830
2831         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2832         len = offsetof(struct ctdb_all_public_ips, ips) +
2833                 ipsv4->num*sizeof(struct ctdb_public_ip);
2834         *ips = talloc_zero_size(mem_ctx, len);
2835         CTDB_NO_MEMORY(ctdb, *ips);
2836         (*ips)->num = ipsv4->num;
2837         for (i=0; i<ipsv4->num; i++) {
2838                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2839                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2840         }
2841
2842         talloc_free(outdata.dptr);
2843                     
2844         return 0;
2845 }
2846
2847 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2848                                  struct timeval timeout, uint32_t destnode,
2849                                  TALLOC_CTX *mem_ctx,
2850                                  const ctdb_sock_addr *addr,
2851                                  struct ctdb_control_public_ip_info **_info)
2852 {
2853         int ret;
2854         TDB_DATA indata;
2855         TDB_DATA outdata;
2856         int32_t res;
2857         struct ctdb_control_public_ip_info *info;
2858         uint32_t len;
2859         uint32_t i;
2860
2861         indata.dptr = discard_const_p(uint8_t, addr);
2862         indata.dsize = sizeof(*addr);
2863
2864         ret = ctdb_control(ctdb, destnode, 0,
2865                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2866                            mem_ctx, &outdata, &res, &timeout, NULL);
2867         if (ret != 0 || res != 0) {
2868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2869                                 "failed ret:%d res:%d\n",
2870                                 ret, res));
2871                 return -1;
2872         }
2873
2874         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2875         if (len > outdata.dsize) {
2876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2877                                 "returned invalid data with size %u > %u\n",
2878                                 (unsigned int)outdata.dsize,
2879                                 (unsigned int)len));
2880                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2881                 return -1;
2882         }
2883
2884         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2885         len += info->num*sizeof(struct ctdb_control_iface_info);
2886
2887         if (len > outdata.dsize) {
2888                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2889                                 "returned invalid data with size %u > %u\n",
2890                                 (unsigned int)outdata.dsize,
2891                                 (unsigned int)len));
2892                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2893                 return -1;
2894         }
2895
2896         /* make sure we null terminate the returned strings */
2897         for (i=0; i < info->num; i++) {
2898                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2899         }
2900
2901         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2902                                                                 outdata.dptr,
2903                                                                 outdata.dsize);
2904         talloc_free(outdata.dptr);
2905         if (*_info == NULL) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2907                                 "talloc_memdup size %u failed\n",
2908                                 (unsigned int)outdata.dsize));
2909                 return -1;
2910         }
2911
2912         return 0;
2913 }
2914
2915 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2916                          struct timeval timeout, uint32_t destnode,
2917                          TALLOC_CTX *mem_ctx,
2918                          struct ctdb_control_get_ifaces **_ifaces)
2919 {
2920         int ret;
2921         TDB_DATA outdata;
2922         int32_t res;
2923         struct ctdb_control_get_ifaces *ifaces;
2924         uint32_t len;
2925         uint32_t i;
2926
2927         ret = ctdb_control(ctdb, destnode, 0,
2928                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2929                            mem_ctx, &outdata, &res, &timeout, NULL);
2930         if (ret != 0 || res != 0) {
2931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2932                                 "failed ret:%d res:%d\n",
2933                                 ret, res));
2934                 return -1;
2935         }
2936
2937         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2938         if (len > outdata.dsize) {
2939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2940                                 "returned invalid data with size %u > %u\n",
2941                                 (unsigned int)outdata.dsize,
2942                                 (unsigned int)len));
2943                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2944                 return -1;
2945         }
2946
2947         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2948         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2949
2950         if (len > outdata.dsize) {
2951                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2952                                 "returned invalid data with size %u > %u\n",
2953                                 (unsigned int)outdata.dsize,
2954                                 (unsigned int)len));
2955                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2956                 return -1;
2957         }
2958
2959         /* make sure we null terminate the returned strings */
2960         for (i=0; i < ifaces->num; i++) {
2961                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2962         }
2963
2964         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2965                                                                   outdata.dptr,
2966                                                                   outdata.dsize);
2967         talloc_free(outdata.dptr);
2968         if (*_ifaces == NULL) {
2969                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2970                                 "talloc_memdup size %u failed\n",
2971                                 (unsigned int)outdata.dsize));
2972                 return -1;
2973         }
2974
2975         return 0;
2976 }
2977
2978 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2979                              struct timeval timeout, uint32_t destnode,
2980                              TALLOC_CTX *mem_ctx,
2981                              const struct ctdb_control_iface_info *info)
2982 {
2983         int ret;
2984         TDB_DATA indata;
2985         int32_t res;
2986
2987         indata.dptr = discard_const_p(uint8_t, info);
2988         indata.dsize = sizeof(*info);
2989
2990         ret = ctdb_control(ctdb, destnode, 0,
2991                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2992                            mem_ctx, NULL, &res, &timeout, NULL);
2993         if (ret != 0 || res != 0) {
2994                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2995                                 "failed ret:%d res:%d\n",
2996                                 ret, res));
2997                 return -1;
2998         }
2999
3000         return 0;
3001 }
3002
3003 /*
3004   set/clear the permanent disabled bit on a remote node
3005  */
3006 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
3007                        uint32_t set, uint32_t clear)
3008 {
3009         int ret;
3010         TDB_DATA data;
3011         struct ctdb_node_map *nodemap=NULL;
3012         struct ctdb_node_flag_change c;
3013         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3014         uint32_t recmaster;
3015         uint32_t *nodes;
3016
3017
3018         /* find the recovery master */
3019         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
3020         if (ret != 0) {
3021                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3022                 talloc_free(tmp_ctx);
3023                 return ret;
3024         }
3025
3026
3027         /* read the node flags from the recmaster */
3028         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
3029         if (ret != 0) {
3030                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
3031                 talloc_free(tmp_ctx);
3032                 return -1;
3033         }
3034         if (destnode >= nodemap->num) {
3035                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
3036                 talloc_free(tmp_ctx);
3037                 return -1;
3038         }
3039
3040         c.pnn       = destnode;
3041         c.old_flags = nodemap->nodes[destnode].flags;
3042         c.new_flags = c.old_flags;
3043         c.new_flags |= set;
3044         c.new_flags &= ~clear;
3045
3046         data.dsize = sizeof(c);
3047         data.dptr = (unsigned char *)&c;
3048
3049         /* send the flags update to all connected nodes */
3050         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3051
3052         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3053                                         nodes, 0,
3054                                         timeout, false, data,
3055                                         NULL, NULL,
3056                                         NULL) != 0) {
3057                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3058
3059                 talloc_free(tmp_ctx);
3060                 return -1;
3061         }
3062
3063         talloc_free(tmp_ctx);
3064         return 0;
3065 }
3066
3067
3068 /*
3069   get all tunables
3070  */
3071 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3072                                struct timeval timeout, 
3073                                uint32_t destnode,
3074                                struct ctdb_tunable *tunables)
3075 {
3076         TDB_DATA outdata;
3077         int ret;
3078         int32_t res;
3079
3080         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3081                            &outdata, &res, &timeout, NULL);
3082         if (ret != 0 || res != 0) {
3083                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3084                 return -1;
3085         }
3086
3087         if (outdata.dsize != sizeof(*tunables)) {
3088                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3089                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3090                 return -1;              
3091         }
3092
3093         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3094         talloc_free(outdata.dptr);
3095         return 0;
3096 }
3097
3098 /*
3099   add a public address to a node
3100  */
3101 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3102                       struct timeval timeout, 
3103                       uint32_t destnode,
3104                       struct ctdb_control_ip_iface *pub)
3105 {
3106         TDB_DATA data;
3107         int32_t res;
3108         int ret;
3109
3110         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3111         data.dptr  = (unsigned char *)pub;
3112
3113         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3114                            NULL, &res, &timeout, NULL);
3115         if (ret != 0 || res != 0) {
3116                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3117                 return -1;
3118         }
3119
3120         return 0;
3121 }
3122
3123 /*
3124   delete a public address from a node
3125  */
3126 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3127                       struct timeval timeout, 
3128                       uint32_t destnode,
3129                       struct ctdb_control_ip_iface *pub)
3130 {
3131         TDB_DATA data;
3132         int32_t res;
3133         int ret;
3134
3135         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3136         data.dptr  = (unsigned char *)pub;
3137
3138         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3139                            NULL, &res, &timeout, NULL);
3140         if (ret != 0 || res != 0) {
3141                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3142                 return -1;
3143         }
3144
3145         return 0;
3146 }
3147
3148 /*
3149   kill a tcp connection
3150  */
3151 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3152                       struct timeval timeout, 
3153                       uint32_t destnode,
3154                       struct ctdb_control_killtcp *killtcp)
3155 {
3156         TDB_DATA data;
3157         int32_t res;
3158         int ret;
3159
3160         data.dsize = sizeof(struct ctdb_control_killtcp);
3161         data.dptr  = (unsigned char *)killtcp;
3162
3163         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3164                            NULL, &res, &timeout, NULL);
3165         if (ret != 0 || res != 0) {
3166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3167                 return -1;
3168         }
3169
3170         return 0;
3171 }
3172
3173 /*
3174   send a gratious arp
3175  */
3176 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3177                       struct timeval timeout, 
3178                       uint32_t destnode,
3179                       ctdb_sock_addr *addr,
3180                       const char *ifname)
3181 {
3182         TDB_DATA data;
3183         int32_t res;
3184         int ret, len;
3185         struct ctdb_control_gratious_arp *gratious_arp;
3186         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3187
3188
3189         len = strlen(ifname)+1;
3190         gratious_arp = talloc_size(tmp_ctx, 
3191                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3192         CTDB_NO_MEMORY(ctdb, gratious_arp);
3193
3194         gratious_arp->addr = *addr;
3195         gratious_arp->len = len;
3196         memcpy(&gratious_arp->iface[0], ifname, len);
3197
3198
3199         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3200         data.dptr  = (unsigned char *)gratious_arp;
3201
3202         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3203                            NULL, &res, &timeout, NULL);
3204         if (ret != 0 || res != 0) {
3205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3206                 talloc_free(tmp_ctx);
3207                 return -1;
3208         }
3209
3210         talloc_free(tmp_ctx);
3211         return 0;
3212 }
3213
3214 /*
3215   get a list of all tcp tickles that a node knows about for a particular vnn
3216  */
3217 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3218                               struct timeval timeout, uint32_t destnode, 
3219                               TALLOC_CTX *mem_ctx, 
3220                               ctdb_sock_addr *addr,
3221                               struct ctdb_control_tcp_tickle_list **list)
3222 {
3223         int ret;
3224         TDB_DATA data, outdata;
3225         int32_t status;
3226
3227         data.dptr = (uint8_t*)addr;
3228         data.dsize = sizeof(ctdb_sock_addr);
3229
3230         ret = ctdb_control(ctdb, destnode, 0, 
3231                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3232                            mem_ctx, &outdata, &status, NULL, NULL);
3233         if (ret != 0 || status != 0) {
3234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3235                 return -1;
3236         }
3237
3238         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3239
3240         return status;
3241 }
3242
3243 /*
3244   register a server id
3245  */
3246 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3247                       struct timeval timeout, 
3248                       struct ctdb_server_id *id)
3249 {
3250         TDB_DATA data;
3251         int32_t res;
3252         int ret;
3253
3254         data.dsize = sizeof(struct ctdb_server_id);
3255         data.dptr  = (unsigned char *)id;
3256
3257         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3258                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3259                         0, data, NULL,
3260                         NULL, &res, &timeout, NULL);
3261         if (ret != 0 || res != 0) {
3262                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3263                 return -1;
3264         }
3265
3266         return 0;
3267 }
3268
3269 /*
3270   unregister a server id
3271  */
3272 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3273                       struct timeval timeout, 
3274                       struct ctdb_server_id *id)
3275 {
3276         TDB_DATA data;
3277         int32_t res;
3278         int ret;
3279
3280         data.dsize = sizeof(struct ctdb_server_id);
3281         data.dptr  = (unsigned char *)id;
3282
3283         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3284                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3285                         0, data, NULL,
3286                         NULL, &res, &timeout, NULL);
3287         if (ret != 0 || res != 0) {
3288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3289                 return -1;
3290         }
3291
3292         return 0;
3293 }
3294
3295
3296 /*
3297   check if a server id exists
3298
3299   if a server id does exist, return *status == 1, otherwise *status == 0
3300  */
3301 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3302                       struct timeval timeout, 
3303                       uint32_t destnode,
3304                       struct ctdb_server_id *id,
3305                       uint32_t *status)
3306 {
3307         TDB_DATA data;
3308         int32_t res;
3309         int ret;
3310
3311         data.dsize = sizeof(struct ctdb_server_id);
3312         data.dptr  = (unsigned char *)id;
3313
3314         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3315                         0, data, NULL,
3316                         NULL, &res, &timeout, NULL);
3317         if (ret != 0) {
3318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3319                 return -1;
3320         }
3321
3322         if (res) {
3323                 *status = 1;
3324         } else {
3325                 *status = 0;
3326         }
3327
3328         return 0;
3329 }
3330
3331 /*
3332    get the list of server ids that are registered on a node
3333 */
3334 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3335                 TALLOC_CTX *mem_ctx,
3336                 struct timeval timeout, uint32_t destnode, 
3337                 struct ctdb_server_id_list **svid_list)
3338 {
3339         int ret;
3340         TDB_DATA outdata;
3341         int32_t res;
3342
3343         ret = ctdb_control(ctdb, destnode, 0, 
3344                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3345                            mem_ctx, &outdata, &res, &timeout, NULL);
3346         if (ret != 0 || res != 0) {
3347                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3348                 return -1;
3349         }
3350
3351         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3352                     
3353         return 0;
3354 }
3355
3356 /*
3357   initialise the ctdb daemon for client applications
3358
3359   NOTE: In current code the daemon does not fork. This is for testing purposes only
3360   and to simplify the code.
3361 */
3362 struct ctdb_context *ctdb_init(struct event_context *ev)
3363 {
3364         int ret;
3365         struct ctdb_context *ctdb;
3366
3367         ctdb = talloc_zero(ev, struct ctdb_context);
3368         if (ctdb == NULL) {
3369                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3370                 return NULL;
3371         }
3372         ctdb->ev  = ev;
3373         ctdb->idr = idr_init(ctdb);
3374         /* Wrap early to exercise code. */
3375         ctdb->lastid = INT_MAX-200;
3376         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3377
3378         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3379         if (ret != 0) {
3380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3381                 talloc_free(ctdb);
3382                 return NULL;
3383         }
3384
3385         ctdb->statistics.statistics_start_time = timeval_current();
3386
3387         return ctdb;
3388 }
3389
3390
3391 /*
3392   set some ctdb flags
3393 */
3394 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3395 {
3396         ctdb->flags |= flags;
3397 }
3398
3399 /*
3400   setup the local socket name
3401 */
3402 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3403 {
3404         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3405         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3406
3407         return 0;
3408 }
3409
3410 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3411 {
3412         return ctdb->daemon.name;
3413 }
3414
3415 /*
3416   return the pnn of this node
3417 */
3418 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3419 {
3420         return ctdb->pnn;
3421 }
3422
3423
3424 /*
3425   get the uptime of a remote node
3426  */
3427 struct ctdb_client_control_state *
3428 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3429 {
3430         return ctdb_control_send(ctdb, destnode, 0, 
3431                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3432                            mem_ctx, &timeout, NULL);
3433 }
3434
3435 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3436 {
3437         int ret;
3438         int32_t res;
3439         TDB_DATA outdata;
3440
3441         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3442         if (ret != 0 || res != 0) {
3443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3444                 return -1;
3445         }
3446
3447         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3448
3449         return 0;
3450 }
3451
3452 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3453 {
3454         struct ctdb_client_control_state *state;
3455
3456         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3457         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3458 }
3459
3460 /*
3461   send a control to execute the "recovered" event script on a node
3462  */
3463 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3464 {
3465         int ret;
3466         int32_t status;
3467
3468         ret = ctdb_control(ctdb, destnode, 0, 
3469                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3470                            NULL, NULL, &status, &timeout, NULL);
3471         if (ret != 0 || status != 0) {
3472                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3473                 return -1;
3474         }
3475
3476         return 0;
3477 }
3478
3479 /* 
3480   callback for the async helpers used when sending the same control
3481   to multiple nodes in parallell.
3482 */
3483 static void async_callback(struct ctdb_client_control_state *state)
3484 {
3485         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3486         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3487         int ret;
3488         TDB_DATA outdata;
3489         int32_t res = -1;
3490         uint32_t destnode = state->c->hdr.destnode;
3491
3492         outdata.dsize = 0;
3493         outdata.dptr = NULL;
3494
3495         /* one more node has responded with recmode data */
3496         data->count--;
3497
3498         /* if we failed to push the db, then return an error and let
3499            the main loop try again.
3500         */
3501         if (state->state != CTDB_CONTROL_DONE) {
3502                 if ( !data->dont_log_errors) {
3503                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3504                 }
3505                 data->fail_count++;
3506                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3507                         res = -ETIME;
3508                 } else {
3509                         res = -1;
3510                 }
3511                 if (data->fail_callback) {
3512                         data->fail_callback(ctdb, destnode, res, outdata,
3513                                         data->callback_data);
3514                 }
3515                 return;
3516         }
3517         
3518         state->async.fn = NULL;
3519
3520         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3521         if ((ret != 0) || (res != 0)) {
3522                 if ( !data->dont_log_errors) {
3523                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3524                 }
3525                 data->fail_count++;
3526                 if (data->fail_callback) {
3527                         data->fail_callback(ctdb, destnode, res, outdata,
3528                                         data->callback_data);
3529                 }
3530         }
3531         if ((ret == 0) && (data->callback != NULL)) {
3532                 data->callback(ctdb, destnode, res, outdata,
3533                                         data->callback_data);
3534         }
3535 }
3536
3537
3538 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3539 {
3540         /* set up the callback functions */
3541         state->async.fn = async_callback;
3542         state->async.private_data = data;
3543         
3544         /* one more control to wait for to complete */
3545         data->count++;
3546 }
3547
3548
3549 /* wait for up to the maximum number of seconds allowed
3550    or until all nodes we expect a response from has replied
3551 */
3552 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3553 {
3554         while (data->count > 0) {
3555                 event_loop_once(ctdb->ev);
3556         }
3557         if (data->fail_count != 0) {
3558                 if (!data->dont_log_errors) {
3559                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3560                                  data->fail_count));
3561                 }
3562                 return -1;
3563         }
3564         return 0;
3565 }
3566
3567
3568 /* 
3569    perform a simple control on the listed nodes
3570    The control cannot return data
3571  */
3572 int ctdb_client_async_control(struct ctdb_context *ctdb,
3573                                 enum ctdb_controls opcode,
3574                                 uint32_t *nodes,
3575                                 uint64_t srvid,
3576                                 struct timeval timeout,
3577                                 bool dont_log_errors,
3578                                 TDB_DATA data,
3579                                 client_async_callback client_callback,
3580                                 client_async_callback fail_callback,
3581                                 void *callback_data)
3582 {
3583         struct client_async_data *async_data;
3584         struct ctdb_client_control_state *state;
3585         int j, num_nodes;
3586
3587         async_data = talloc_zero(ctdb, struct client_async_data);
3588         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3589         async_data->dont_log_errors = dont_log_errors;
3590         async_data->callback = client_callback;
3591         async_data->fail_callback = fail_callback;
3592         async_data->callback_data = callback_data;
3593         async_data->opcode        = opcode;
3594
3595         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3596
3597         /* loop over all nodes and send an async control to each of them */
3598         for (j=0; j<num_nodes; j++) {
3599                 uint32_t pnn = nodes[j];
3600
3601                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3602                                           0, data, async_data, &timeout, NULL);
3603                 if (state == NULL) {
3604                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3605                         talloc_free(async_data);
3606                         return -1;
3607                 }
3608                 
3609                 ctdb_client_async_add(async_data, state);
3610         }
3611
3612         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3613                 talloc_free(async_data);
3614                 return -1;
3615         }
3616
3617         talloc_free(async_data);
3618         return 0;
3619 }
3620
3621 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3622                                 struct ctdb_vnn_map *vnn_map,
3623                                 TALLOC_CTX *mem_ctx,
3624                                 bool include_self)
3625 {
3626         int i, j, num_nodes;
3627         uint32_t *nodes;
3628
3629         for (i=num_nodes=0;i<vnn_map->size;i++) {
3630                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3631                         continue;
3632                 }
3633                 num_nodes++;
3634         } 
3635
3636         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3637         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3638
3639         for (i=j=0;i<vnn_map->size;i++) {
3640                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3641                         continue;
3642                 }
3643                 nodes[j++] = vnn_map->map[i];
3644         } 
3645
3646         return nodes;
3647 }
3648
3649 /* Get list of nodes not including those with flags specified by mask.
3650  * If exclude_pnn is not -1 then exclude that pnn from the list.
3651  */
3652 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3653                         struct ctdb_node_map *node_map,
3654                         TALLOC_CTX *mem_ctx,
3655                         uint32_t mask,
3656                         int exclude_pnn)
3657 {
3658         int i, j, num_nodes;
3659         uint32_t *nodes;
3660
3661         for (i=num_nodes=0;i<node_map->num;i++) {
3662                 if (node_map->nodes[i].flags & mask) {
3663                         continue;
3664                 }
3665                 if (node_map->nodes[i].pnn == exclude_pnn) {
3666                         continue;
3667                 }
3668                 num_nodes++;
3669         } 
3670
3671         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3672         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3673
3674         for (i=j=0;i<node_map->num;i++) {
3675                 if (node_map->nodes[i].flags & mask) {
3676                         continue;
3677                 }
3678                 if (node_map->nodes[i].pnn == exclude_pnn) {
3679                         continue;
3680                 }
3681                 nodes[j++] = node_map->nodes[i].pnn;
3682         } 
3683
3684         return nodes;
3685 }
3686
3687 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3688                                 struct ctdb_node_map *node_map,
3689                                 TALLOC_CTX *mem_ctx,
3690                                 bool include_self)
3691 {
3692         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3693                              include_self ? -1 : ctdb->pnn);
3694 }
3695
3696 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3697                                 struct ctdb_node_map *node_map,
3698                                 TALLOC_CTX *mem_ctx,
3699                                 bool include_self)
3700 {
3701         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3702                              include_self ? -1 : ctdb->pnn);
3703 }
3704
3705 /* 
3706   this is used to test if a pnn lock exists and if it exists will return
3707   the number of connections that pnn has reported or -1 if that recovery
3708   daemon is not running.
3709 */
3710 int
3711 ctdb_read_pnn_lock(int fd, int32_t pnn)
3712 {
3713         struct flock lock;
3714         char c;
3715
3716         lock.l_type = F_WRLCK;
3717         lock.l_whence = SEEK_SET;
3718         lock.l_start = pnn;
3719         lock.l_len = 1;
3720         lock.l_pid = 0;
3721
3722         if (fcntl(fd, F_GETLK, &lock) != 0) {
3723                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3724                 return -1;
3725         }
3726
3727         if (lock.l_type == F_UNLCK) {
3728                 return -1;
3729         }
3730
3731         if (pread(fd, &c, 1, pnn) == -1) {
3732                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3733                 return -1;
3734         }
3735
3736         return c;
3737 }
3738
3739 /*
3740   get capabilities of a remote node
3741  */
3742 struct ctdb_client_control_state *
3743 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3744 {
3745         return ctdb_control_send(ctdb, destnode, 0, 
3746                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3747                            mem_ctx, &timeout, NULL);
3748 }
3749
3750 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3751 {
3752         int ret;
3753         int32_t res;
3754         TDB_DATA outdata;
3755
3756         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3757         if ( (ret != 0) || (res != 0) ) {
3758                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3759                 return -1;
3760         }
3761
3762         if (capabilities) {
3763                 *capabilities = *((uint32_t *)outdata.dptr);
3764         }
3765
3766         return 0;
3767 }
3768
3769 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3770 {
3771         struct ctdb_client_control_state *state;
3772         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3773         int ret;
3774
3775         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3776         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3777         talloc_free(tmp_ctx);
3778         return ret;
3779 }
3780
3781 struct server_id {
3782         uint64_t pid;
3783         uint32_t task_id;
3784         uint32_t vnn;
3785         uint64_t unique_id;
3786 };
3787
3788 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3789 {
3790         struct server_id id;
3791
3792         id.pid = getpid();
3793         id.task_id = reqid;
3794         id.vnn = ctdb_get_pnn(ctdb);
3795         id.unique_id = id.vnn;
3796         id.unique_id = (id.unique_id << 32) | reqid;
3797
3798         return id;
3799 }
3800
3801 /* This is basically a copy from Samba's server_id.*.  However, a
3802  * dependency chain stops us from using Samba's version, so use a
3803  * renamed copy until a better solution is found. */
3804 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3805 {
3806         if (id1->pid != id2->pid) {
3807                 return false;
3808         }
3809
3810         if (id1->task_id != id2->task_id) {
3811                 return false;
3812         }
3813
3814         if (id1->vnn != id2->vnn) {
3815                 return false;
3816         }
3817
3818         if (id1->unique_id != id2->unique_id) {
3819                 return false;
3820         }
3821
3822         return true;
3823 }
3824
3825 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3826 {
3827         struct ctdb_server_id sid;
3828         int ret;
3829         uint32_t result;
3830
3831         sid.type = SERVER_TYPE_SAMBA;
3832         sid.pnn = id->vnn;
3833         sid.server_id = id->pid;
3834
3835         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3836                                         id->vnn, &sid, &result);
3837         if (ret != 0) {
3838                 /* If control times out, assume server_id exists. */
3839                 return true;
3840         }
3841
3842         if (result) {
3843                 return true;
3844         }
3845
3846         return false;
3847 }
3848
3849
3850 enum g_lock_type {
3851         G_LOCK_READ = 0,
3852         G_LOCK_WRITE = 1,
3853 };
3854
3855 struct g_lock_rec {
3856         enum g_lock_type type;
3857         struct server_id id;
3858 };
3859
3860 struct g_lock_recs {
3861         unsigned int num;
3862         struct g_lock_rec *lock;
3863 };
3864
3865 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3866                          struct g_lock_recs **locks)
3867 {
3868         struct g_lock_recs *recs;
3869
3870         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3871         if (recs == NULL) {
3872                 return false;
3873         }
3874
3875         if (data.dsize == 0) {
3876                 goto done;
3877         }
3878
3879         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3880                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3881                                   (unsigned long)data.dsize));
3882                 talloc_free(recs);
3883                 return false;
3884         }
3885
3886         recs->num = data.dsize / sizeof(struct g_lock_rec);
3887         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3888         if (recs->lock == NULL) {
3889                 talloc_free(recs);
3890                 return false;
3891         }
3892
3893 done:
3894         if (locks != NULL) {
3895                 *locks = recs;
3896         }
3897
3898         return true;
3899 }
3900
3901
3902 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3903                         struct ctdb_db_context *ctdb_db,
3904                         const char *keyname, uint32_t reqid)
3905 {
3906         TDB_DATA key, data;
3907         struct ctdb_record_handle *h;
3908         struct g_lock_recs *locks;
3909         struct server_id id;
3910         struct timeval t_start;
3911         int i;
3912
3913         key.dptr = (uint8_t *)discard_const(keyname);
3914         key.dsize = strlen(keyname) + 1;
3915
3916         t_start = timeval_current();
3917
3918 again:
3919         /* Keep trying for an hour. */
3920         if (timeval_elapsed(&t_start) > 3600) {
3921                 return false;
3922         }
3923
3924         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3925         if (h == NULL) {
3926                 return false;
3927         }
3928
3929         if (!g_lock_parse(h, data, &locks)) {
3930                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3931                 talloc_free(data.dptr);
3932                 talloc_free(h);
3933                 return false;
3934         }
3935
3936         talloc_free(data.dptr);
3937
3938         id = server_id_get(ctdb_db->ctdb, reqid);
3939
3940         i = 0;
3941         while (i < locks->num) {
3942                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3943                         /* Internal error */
3944                         talloc_free(h);
3945                         return false;
3946                 }
3947
3948                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3949                         if (i < locks->num-1) {
3950                                 locks->lock[i] = locks->lock[locks->num-1];
3951                         }
3952                         locks->num--;
3953                         continue;
3954                 }
3955
3956                 /* This entry is locked. */
3957                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3958                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3959                                    (unsigned long long)id.pid,
3960                                    id.task_id, id.vnn,
3961                                    (unsigned long long)id.unique_id));
3962                 talloc_free(h);
3963                 goto again;
3964         }
3965
3966         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3967                                      locks->num+1);
3968         if (locks->lock == NULL) {
3969                 talloc_free(h);
3970                 return false;
3971         }
3972
3973         locks->lock[locks->num].type = G_LOCK_WRITE;
3974         locks->lock[locks->num].id = id;
3975         locks->num++;
3976
3977         data.dptr = (uint8_t *)locks->lock;
3978         data.dsize = locks->num * sizeof(struct g_lock_rec);
3979
3980         if (ctdb_record_store(h, data) != 0) {
3981                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3982                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3983                                   (unsigned long long)id.pid,
3984                                   id.task_id, id.vnn,
3985                                   (unsigned long long)id.unique_id));
3986                 talloc_free(h);
3987                 return false;
3988         }
3989
3990         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3991                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3992                            (unsigned long long)id.pid,
3993                            id.task_id, id.vnn,
3994                            (unsigned long long)id.unique_id));
3995
3996         talloc_free(h);
3997         return true;
3998 }
3999
4000 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
4001                           struct ctdb_db_context *ctdb_db,
4002                           const char *keyname, uint32_t reqid)
4003 {
4004         TDB_DATA key, data;
4005         struct ctdb_record_handle *h;
4006         struct g_lock_recs *locks;
4007         struct server_id id;
4008         int i;
4009         bool found = false;
4010
4011         key.dptr = (uint8_t *)discard_const(keyname);
4012         key.dsize = strlen(keyname) + 1;
4013         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4014         if (h == NULL) {
4015                 return false;
4016         }
4017
4018         if (!g_lock_parse(h, data, &locks)) {
4019                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4020                 talloc_free(data.dptr);
4021                 talloc_free(h);
4022                 return false;
4023         }
4024
4025         talloc_free(data.dptr);
4026
4027         id = server_id_get(ctdb_db->ctdb, reqid);
4028
4029         for (i=0; i<locks->num; i++) {
4030                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4031                         if (i < locks->num-1) {
4032                                 locks->lock[i] = locks->lock[locks->num-1];
4033                         }
4034                         locks->num--;
4035                         found = true;
4036                         break;
4037                 }
4038         }
4039
4040         if (!found) {
4041                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4042                 talloc_free(h);
4043                 return false;
4044         }
4045
4046         data.dptr = (uint8_t *)locks->lock;
4047         data.dsize = locks->num * sizeof(struct g_lock_rec);
4048
4049         if (ctdb_record_store(h, data) != 0) {
4050                 talloc_free(h);
4051                 return false;
4052         }
4053
4054         talloc_free(h);
4055         return true;
4056 }
4057
4058
4059 struct ctdb_transaction_handle {
4060         struct ctdb_db_context *ctdb_db;
4061         struct ctdb_db_context *g_lock_db;
4062         char *lock_name;
4063         uint32_t reqid;
4064         /*
4065          * we store reads and writes done under a transaction:
4066          * - one list stores both reads and writes (m_all)
4067          * - the other just writes (m_write)
4068          */
4069         struct ctdb_marshall_buffer *m_all;
4070         struct ctdb_marshall_buffer *m_write;
4071 };
4072
4073 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4074 {
4075         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4076         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4077         return 0;
4078 }
4079
4080
4081 /**
4082  * start a transaction on a database
4083  */
4084 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4085                                                        TALLOC_CTX *mem_ctx)
4086 {
4087         struct ctdb_transaction_handle *h;
4088         struct ctdb_server_id id;
4089
4090         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4091         if (h == NULL) {
4092                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4093                 return NULL;
4094         }
4095
4096         h->ctdb_db = ctdb_db;
4097         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4098                                        (unsigned int)ctdb_db->db_id);
4099         if (h->lock_name == NULL) {
4100                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4101                 talloc_free(h);
4102                 return NULL;
4103         }
4104
4105         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4106                                    "g_lock.tdb", false, 0);
4107         if (!h->g_lock_db) {
4108                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4109                 talloc_free(h);
4110                 return NULL;
4111         }
4112
4113         id.type = SERVER_TYPE_SAMBA;
4114         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4115         id.server_id = getpid();
4116
4117         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4118                                          &id) != 0) {
4119                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4120                 talloc_free(h);
4121                 return NULL;
4122         }
4123
4124         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4125
4126         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4127                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4128                 talloc_free(h);
4129                 return NULL;
4130         }
4131
4132         talloc_set_destructor(h, ctdb_transaction_destructor);
4133         return h;
4134 }
4135
4136 /**
4137  * fetch a record inside a transaction
4138  */
4139 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4140                            TALLOC_CTX *mem_ctx,
4141                            TDB_DATA key, TDB_DATA *data)
4142 {
4143         struct ctdb_ltdb_header header;
4144         int ret;
4145
4146         ZERO_STRUCT(header);
4147
4148         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4149         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4150                 /* record doesn't exist yet */
4151                 *data = tdb_null;
4152                 ret = 0;
4153         }
4154
4155         if (ret != 0) {
4156                 return ret;
4157         }
4158
4159         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4160         if (h->m_all == NULL) {
4161                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4162                 return -1;
4163         }
4164
4165         return 0;
4166 }
4167
4168 /**
4169  * stores a record inside a transaction
4170  */
4171 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4172                            TDB_DATA key, TDB_DATA data)
4173 {
4174         TALLOC_CTX *tmp_ctx = talloc_new(h);
4175         struct ctdb_ltdb_header header;
4176         TDB_DATA olddata;
4177         int ret;
4178
4179         /* we need the header so we can update the RSN */
4180         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4181         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4182                 /* the record doesn't exist - create one with us as dmaster.
4183                    This is only safe because we are in a transaction and this
4184                    is a persistent database */
4185                 ZERO_STRUCT(header);
4186         } else if (ret != 0) {
4187                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4188                 talloc_free(tmp_ctx);
4189                 return ret;
4190         }
4191
4192         if (data.dsize == olddata.dsize &&
4193             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4194             header.rsn != 0) {
4195                 /* save writing the same data */
4196                 talloc_free(tmp_ctx);
4197                 return 0;
4198         }
4199
4200         header.dmaster = h->ctdb_db->ctdb->pnn;
4201         header.rsn++;
4202
4203         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4204         if (h->m_all == NULL) {
4205                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4206                 talloc_free(tmp_ctx);
4207                 return -1;
4208         }
4209
4210         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4211         if (h->m_write == NULL) {
4212                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4213                 talloc_free(tmp_ctx);
4214                 return -1;
4215         }
4216
4217         talloc_free(tmp_ctx);
4218         return 0;
4219 }
4220
4221 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4222 {
4223         const char *keyname = CTDB_DB_SEQNUM_KEY;
4224         TDB_DATA key, data;
4225         struct ctdb_ltdb_header header;
4226         int ret;
4227
4228         key.dptr = (uint8_t *)discard_const(keyname);
4229         key.dsize = strlen(keyname) + 1;
4230
4231         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4232         if (ret != 0) {
4233                 *seqnum = 0;
4234                 return 0;
4235         }
4236
4237         if (data.dsize == 0) {
4238                 *seqnum = 0;
4239                 return 0;
4240         }
4241
4242         if (data.dsize != sizeof(*seqnum)) {
4243                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4244                                   data.dsize));
4245                 talloc_free(data.dptr);
4246                 return -1;
4247         }
4248
4249         *seqnum = *(uint64_t *)data.dptr;
4250         talloc_free(data.dptr);
4251
4252         return 0;
4253 }
4254
4255
4256 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4257                                 uint64_t seqnum)
4258 {
4259         const char *keyname = CTDB_DB_SEQNUM_KEY;
4260         TDB_DATA key, data;
4261
4262         key.dptr = (uint8_t *)discard_const(keyname);
4263         key.dsize = strlen(keyname) + 1;
4264
4265         data.dptr = (uint8_t *)&seqnum;
4266         data.dsize = sizeof(seqnum);
4267
4268         return ctdb_transaction_store(h, key, data);
4269 }
4270
4271
4272 /**
4273  * commit a transaction
4274  */
4275 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4276 {
4277         int ret;
4278         uint64_t old_seqnum, new_seqnum;
4279         int32_t status;
4280         struct timeval timeout;
4281
4282         if (h->m_write == NULL) {
4283                 /* no changes were made */
4284                 talloc_free(h);
4285                 return 0;
4286         }
4287
4288         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4289         if (ret != 0) {
4290                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4291                 ret = -1;
4292                 goto done;
4293         }
4294
4295         new_seqnum = old_seqnum + 1;
4296         ret = ctdb_store_db_seqnum(h, new_seqnum);
4297         if (ret != 0) {
4298                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4299                 ret = -1;
4300                 goto done;
4301         }
4302
4303 again:
4304         timeout = timeval_current_ofs(3,0);
4305         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4306                            h->ctdb_db->db_id,
4307                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4308                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4309                            &status, &timeout, NULL);
4310         if (ret != 0 || status != 0) {
4311                 /*
4312                  * TRANS3_COMMIT control will only fail if recovery has been
4313                  * triggered.  Check if the database has been updated or not.
4314                  */
4315                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4316                 if (ret != 0) {
4317                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4318                         goto done;
4319                 }
4320
4321                 if (new_seqnum == old_seqnum) {
4322                         /* Database not yet updated, try again */
4323                         goto again;
4324                 }
4325
4326                 if (new_seqnum != (old_seqnum + 1)) {
4327                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4328                                           (long long unsigned)new_seqnum,
4329                                           (long long unsigned)old_seqnum));
4330                         ret = -1;
4331                         goto done;
4332                 }
4333         }
4334
4335         ret = 0;
4336
4337 done:
4338         talloc_free(h);
4339         return ret;
4340 }
4341
4342 /**
4343  * cancel a transaction
4344  */
4345 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4346 {
4347         talloc_free(h);
4348         return 0;
4349 }
4350
4351
4352 /*
4353   recovery daemon ping to main daemon
4354  */
4355 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4356 {
4357         int ret;
4358         int32_t res;
4359
4360         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4361                            ctdb, NULL, &res, NULL, NULL);
4362         if (ret != 0 || res != 0) {
4363                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4364                 return -1;
4365         }
4366
4367         return 0;
4368 }
4369
4370 /* When forking the main daemon and the child process needs to connect
4371  * back to the daemon as a client process, this function can be used
4372  * to change the ctdb context from daemon into client mode.  The child
4373  * process must be created using ctdb_fork() and not fork() -
4374  * ctdb_fork() does some necessary housekeeping.
4375  */
4376 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4377 {
4378         int ret;
4379         va_list ap;
4380
4381         /* Add extra information so we can identify this in the logs */
4382         va_start(ap, fmt);
4383         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4384         va_end(ap);
4385
4386         /* get a new event context */
4387         ctdb->ev = event_context_init(ctdb);
4388         tevent_loop_allow_nesting(ctdb->ev);
4389
4390         /* Connect to main CTDB daemon */
4391         ret = ctdb_socket_connect(ctdb);
4392         if (ret != 0) {
4393                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4394                 return -1;
4395         }
4396
4397         ctdb->can_send_controls = true;
4398
4399         return 0;
4400 }
4401
4402 /*
4403   get the status of running the monitor eventscripts: NULL means never run.
4404  */
4405 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4406                 struct timeval timeout, uint32_t destnode, 
4407                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4408                 struct ctdb_scripts_wire **scripts)
4409 {
4410         int ret;
4411         TDB_DATA outdata, indata;
4412         int32_t res;
4413         uint32_t uinttype = type;
4414
4415         indata.dptr = (uint8_t *)&uinttype;
4416         indata.dsize = sizeof(uinttype);
4417
4418         ret = ctdb_control(ctdb, destnode, 0, 
4419                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4420                            mem_ctx, &outdata, &res, &timeout, NULL);
4421         if (ret != 0 || res != 0) {
4422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4423                 return -1;
4424         }
4425
4426         if (outdata.dsize == 0) {
4427                 *scripts = NULL;
4428         } else {
4429                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4430                 talloc_free(outdata.dptr);
4431         }
4432                     
4433         return 0;
4434 }
4435
4436 /*
4437   tell the main daemon how long it took to lock the reclock file
4438  */
4439 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4440 {
4441         int ret;
4442         int32_t res;
4443         TDB_DATA data;
4444
4445         data.dptr = (uint8_t *)&latency;
4446         data.dsize = sizeof(latency);
4447
4448         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4449                            ctdb, NULL, &res, NULL, NULL);
4450         if (ret != 0 || res != 0) {
4451                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4452                 return -1;
4453         }
4454
4455         return 0;
4456 }
4457
4458 /*
4459   get the name of the reclock file
4460  */
4461 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4462                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4463                          const char **name)
4464 {
4465         int ret;
4466         int32_t res;
4467         TDB_DATA data;
4468
4469         ret = ctdb_control(ctdb, destnode, 0, 
4470                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4471                            mem_ctx, &data, &res, &timeout, NULL);
4472         if (ret != 0 || res != 0) {
4473                 return -1;
4474         }
4475
4476         if (data.dsize == 0) {
4477                 *name = NULL;
4478         } else {
4479                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4480         }
4481         talloc_free(data.dptr);
4482
4483         return 0;
4484 }
4485
4486 /*
4487   set the reclock filename for a node
4488  */
4489 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4490 {
4491         int ret;
4492         TDB_DATA data;
4493         int32_t res;
4494
4495         if (reclock == NULL) {
4496                 data.dsize = 0;
4497                 data.dptr  = NULL;
4498         } else {
4499                 data.dsize = strlen(reclock) + 1;
4500                 data.dptr  = discard_const(reclock);
4501         }
4502
4503         ret = ctdb_control(ctdb, destnode, 0, 
4504                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4505                            NULL, NULL, &res, &timeout, NULL);
4506         if (ret != 0 || res != 0) {
4507                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4508                 return -1;
4509         }
4510
4511         return 0;
4512 }
4513
4514 /*
4515   stop a node
4516  */
4517 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4518 {
4519         int ret;
4520         int32_t res;
4521
4522         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4523                            ctdb, NULL, &res, &timeout, NULL);
4524         if (ret != 0 || res != 0) {
4525                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4526                 return -1;
4527         }
4528
4529         return 0;
4530 }
4531
4532 /*
4533   continue a node
4534  */
4535 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4536 {
4537         int ret;
4538
4539         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4540                            ctdb, NULL, NULL, &timeout, NULL);
4541         if (ret != 0) {
4542                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4543                 return -1;
4544         }
4545
4546         return 0;
4547 }
4548
4549 /*
4550   set the natgw state for a node
4551  */
4552 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4553 {
4554         int ret;
4555         TDB_DATA data;
4556         int32_t res;
4557
4558         data.dsize = sizeof(natgwstate);
4559         data.dptr  = (uint8_t *)&natgwstate;
4560
4561         ret = ctdb_control(ctdb, destnode, 0, 
4562                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4563                            NULL, NULL, &res, &timeout, NULL);
4564         if (ret != 0 || res != 0) {
4565                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4566                 return -1;
4567         }
4568
4569         return 0;
4570 }
4571
4572 /*
4573   set the lmaster role for a node
4574  */
4575 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4576 {
4577         int ret;
4578         TDB_DATA data;
4579         int32_t res;
4580
4581         data.dsize = sizeof(lmasterrole);
4582         data.dptr  = (uint8_t *)&lmasterrole;
4583
4584         ret = ctdb_control(ctdb, destnode, 0, 
4585                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4586                            NULL, NULL, &res, &timeout, NULL);
4587         if (ret != 0 || res != 0) {
4588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4589                 return -1;
4590         }
4591
4592         return 0;
4593 }
4594
4595 /*
4596   set the recmaster role for a node
4597  */
4598 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4599 {
4600         int ret;
4601         TDB_DATA data;
4602         int32_t res;
4603
4604         data.dsize = sizeof(recmasterrole);
4605         data.dptr  = (uint8_t *)&recmasterrole;
4606
4607         ret = ctdb_control(ctdb, destnode, 0, 
4608                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4609                            NULL, NULL, &res, &timeout, NULL);
4610         if (ret != 0 || res != 0) {
4611                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4612                 return -1;
4613         }
4614
4615         return 0;
4616 }
4617
4618 /* enable an eventscript
4619  */
4620 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4621 {
4622         int ret;
4623         TDB_DATA data;
4624         int32_t res;
4625
4626         data.dsize = strlen(script) + 1;
4627         data.dptr  = discard_const(script);
4628
4629         ret = ctdb_control(ctdb, destnode, 0, 
4630                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4631                            NULL, NULL, &res, &timeout, NULL);
4632         if (ret != 0 || res != 0) {
4633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4634                 return -1;
4635         }
4636
4637         return 0;
4638 }
4639
4640 /* disable an eventscript
4641  */
4642 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4643 {
4644         int ret;
4645         TDB_DATA data;
4646         int32_t res;
4647
4648         data.dsize = strlen(script) + 1;
4649         data.dptr  = discard_const(script);
4650
4651         ret = ctdb_control(ctdb, destnode, 0, 
4652                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4653                            NULL, NULL, &res, &timeout, NULL);
4654         if (ret != 0 || res != 0) {
4655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4656                 return -1;
4657         }
4658
4659         return 0;
4660 }
4661
4662
4663 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4664 {
4665         int ret;
4666         TDB_DATA data;
4667         int32_t res;
4668
4669         data.dsize = sizeof(*bantime);
4670         data.dptr  = (uint8_t *)bantime;
4671
4672         ret = ctdb_control(ctdb, destnode, 0, 
4673                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4674                            NULL, NULL, &res, &timeout, NULL);
4675         if (ret != 0 || res != 0) {
4676                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4677                 return -1;
4678         }
4679
4680         return 0;
4681 }
4682
4683
4684 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4685 {
4686         int ret;
4687         TDB_DATA outdata;
4688         int32_t res;
4689         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4690
4691         ret = ctdb_control(ctdb, destnode, 0, 
4692                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4693                            tmp_ctx, &outdata, &res, &timeout, NULL);
4694         if (ret != 0 || res != 0) {
4695                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4696                 talloc_free(tmp_ctx);
4697                 return -1;
4698         }
4699
4700         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4701         talloc_free(tmp_ctx);
4702
4703         return 0;
4704 }
4705
4706
4707 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4708 {
4709         int ret;
4710         int32_t res;
4711         TDB_DATA data;
4712         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4713
4714         data.dptr = (uint8_t*)db_prio;
4715         data.dsize = sizeof(*db_prio);
4716
4717         ret = ctdb_control(ctdb, destnode, 0, 
4718                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4719                            tmp_ctx, NULL, &res, &timeout, NULL);
4720         if (ret != 0 || res != 0) {
4721                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4722                 talloc_free(tmp_ctx);
4723                 return -1;
4724         }
4725
4726         talloc_free(tmp_ctx);
4727
4728         return 0;
4729 }
4730
4731 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4732 {
4733         int ret;
4734         int32_t res;
4735         TDB_DATA data;
4736         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4737
4738         data.dptr = (uint8_t*)&db_id;
4739         data.dsize = sizeof(db_id);
4740
4741         ret = ctdb_control(ctdb, destnode, 0, 
4742                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4743                            tmp_ctx, NULL, &res, &timeout, NULL);
4744         if (ret != 0 || res < 0) {
4745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4746                 talloc_free(tmp_ctx);
4747                 return -1;
4748         }
4749
4750         if (priority) {
4751                 *priority = res;
4752         }
4753
4754         talloc_free(tmp_ctx);
4755
4756         return 0;
4757 }
4758
4759 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4760 {
4761         int ret;
4762         TDB_DATA outdata;
4763         int32_t res;
4764
4765         ret = ctdb_control(ctdb, destnode, 0, 
4766                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4767                            mem_ctx, &outdata, &res, &timeout, NULL);
4768         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4769                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4770                 return -1;
4771         }
4772
4773         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4774         talloc_free(outdata.dptr);
4775                     
4776         return 0;
4777 }
4778
4779 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4780 {
4781         if (h == NULL) {
4782                 return NULL;
4783         }
4784
4785         return &h->header;
4786 }
4787
4788
4789 struct ctdb_client_control_state *
4790 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4791 {
4792         struct ctdb_client_control_state *handle;
4793         struct ctdb_marshall_buffer *m;
4794         struct ctdb_rec_data *rec;
4795         TDB_DATA outdata;
4796
4797         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4798         if (m == NULL) {
4799                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4800                 return NULL;
4801         }
4802
4803         m->db_id = ctdb_db->db_id;
4804
4805         rec = ctdb_marshall_record(m, 0, key, header, data);
4806         if (rec == NULL) {
4807                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4808                 talloc_free(m);
4809                 return NULL;
4810         }
4811         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4812         if (m == NULL) {
4813                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4814                 talloc_free(m);
4815                 return NULL;
4816         }
4817         m->count++;
4818         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4819
4820
4821         outdata.dptr = (uint8_t *)m;
4822         outdata.dsize = talloc_get_size(m);
4823
4824         handle = ctdb_control_send(ctdb, destnode, 0, 
4825                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4826                            mem_ctx, &timeout, NULL);
4827         talloc_free(m);
4828         return handle;
4829 }
4830
4831 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4832 {
4833         int ret;
4834         int32_t res;
4835
4836         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4837         if ( (ret != 0) || (res != 0) ){
4838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4839                 return -1;
4840         }
4841
4842         return 0;
4843 }
4844
4845 int
4846 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4847 {
4848         struct ctdb_client_control_state *state;
4849
4850         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4851         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4852 }
4853
4854
4855
4856
4857
4858
4859 /*
4860   set a database to be readonly
4861  */
4862 struct ctdb_client_control_state *
4863 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4864 {
4865         TDB_DATA data;
4866
4867         data.dptr = (uint8_t *)&dbid;
4868         data.dsize = sizeof(dbid);
4869
4870         return ctdb_control_send(ctdb, destnode, 0, 
4871                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4872                            ctdb, NULL, NULL);
4873 }
4874
4875 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4876 {
4877         int ret;
4878         int32_t res;
4879
4880         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4881         if (ret != 0 || res != 0) {
4882           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4883                 return -1;
4884         }
4885
4886         return 0;
4887 }
4888
4889 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4890 {
4891         struct ctdb_client_control_state *state;
4892
4893         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4894         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4895 }
4896
4897 /*
4898   set a database to be sticky
4899  */
4900 struct ctdb_client_control_state *
4901 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4902 {
4903         TDB_DATA data;
4904
4905         data.dptr = (uint8_t *)&dbid;
4906         data.dsize = sizeof(dbid);
4907
4908         return ctdb_control_send(ctdb, destnode, 0, 
4909                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4910                            ctdb, NULL, NULL);
4911 }
4912
4913 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4914 {
4915         int ret;
4916         int32_t res;
4917
4918         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4919         if (ret != 0 || res != 0) {
4920                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4921                 return -1;
4922         }
4923
4924         return 0;
4925 }
4926
4927 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4928 {
4929         struct ctdb_client_control_state *state;
4930
4931         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4932         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4933 }