ctdb-daemon: Use correct tdb flags when enabling robust mutex support
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb_wrap/tdb_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_PROTOCOL;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, bool updatetdb)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88         c->header = header;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         /* we need to force the record to be written out if this was a remote access */
106         if (c->new_data == NULL) {
107                 c->new_data = &c->record_data;
108         }
109
110         if (c->new_data && updatetdb) {
111                 /* XXX check that we always have the lock here? */
112                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
113                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
114                         talloc_free(c);
115                         return -1;
116                 }
117         }
118
119         if (c->reply_data) {
120                 call->reply_data = *c->reply_data;
121
122                 talloc_steal(call, call->reply_data.dptr);
123                 talloc_set_name_const(call->reply_data.dptr, __location__);
124         } else {
125                 call->reply_data.dptr = NULL;
126                 call->reply_data.dsize = 0;
127         }
128         call->status = c->status;
129
130         talloc_free(c);
131
132         return 0;
133 }
134
135
136 /*
137   queue a packet for sending from client to daemon
138 */
139 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
140 {
141         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
142 }
143
144
145 /*
146   called when a CTDB_REPLY_CALL packet comes in in the client
147
148   This packet comes in response to a CTDB_REQ_CALL request packet. It
149   contains any reply data from the call
150 */
151 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 {
153         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
154         struct ctdb_client_call_state *state;
155
156         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
157         if (state == NULL) {
158                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
159                 return;
160         }
161
162         if (hdr->reqid != state->reqid) {
163                 /* we found a record  but it was the wrong one */
164                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
165                 return;
166         }
167
168         state->call->reply_data.dptr = c->data;
169         state->call->reply_data.dsize = c->datalen;
170         state->call->status = c->status;
171
172         talloc_steal(state, c);
173
174         state->state = CTDB_CALL_DONE;
175
176         if (state->async.fn) {
177                 state->async.fn(state);
178         }
179 }
180
181 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
182
183 /*
184   this is called in the client, when data comes in from the daemon
185  */
186 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
187 {
188         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
189         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
190         TALLOC_CTX *tmp_ctx;
191
192         /* place the packet as a child of a tmp_ctx. We then use
193            talloc_free() below to free it. If any of the calls want
194            to keep it, then they will steal it somewhere else, and the
195            talloc_free() will be a no-op */
196         tmp_ctx = talloc_new(ctdb);
197         talloc_steal(tmp_ctx, hdr);
198
199         if (cnt == 0) {
200                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
201                 exit(1);
202         }
203
204         if (cnt < sizeof(*hdr)) {
205                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
206                 goto done;
207         }
208         if (cnt != hdr->length) {
209                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
210                                (unsigned)hdr->length, (unsigned)cnt);
211                 goto done;
212         }
213
214         if (hdr->ctdb_magic != CTDB_MAGIC) {
215                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
216                 goto done;
217         }
218
219         if (hdr->ctdb_version != CTDB_PROTOCOL) {
220                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
221                 goto done;
222         }
223
224         switch (hdr->operation) {
225         case CTDB_REPLY_CALL:
226                 ctdb_client_reply_call(ctdb, hdr);
227                 break;
228
229         case CTDB_REQ_MESSAGE:
230                 ctdb_request_message(ctdb, hdr);
231                 break;
232
233         case CTDB_REPLY_CONTROL:
234                 ctdb_client_reply_control(ctdb, hdr);
235                 break;
236
237         default:
238                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
239         }
240
241 done:
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   connect to a unix domain socket
247 */
248 int ctdb_socket_connect(struct ctdb_context *ctdb)
249 {
250         struct sockaddr_un addr;
251
252         memset(&addr, 0, sizeof(addr));
253         addr.sun_family = AF_UNIX;
254         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
255
256         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
257         if (ctdb->daemon.sd == -1) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
259                 return -1;
260         }
261
262         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
263                 close(ctdb->daemon.sd);
264                 ctdb->daemon.sd = -1;
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
273                                               CTDB_DS_ALIGNMENT, 
274                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
275         return 0;
276 }
277
278
279 struct ctdb_record_handle {
280         struct ctdb_db_context *ctdb_db;
281         TDB_DATA key;
282         TDB_DATA *data;
283         struct ctdb_ltdb_header header;
284 };
285
286
287 /*
288   make a recv call to the local ctdb daemon - called from client context
289
290   This is called when the program wants to wait for a ctdb_call to complete and get the 
291   results. This call will block unless the call has already completed.
292 */
293 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
294 {
295         if (state == NULL) {
296                 return -1;
297         }
298
299         while (state->state < CTDB_CALL_DONE) {
300                 event_loop_once(state->ctdb_db->ctdb->ev);
301         }
302         if (state->state != CTDB_CALL_DONE) {
303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
304                 talloc_free(state);
305                 return -1;
306         }
307
308         if (state->call->reply_data.dsize) {
309                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
310                                                       state->call->reply_data.dptr,
311                                                       state->call->reply_data.dsize);
312                 call->reply_data.dsize = state->call->reply_data.dsize;
313         } else {
314                 call->reply_data.dptr = NULL;
315                 call->reply_data.dsize = 0;
316         }
317         call->status = state->call->status;
318         talloc_free(state);
319
320         return call->status;
321 }
322
323
324
325
326 /*
327   destroy a ctdb_call in client
328 */
329 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
330 {
331         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
332         return 0;
333 }
334
335 /*
336   construct an event driven local ctdb_call
337
338   this is used so that locally processed ctdb_call requests are processed
339   in an event driven manner
340 */
341 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
342                                                                   struct ctdb_call *call,
343                                                                   struct ctdb_ltdb_header *header,
344                                                                   TDB_DATA *data)
345 {
346         struct ctdb_client_call_state *state;
347         struct ctdb_context *ctdb = ctdb_db->ctdb;
348         int ret;
349
350         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
351         CTDB_NO_MEMORY_NULL(ctdb, state);
352         state->call = talloc_zero(state, struct ctdb_call);
353         CTDB_NO_MEMORY_NULL(ctdb, state->call);
354
355         talloc_steal(state, data->dptr);
356
357         state->state   = CTDB_CALL_DONE;
358         *(state->call) = *call;
359         state->ctdb_db = ctdb_db;
360
361         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
362         if (ret != 0) {
363                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
364         }
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
400                 ret = -1;
401         }
402
403         if (ret == 0 && header.dmaster == ctdb->pnn) {
404                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
405                 talloc_free(data.dptr);
406                 ctdb_ltdb_unlock(ctdb_db, call->key);
407                 return state;
408         }
409
410         ctdb_ltdb_unlock(ctdb_db, call->key);
411         talloc_free(data.dptr);
412
413         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414         if (state == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416                 return NULL;
417         }
418         state->call = talloc_zero(state, struct ctdb_call);
419         if (state->call == NULL) {
420                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421                 return NULL;
422         }
423
424         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
425         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426         if (c == NULL) {
427                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428                 return NULL;
429         }
430
431         state->reqid     = ctdb_reqid_new(ctdb, state);
432         state->ctdb_db = ctdb_db;
433         talloc_set_destructor(state, ctdb_client_call_destructor);
434
435         c->hdr.reqid     = state->reqid;
436         c->flags         = call->flags;
437         c->db_id         = ctdb_db->db_id;
438         c->callid        = call->call_id;
439         c->hopcount      = 0;
440         c->keylen        = call->key.dsize;
441         c->calldatalen   = call->call_data.dsize;
442         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
443         memcpy(&c->data[call->key.dsize], 
444                call->call_data.dptr, call->call_data.dsize);
445         *(state->call)              = *call;
446         state->call->call_data.dptr = &c->data[call->key.dsize];
447         state->call->key.dptr       = &c->data[0];
448
449         state->state  = CTDB_CALL_WAIT;
450
451
452         ctdb_client_queue_pkt(ctdb, &c->hdr);
453
454         return state;
455 }
456
457
458 /*
459   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 */
461 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 {
463         struct ctdb_client_call_state *state;
464
465         state = ctdb_call_send(ctdb_db, call);
466         return ctdb_call_recv(state, call);
467 }
468
469
470 /*
471   tell the daemon what messaging srvid we will use, and register the message
472   handler function in the client
473 */
474 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
475                              ctdb_msg_fn_t handler,
476                              void *private_data)
477 {
478         int res;
479         int32_t status;
480
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512 /*
513  * check server ids
514  */
515 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
516                                        uint8_t *result)
517 {
518         TDB_DATA indata, outdata;
519         int res;
520         int32_t status;
521         int i;
522
523         indata.dptr = (uint8_t *)ids;
524         indata.dsize = num * sizeof(*ids);
525
526         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
527                            indata, ctdb, &outdata, &status, NULL, NULL);
528         if (res != 0 || status != 0) {
529                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
530                 return -1;
531         }
532
533         if (outdata.dsize != num*sizeof(uint8_t)) {
534                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
535                                   (long unsigned int)num*sizeof(uint8_t),
536                                   outdata.dsize));
537                 talloc_free(outdata.dptr);
538                 return -1;
539         }
540
541         for (i=0; i<num; i++) {
542                 result[i] = outdata.dptr[i];
543         }
544
545         talloc_free(outdata.dptr);
546         return 0;
547 }
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         talloc_free(r);
570         return res;
571 }
572
573
574 /*
575   cancel a ctdb_fetch_lock operation, releasing the lock
576  */
577 static int fetch_lock_destructor(struct ctdb_record_handle *h)
578 {
579         ctdb_ltdb_unlock(h->ctdb_db, h->key);
580         return 0;
581 }
582
583 /*
584   force the migration of a record to this node
585  */
586 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
587 {
588         struct ctdb_call call;
589         ZERO_STRUCT(call);
590         call.call_id = CTDB_NULL_FUNC;
591         call.key = key;
592         call.flags = CTDB_IMMEDIATE_MIGRATION;
593         return ctdb_call(ctdb_db, &call);
594 }
595
596 /*
597   try to fetch a readonly copy of a record
598  */
599 static int
600 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
601 {
602         int ret;
603
604         struct ctdb_call call;
605         ZERO_STRUCT(call);
606
607         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
608         call.call_data.dptr = NULL;
609         call.call_data.dsize = 0;
610         call.key = key;
611         call.flags = CTDB_WANT_READONLY;
612         ret = ctdb_call(ctdb_db, &call);
613
614         if (ret != 0) {
615                 return -1;
616         }
617         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
618                 return -1;
619         }
620
621         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
622         if (*hdr == NULL) {
623                 talloc_free(call.reply_data.dptr);
624                 return -1;
625         }
626
627         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
628         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
629         if (data->dptr == NULL) {
630                 talloc_free(call.reply_data.dptr);
631                 talloc_free(hdr);
632                 return -1;
633         }
634
635         return 0;
636 }
637
638 /*
639   get a lock on a record, and return the records data. Blocks until it gets the lock
640  */
641 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
642                                            TDB_DATA key, TDB_DATA *data)
643 {
644         int ret;
645         struct ctdb_record_handle *h;
646
647         /*
648           procedure is as follows:
649
650           1) get the chain lock. 
651           2) check if we are dmaster
652           3) if we are the dmaster then return handle 
653           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
654              reply from ctdbd
655           5) when we get the reply, goto (1)
656          */
657
658         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
659         if (h == NULL) {
660                 return NULL;
661         }
662
663         h->ctdb_db = ctdb_db;
664         h->key     = key;
665         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
666         if (h->key.dptr == NULL) {
667                 talloc_free(h);
668                 return NULL;
669         }
670         h->data    = data;
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
673                  (const char *)key.dptr));
674
675 again:
676         /* step 1 - get the chain lock */
677         ret = ctdb_ltdb_lock(ctdb_db, key);
678         if (ret != 0) {
679                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
680                 talloc_free(h);
681                 return NULL;
682         }
683
684         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
685
686         talloc_set_destructor(h, fetch_lock_destructor);
687
688         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
689
690         /* when torturing, ensure we test the remote path */
691         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
692             random() % 5 == 0) {
693                 h->header.dmaster = (uint32_t)-1;
694         }
695
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
698
699         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
700                 ctdb_ltdb_unlock(ctdb_db, key);
701                 ret = ctdb_client_force_migration(ctdb_db, key);
702                 if (ret != 0) {
703                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
704                         talloc_free(h);
705                         return NULL;
706                 }
707                 goto again;
708         }
709
710         /* if this is a request for read/write and we have delegations
711            we have to revoke all delegations first
712         */
713         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
714             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
715                 ctdb_ltdb_unlock(ctdb_db, key);
716                 ret = ctdb_client_force_migration(ctdb_db, key);
717                 if (ret != 0) {
718                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
719                         talloc_free(h);
720                         return NULL;
721                 }
722                 goto again;
723         }
724
725         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
726         return h;
727 }
728
729 /*
730   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
731  */
732 struct ctdb_record_handle *
733 ctdb_fetch_readonly_lock(
734         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
735         TDB_DATA key, TDB_DATA *data,
736         int read_only)
737 {
738         int ret;
739         struct ctdb_record_handle *h;
740         struct ctdb_ltdb_header *roheader = NULL;
741
742         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
743         if (h == NULL) {
744                 return NULL;
745         }
746
747         h->ctdb_db = ctdb_db;
748         h->key     = key;
749         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
750         if (h->key.dptr == NULL) {
751                 talloc_free(h);
752                 return NULL;
753         }
754         h->data    = data;
755
756         data->dptr = NULL;
757         data->dsize = 0;
758
759
760 again:
761         talloc_free(roheader);
762         roheader = NULL;
763
764         talloc_free(data->dptr);
765         data->dptr = NULL;
766         data->dsize = 0;
767
768         /* Lock the record/chain */
769         ret = ctdb_ltdb_lock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
772                 talloc_free(h);
773                 return NULL;
774         }
775
776         talloc_set_destructor(h, fetch_lock_destructor);
777
778         /* Check if record exists yet in the TDB */
779         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
780         if (ret != 0) {
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_force_migration(ctdb_db, key);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
785                         talloc_free(h);
786                         return NULL;
787                 }
788                 goto again;
789         }
790
791         /* if this is a request for read/write and we have delegations
792            we have to revoke all delegations first
793         */
794         if ((read_only == 0) 
795         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
796         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                 ctdb_ltdb_unlock(ctdb_db, key);
798                 ret = ctdb_client_force_migration(ctdb_db, key);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
801                         talloc_free(h);
802                         return NULL;
803                 }
804                 goto again;
805         }
806
807         /* if we are dmaster, just return the handle */
808         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
809                 return h;
810         }
811
812         if (read_only != 0) {
813                 TDB_DATA rodata = {NULL, 0};
814
815                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
816                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
817                         return h;
818                 }
819
820                 ctdb_ltdb_unlock(ctdb_db, key);
821                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
822                 if (ret != 0) {
823                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
835                         ret = ctdb_client_force_migration(ctdb_db, key);
836                         if (ret != 0) {
837                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                                 talloc_free(h);
839                                 return NULL;
840                         }
841
842                         goto again;
843                 }
844
845                 ret = ctdb_ltdb_lock(ctdb_db, key);
846                 if (ret != 0) {
847                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
848                         talloc_free(h);
849                         return NULL;
850                 }
851
852                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
853                 if (ret != 0) {
854                         ctdb_ltdb_unlock(ctdb_db, key);
855
856                         ret = ctdb_client_force_migration(ctdb_db, key);
857                         if (ret != 0) {
858                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
859                                 talloc_free(h);
860                                 return NULL;
861                         }
862
863                         goto again;
864                 }
865
866                 return h;
867         }
868
869         /* we are not dmaster and this was not a request for a readonly lock
870          * so unlock the record, migrate it and try again
871          */
872         ctdb_ltdb_unlock(ctdb_db, key);
873         ret = ctdb_client_force_migration(ctdb_db, key);
874         if (ret != 0) {
875                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
876                 talloc_free(h);
877                 return NULL;
878         }
879         goto again;
880 }
881
882 /*
883   store some data to the record that was locked with ctdb_fetch_lock()
884 */
885 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
886 {
887         if (h->ctdb_db->persistent) {
888                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
889                 return -1;
890         }
891
892         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
893 }
894
895 /*
896   non-locking fetch of a record
897  */
898 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
899                TDB_DATA key, TDB_DATA *data)
900 {
901         struct ctdb_call call;
902         int ret;
903
904         call.call_id = CTDB_FETCH_FUNC;
905         call.call_data.dptr = NULL;
906         call.call_data.dsize = 0;
907         call.key = key;
908
909         ret = ctdb_call(ctdb_db, &call);
910
911         if (ret == 0) {
912                 *data = call.reply_data;
913                 talloc_steal(mem_ctx, data->dptr);
914         }
915
916         return ret;
917 }
918
919
920
921 /*
922    called when a control completes or timesout to invoke the callback
923    function the user provided
924 */
925 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
926         struct timeval t, void *private_data)
927 {
928         struct ctdb_client_control_state *state;
929         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
930         int ret;
931
932         state = talloc_get_type(private_data, struct ctdb_client_control_state);
933         talloc_steal(tmp_ctx, state);
934
935         ret = ctdb_control_recv(state->ctdb, state, state,
936                         NULL, 
937                         NULL, 
938                         NULL);
939         if (ret != 0) {
940                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
941         }
942
943         talloc_free(tmp_ctx);
944 }
945
946 /*
947   called when a CTDB_REPLY_CONTROL packet comes in in the client
948
949   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
950   contains any reply data from the control
951 */
952 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
953                                       struct ctdb_req_header *hdr)
954 {
955         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
956         struct ctdb_client_control_state *state;
957
958         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
959         if (state == NULL) {
960                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
961                 return;
962         }
963
964         if (hdr->reqid != state->reqid) {
965                 /* we found a record  but it was the wrong one */
966                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
967                 return;
968         }
969
970         state->outdata.dptr = c->data;
971         state->outdata.dsize = c->datalen;
972         state->status = c->status;
973         if (c->errorlen) {
974                 state->errormsg = talloc_strndup(state, 
975                                                  (char *)&c->data[c->datalen], 
976                                                  c->errorlen);
977         }
978
979         /* state->outdata now uses resources from c so we dont want c
980            to just dissappear from under us while state is still alive
981         */
982         talloc_steal(state, c);
983
984         state->state = CTDB_CONTROL_DONE;
985
986         /* if we had a callback registered for this control, pull the response
987            and call the callback.
988         */
989         if (state->async.fn) {
990                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
991         }
992 }
993
994
995 /*
996   destroy a ctdb_control in client
997 */
998 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
999 {
1000         ctdb_reqid_remove(state->ctdb, state->reqid);
1001         return 0;
1002 }
1003
1004
1005 /* time out handler for ctdb_control */
1006 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1007         struct timeval t, void *private_data)
1008 {
1009         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1010
1011         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1012                          "dstnode:%u\n", state->reqid, state->c->opcode,
1013                          state->c->hdr.destnode));
1014
1015         state->state = CTDB_CONTROL_TIMEOUT;
1016
1017         /* if we had a callback registered for this control, pull the response
1018            and call the callback.
1019         */
1020         if (state->async.fn) {
1021                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1022         }
1023 }
1024
1025 /* async version of send control request */
1026 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1027                 uint32_t destnode, uint64_t srvid, 
1028                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1029                 TALLOC_CTX *mem_ctx,
1030                 struct timeval *timeout,
1031                 char **errormsg)
1032 {
1033         struct ctdb_client_control_state *state;
1034         size_t len;
1035         struct ctdb_req_control *c;
1036         int ret;
1037
1038         if (errormsg) {
1039                 *errormsg = NULL;
1040         }
1041
1042         /* if the domain socket is not yet open, open it */
1043         if (ctdb->daemon.sd==-1) {
1044                 ctdb_socket_connect(ctdb);
1045         }
1046
1047         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1048         CTDB_NO_MEMORY_NULL(ctdb, state);
1049
1050         state->ctdb       = ctdb;
1051         state->reqid      = ctdb_reqid_new(ctdb, state);
1052         state->state      = CTDB_CONTROL_WAIT;
1053         state->errormsg   = NULL;
1054
1055         talloc_set_destructor(state, ctdb_client_control_destructor);
1056
1057         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1058         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1059                                len, struct ctdb_req_control);
1060         state->c            = c;        
1061         CTDB_NO_MEMORY_NULL(ctdb, c);
1062         c->hdr.reqid        = state->reqid;
1063         c->hdr.destnode     = destnode;
1064         c->opcode           = opcode;
1065         c->client_id        = 0;
1066         c->flags            = flags;
1067         c->srvid            = srvid;
1068         c->datalen          = data.dsize;
1069         if (data.dsize) {
1070                 memcpy(&c->data[0], data.dptr, data.dsize);
1071         }
1072
1073         /* timeout */
1074         if (timeout && !timeval_is_zero(timeout)) {
1075                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1076         }
1077
1078         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1079         if (ret != 0) {
1080                 talloc_free(state);
1081                 return NULL;
1082         }
1083
1084         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1085                 talloc_free(state);
1086                 return NULL;
1087         }
1088
1089         return state;
1090 }
1091
1092
1093 /* async version of receive control reply */
1094 int ctdb_control_recv(struct ctdb_context *ctdb, 
1095                 struct ctdb_client_control_state *state, 
1096                 TALLOC_CTX *mem_ctx,
1097                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1098 {
1099         TALLOC_CTX *tmp_ctx;
1100
1101         if (status != NULL) {
1102                 *status = -1;
1103         }
1104         if (errormsg != NULL) {
1105                 *errormsg = NULL;
1106         }
1107
1108         if (state == NULL) {
1109                 return -1;
1110         }
1111
1112         /* prevent double free of state */
1113         tmp_ctx = talloc_new(ctdb);
1114         talloc_steal(tmp_ctx, state);
1115
1116         /* loop one event at a time until we either timeout or the control
1117            completes.
1118         */
1119         while (state->state == CTDB_CONTROL_WAIT) {
1120                 event_loop_once(ctdb->ev);
1121         }
1122
1123         if (state->state != CTDB_CONTROL_DONE) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1125                 if (state->async.fn) {
1126                         state->async.fn(state);
1127                 }
1128                 talloc_free(tmp_ctx);
1129                 return -1;
1130         }
1131
1132         if (state->errormsg) {
1133                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1134                 if (errormsg) {
1135                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1136                 }
1137                 if (state->async.fn) {
1138                         state->async.fn(state);
1139                 }
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         if (outdata) {
1145                 *outdata = state->outdata;
1146                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1147         }
1148
1149         if (status) {
1150                 *status = state->status;
1151         }
1152
1153         if (state->async.fn) {
1154                 state->async.fn(state);
1155         }
1156
1157         talloc_free(tmp_ctx);
1158         return 0;
1159 }
1160
1161
1162
1163 /*
1164   send a ctdb control message
1165   timeout specifies how long we should wait for a reply.
1166   if timeout is NULL we wait indefinitely
1167  */
1168 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1169                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1170                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1171                  struct timeval *timeout,
1172                  char **errormsg)
1173 {
1174         struct ctdb_client_control_state *state;
1175
1176         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1177                         flags, data, mem_ctx,
1178                         timeout, errormsg);
1179
1180         /* FIXME: Error conditions in ctdb_control_send return NULL without
1181          * setting errormsg.  So, there is no way to distinguish between sucess
1182          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1183         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1184                 if (status != NULL) {
1185                         *status = 0;
1186                 }
1187                 return 0;
1188         }
1189
1190         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1191                         errormsg);
1192 }
1193
1194
1195
1196
1197 /*
1198   a process exists call. Returns 0 if process exists, -1 otherwise
1199  */
1200 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1201 {
1202         int ret;
1203         TDB_DATA data;
1204         int32_t status;
1205
1206         data.dptr = (uint8_t*)&pid;
1207         data.dsize = sizeof(pid);
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1211                            NULL, NULL, &status, NULL, NULL);
1212         if (ret != 0) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1214                 return -1;
1215         }
1216
1217         return status;
1218 }
1219
1220 /*
1221   get remote statistics
1222  */
1223 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1224 {
1225         int ret;
1226         TDB_DATA data;
1227         int32_t res;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1231                            ctdb, &data, &res, NULL, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1234                 return -1;
1235         }
1236
1237         if (data.dsize != sizeof(struct ctdb_statistics)) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1239                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1240                       return -1;
1241         }
1242
1243         *status = *(struct ctdb_statistics *)data.dptr;
1244         talloc_free(data.dptr);
1245                         
1246         return 0;
1247 }
1248
1249 /*
1250  * get db statistics
1251  */
1252 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1253                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1254 {
1255         int ret;
1256         TDB_DATA indata, outdata;
1257         int32_t res;
1258         struct ctdb_db_statistics *wire, *s;
1259         char *ptr;
1260         int i;
1261
1262         indata.dptr = (uint8_t *)&dbid;
1263         indata.dsize = sizeof(dbid);
1264
1265         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1266                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1267         if (ret != 0 || res != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1269                 return -1;
1270         }
1271
1272         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1273                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1274                                  outdata.dsize,
1275                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1276                 return -1;
1277         }
1278
1279         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1280         if (s == NULL) {
1281                 talloc_free(outdata.dptr);
1282                 CTDB_NO_MEMORY(ctdb, s);
1283         }
1284
1285         wire = (struct ctdb_db_statistics *)outdata.dptr;
1286         *s = *wire;
1287         ptr = &wire->hot_keys_wire[0];
1288         for (i=0; i<wire->num_hot_keys; i++) {
1289                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1290                 if (s->hot_keys[i].key.dptr == NULL) {
1291                         talloc_free(outdata.dptr);
1292                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1293                 }
1294
1295                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1296                 ptr += wire->hot_keys[i].key.dsize;
1297         }
1298
1299         talloc_free(outdata.dptr);
1300         *dbstat = s;
1301         return 0;
1302 }
1303
1304 /*
1305   shutdown a remote ctdb node
1306  */
1307 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_control_send(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1313                            NULL, &timeout, NULL);
1314         if (state == NULL) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322 /*
1323   get vnn map from a remote node
1324  */
1325 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1326 {
1327         int ret;
1328         TDB_DATA outdata;
1329         int32_t res;
1330         struct ctdb_vnn_map_wire *map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1334                            mem_ctx, &outdata, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1337                 return -1;
1338         }
1339         
1340         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1341         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1342             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1343                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1344                 return -1;
1345         }
1346
1347         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1348         CTDB_NO_MEMORY(ctdb, *vnnmap);
1349         (*vnnmap)->generation = map->generation;
1350         (*vnnmap)->size       = map->size;
1351         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1352
1353         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1354         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1355         talloc_free(outdata.dptr);
1356                     
1357         return 0;
1358 }
1359
1360
1361 /*
1362   get the recovery mode of a remote node
1363  */
1364 struct ctdb_client_control_state *
1365 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1366 {
1367         return ctdb_control_send(ctdb, destnode, 0, 
1368                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1369                            mem_ctx, &timeout, NULL);
1370 }
1371
1372 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1380                 return -1;
1381         }
1382
1383         if (recmode) {
1384                 *recmode = (uint32_t)res;
1385         }
1386
1387         return 0;
1388 }
1389
1390 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1391 {
1392         struct ctdb_client_control_state *state;
1393
1394         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1395         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1396 }
1397
1398
1399
1400
1401 /*
1402   set the recovery mode of a remote node
1403  */
1404 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1405 {
1406         int ret;
1407         TDB_DATA data;
1408         int32_t res;
1409
1410         data.dsize = sizeof(uint32_t);
1411         data.dptr = (unsigned char *)&recmode;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   get the recovery master of a remote node
1428  */
1429 struct ctdb_client_control_state *
1430 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1431                         struct timeval timeout, uint32_t destnode)
1432 {
1433         return ctdb_control_send(ctdb, destnode, 0, 
1434                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1435                            mem_ctx, &timeout, NULL);
1436 }
1437
1438 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1439 {
1440         int ret;
1441         int32_t res;
1442
1443         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1446                 return -1;
1447         }
1448
1449         if (recmaster) {
1450                 *recmaster = (uint32_t)res;
1451         }
1452
1453         return 0;
1454 }
1455
1456 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1457 {
1458         struct ctdb_client_control_state *state;
1459
1460         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1461         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1462 }
1463
1464
1465 /*
1466   set the recovery master of a remote node
1467  */
1468 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1469 {
1470         int ret;
1471         TDB_DATA data;
1472         int32_t res;
1473
1474         ZERO_STRUCT(data);
1475         data.dsize = sizeof(uint32_t);
1476         data.dptr = (unsigned char *)&recmaster;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1480                            NULL, NULL, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1483                 return -1;
1484         }
1485
1486         return 0;
1487 }
1488
1489
1490 /*
1491   get a list of databases off a remote node
1492  */
1493 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1494                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1495 {
1496         int ret;
1497         TDB_DATA outdata;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1502                            mem_ctx, &outdata, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1505                 return -1;
1506         }
1507
1508         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1509         talloc_free(outdata.dptr);
1510                     
1511         return 0;
1512 }
1513
1514 /*
1515   get a list of nodes (vnn and flags ) from a remote node
1516  */
1517 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1518                 struct timeval timeout, uint32_t destnode, 
1519                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0, 
1526                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1530                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1531         }
1532         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1533                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1534                 return -1;
1535         }
1536
1537         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1538         talloc_free(outdata.dptr);
1539                     
1540         return 0;
1541 }
1542
1543 /*
1544   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1545  */
1546 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1547                 struct timeval timeout, uint32_t destnode, 
1548                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1549 {
1550         int ret, i, len;
1551         TDB_DATA outdata;
1552         struct ctdb_node_mapv4 *nodemapv4;
1553         int32_t res;
1554
1555         ret = ctdb_control(ctdb, destnode, 0, 
1556                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1557                            mem_ctx, &outdata, &res, &timeout, NULL);
1558         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1560                 return -1;
1561         }
1562
1563         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1564
1565         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1566         (*nodemap) = talloc_zero_size(mem_ctx, len);
1567         CTDB_NO_MEMORY(ctdb, (*nodemap));
1568
1569         (*nodemap)->num = nodemapv4->num;
1570         for (i=0; i<nodemapv4->num; i++) {
1571                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1572                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1573                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1574                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1575         }
1576                 
1577         talloc_free(outdata.dptr);
1578                     
1579         return 0;
1580 }
1581
1582 /*
1583   drop the transport, reload the nodes file and restart the transport
1584  */
1585 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1586                     struct timeval timeout, uint32_t destnode)
1587 {
1588         int ret;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, 
1592                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1593                            NULL, NULL, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1596                 return -1;
1597         }
1598
1599         return 0;
1600 }
1601
1602
1603 /*
1604   set vnn map on a node
1605  */
1606 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1607                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1608 {
1609         int ret;
1610         TDB_DATA data;
1611         int32_t res;
1612         struct ctdb_vnn_map_wire *map;
1613         size_t len;
1614
1615         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1616         map = talloc_size(mem_ctx, len);
1617         CTDB_NO_MEMORY(ctdb, map);
1618
1619         map->generation = vnnmap->generation;
1620         map->size = vnnmap->size;
1621         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1622         
1623         data.dsize = len;
1624         data.dptr  = (uint8_t *)map;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1631                 return -1;
1632         }
1633
1634         talloc_free(map);
1635
1636         return 0;
1637 }
1638
1639
1640 /*
1641   async send for pull database
1642  */
1643 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1644         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1645         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1646 {
1647         TDB_DATA indata;
1648         struct ctdb_control_pulldb *pull;
1649         struct ctdb_client_control_state *state;
1650
1651         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1652         CTDB_NO_MEMORY_NULL(ctdb, pull);
1653
1654         pull->db_id   = dbid;
1655         pull->lmaster = lmaster;
1656
1657         indata.dsize = sizeof(struct ctdb_control_pulldb);
1658         indata.dptr  = (unsigned char *)pull;
1659
1660         state = ctdb_control_send(ctdb, destnode, 0, 
1661                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1662                                   mem_ctx, &timeout, NULL);
1663         talloc_free(pull);
1664
1665         return state;
1666 }
1667
1668 /*
1669   async recv for pull database
1670  */
1671 int ctdb_ctrl_pulldb_recv(
1672         struct ctdb_context *ctdb, 
1673         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1674         TDB_DATA *outdata)
1675 {
1676         int ret;
1677         int32_t res;
1678
1679         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1680         if ( (ret != 0) || (res != 0) ){
1681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1682                 return -1;
1683         }
1684
1685         return 0;
1686 }
1687
1688 /*
1689   pull all keys and records for a specific database on a node
1690  */
1691 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1692                 uint32_t dbid, uint32_t lmaster, 
1693                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1694                 TDB_DATA *outdata)
1695 {
1696         struct ctdb_client_control_state *state;
1697
1698         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1699                                       timeout);
1700         
1701         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1702 }
1703
1704
1705 /*
1706   change dmaster for all keys in the database to the new value
1707  */
1708 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1709                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1710 {
1711         int ret;
1712         TDB_DATA indata;
1713         int32_t res;
1714
1715         indata.dsize = 2*sizeof(uint32_t);
1716         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1717
1718         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1719         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1720
1721         ret = ctdb_control(ctdb, destnode, 0, 
1722                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1723                            NULL, NULL, &res, &timeout, NULL);
1724         if (ret != 0 || res != 0) {
1725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1726                 return -1;
1727         }
1728
1729         return 0;
1730 }
1731
1732 /*
1733   ping a node, return number of clients connected
1734  */
1735 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1736 {
1737         int ret;
1738         int32_t res;
1739
1740         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1741                            tdb_null, NULL, NULL, &res, NULL, NULL);
1742         if (ret != 0) {
1743                 return -1;
1744         }
1745         return res;
1746 }
1747
1748 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1749                            struct timeval timeout, 
1750                            uint32_t destnode,
1751                            uint32_t *runstate)
1752 {
1753         TDB_DATA outdata;
1754         int32_t res;
1755         int ret;
1756
1757         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1758                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1759         if (ret != 0 || res != 0) {
1760                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1761                 return ret != 0 ? ret : res;
1762         }
1763
1764         if (outdata.dsize != sizeof(uint32_t)) {
1765                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1766                 talloc_free(outdata.dptr);
1767                 return -1;
1768         }
1769
1770         if (runstate != NULL) {
1771                 *runstate = *(uint32_t *)outdata.dptr;
1772         }
1773         talloc_free(outdata.dptr);
1774
1775         return 0;
1776 }
1777
1778 /*
1779   find the real path to a ltdb 
1780  */
1781 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1782                    const char **path)
1783 {
1784         int ret;
1785         int32_t res;
1786         TDB_DATA data;
1787
1788         data.dptr = (uint8_t *)&dbid;
1789         data.dsize = sizeof(dbid);
1790
1791         ret = ctdb_control(ctdb, destnode, 0, 
1792                            CTDB_CONTROL_GETDBPATH, 0, data, 
1793                            mem_ctx, &data, &res, &timeout, NULL);
1794         if (ret != 0 || res != 0) {
1795                 return -1;
1796         }
1797
1798         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1799         if ((*path) == NULL) {
1800                 return -1;
1801         }
1802
1803         talloc_free(data.dptr);
1804
1805         return 0;
1806 }
1807
1808 /*
1809   find the name of a db 
1810  */
1811 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1812                    const char **name)
1813 {
1814         int ret;
1815         int32_t res;
1816         TDB_DATA data;
1817
1818         data.dptr = (uint8_t *)&dbid;
1819         data.dsize = sizeof(dbid);
1820
1821         ret = ctdb_control(ctdb, destnode, 0, 
1822                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1823                            mem_ctx, &data, &res, &timeout, NULL);
1824         if (ret != 0 || res != 0) {
1825                 return -1;
1826         }
1827
1828         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1829         if ((*name) == NULL) {
1830                 return -1;
1831         }
1832
1833         talloc_free(data.dptr);
1834
1835         return 0;
1836 }
1837
1838 /*
1839   get the health status of a db
1840  */
1841 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1842                           struct timeval timeout,
1843                           uint32_t destnode,
1844                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1845                           const char **reason)
1846 {
1847         int ret;
1848         int32_t res;
1849         TDB_DATA data;
1850
1851         data.dptr = (uint8_t *)&dbid;
1852         data.dsize = sizeof(dbid);
1853
1854         ret = ctdb_control(ctdb, destnode, 0,
1855                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1856                            mem_ctx, &data, &res, &timeout, NULL);
1857         if (ret != 0 || res != 0) {
1858                 return -1;
1859         }
1860
1861         if (data.dsize == 0) {
1862                 (*reason) = NULL;
1863                 return 0;
1864         }
1865
1866         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1867         if ((*reason) == NULL) {
1868                 return -1;
1869         }
1870
1871         talloc_free(data.dptr);
1872
1873         return 0;
1874 }
1875
1876 /*
1877  * get db sequence number
1878  */
1879 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1880                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1881 {
1882         int ret;
1883         int32_t res;
1884         TDB_DATA data, outdata;
1885
1886         data.dptr = (uint8_t *)&dbid;
1887         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1888
1889         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1890                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1891         if (ret != 0 || res != 0) {
1892                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1893                 return -1;
1894         }
1895
1896         if (outdata.dsize != sizeof(uint64_t)) {
1897                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1898                 talloc_free(outdata.dptr);
1899                 return -1;
1900         }
1901
1902         if (seqnum != NULL) {
1903                 *seqnum = *(uint64_t *)outdata.dptr;
1904         }
1905         talloc_free(outdata.dptr);
1906
1907         return 0;
1908 }
1909
1910 /*
1911   create a database
1912  */
1913 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1914                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1915 {
1916         int ret;
1917         int32_t res;
1918         TDB_DATA data;
1919         uint64_t tdb_flags = 0;
1920
1921         data.dptr = discard_const(name);
1922         data.dsize = strlen(name)+1;
1923
1924         /* Make sure that volatile databases use jenkins hash */
1925         if (!persistent) {
1926                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1927         }
1928
1929 #ifdef TDB_MUTEX_LOCKING
1930         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1931                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1932         }
1933 #endif
1934
1935         ret = ctdb_control(ctdb, destnode, tdb_flags,
1936                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1937                            0, data, 
1938                            mem_ctx, &data, &res, &timeout, NULL);
1939
1940         if (ret != 0 || res != 0) {
1941                 return -1;
1942         }
1943
1944         return 0;
1945 }
1946
1947 /*
1948   get debug level on a node
1949  */
1950 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1951 {
1952         int ret;
1953         int32_t res;
1954         TDB_DATA data;
1955
1956         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1957                            ctdb, &data, &res, NULL, NULL);
1958         if (ret != 0 || res != 0) {
1959                 return -1;
1960         }
1961         if (data.dsize != sizeof(int32_t)) {
1962                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1963                          (unsigned)data.dsize));
1964                 return -1;
1965         }
1966         *level = *(int32_t *)data.dptr;
1967         talloc_free(data.dptr);
1968         return 0;
1969 }
1970
1971 /*
1972   set debug level on a node
1973  */
1974 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1975 {
1976         int ret;
1977         int32_t res;
1978         TDB_DATA data;
1979
1980         data.dptr = (uint8_t *)&level;
1981         data.dsize = sizeof(level);
1982
1983         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1984                            NULL, NULL, &res, NULL, NULL);
1985         if (ret != 0 || res != 0) {
1986                 return -1;
1987         }
1988         return 0;
1989 }
1990
1991
1992 /*
1993   get a list of connected nodes
1994  */
1995 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1996                                 struct timeval timeout,
1997                                 TALLOC_CTX *mem_ctx,
1998                                 uint32_t *num_nodes)
1999 {
2000         struct ctdb_node_map *map=NULL;
2001         int ret, i;
2002         uint32_t *nodes;
2003
2004         *num_nodes = 0;
2005
2006         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2007         if (ret != 0) {
2008                 return NULL;
2009         }
2010
2011         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2012         if (nodes == NULL) {
2013                 return NULL;
2014         }
2015
2016         for (i=0;i<map->num;i++) {
2017                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2018                         nodes[*num_nodes] = map->nodes[i].pnn;
2019                         (*num_nodes)++;
2020                 }
2021         }
2022
2023         return nodes;
2024 }
2025
2026
2027 /*
2028   reset remote status
2029  */
2030 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2031 {
2032         int ret;
2033         int32_t res;
2034
2035         ret = ctdb_control(ctdb, destnode, 0, 
2036                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2037                            NULL, NULL, &res, NULL, NULL);
2038         if (ret != 0 || res != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2040                 return -1;
2041         }
2042         return 0;
2043 }
2044
2045 /*
2046   attach to a specific database - client call
2047 */
2048 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2049                                     struct timeval timeout,
2050                                     const char *name,
2051                                     bool persistent,
2052                                     uint32_t tdb_flags)
2053 {
2054         struct ctdb_db_context *ctdb_db;
2055         TDB_DATA data;
2056         int ret;
2057         int32_t res;
2058 #ifdef TDB_MUTEX_LOCKING
2059         uint32_t mutex_enabled = 0;
2060 #endif
2061
2062         ctdb_db = ctdb_db_handle(ctdb, name);
2063         if (ctdb_db) {
2064                 return ctdb_db;
2065         }
2066
2067         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2068         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2069
2070         ctdb_db->ctdb = ctdb;
2071         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2072         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2073
2074         data.dptr = discard_const(name);
2075         data.dsize = strlen(name)+1;
2076
2077         /* CTDB has switched to using jenkins hash for volatile databases.
2078          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2079          * always set it.
2080          */
2081         if (!persistent) {
2082                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2083         }
2084
2085 #ifdef TDB_MUTEX_LOCKING
2086         if (!persistent) {
2087                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2088                                             CTDB_CURRENT_NODE,
2089                                             "TDBMutexEnabled",
2090                                             &mutex_enabled);
2091                 if (ret != 0) {
2092                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2093                 }
2094
2095                 if (mutex_enabled == 1) {
2096                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2097                 }
2098         }
2099 #endif
2100
2101         /* tell ctdb daemon to attach */
2102         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2103                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2104                            0, data, ctdb_db, &data, &res, NULL, NULL);
2105         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2106                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2107                 talloc_free(ctdb_db);
2108                 return NULL;
2109         }
2110         
2111         ctdb_db->db_id = *(uint32_t *)data.dptr;
2112         talloc_free(data.dptr);
2113
2114         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2115         if (ret != 0) {
2116                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2117                 talloc_free(ctdb_db);
2118                 return NULL;
2119         }
2120
2121         if (persistent) {
2122                 tdb_flags = TDB_DEFAULT;
2123         } else {
2124                 tdb_flags = TDB_NOSYNC;
2125 #ifdef TDB_MUTEX_LOCKING
2126                 if (mutex_enabled) {
2127                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2128                 }
2129 #endif
2130         }
2131         if (ctdb->valgrinding) {
2132                 tdb_flags |= TDB_NOMMAP;
2133         }
2134         tdb_flags |= TDB_DISALLOW_NESTING;
2135
2136         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2137                                       O_RDWR, 0);
2138         if (ctdb_db->ltdb == NULL) {
2139                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2140                 talloc_free(ctdb_db);
2141                 return NULL;
2142         }
2143
2144         ctdb_db->persistent = persistent;
2145
2146         DLIST_ADD(ctdb->db_list, ctdb_db);
2147
2148         /* add well known functions */
2149         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2150         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2151         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2152
2153         return ctdb_db;
2154 }
2155
2156 /*
2157  * detach from a specific database - client call
2158  */
2159 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2160 {
2161         int ret;
2162         int32_t status;
2163         TDB_DATA data;
2164
2165         data.dsize = sizeof(db_id);
2166         data.dptr = (uint8_t *)&db_id;
2167
2168         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2169                            0, data, NULL, NULL, &status, NULL, NULL);
2170         if (ret != 0 || status != 0) {
2171                 return -1;
2172         }
2173         return 0;
2174 }
2175
2176 /*
2177   setup a call for a database
2178  */
2179 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2180 {
2181         struct ctdb_registered_call *call;
2182
2183 #if 0
2184         TDB_DATA data;
2185         int32_t status;
2186         struct ctdb_control_set_call c;
2187         int ret;
2188
2189         /* this is no longer valid with the separate daemon architecture */
2190         c.db_id = ctdb_db->db_id;
2191         c.fn    = fn;
2192         c.id    = id;
2193
2194         data.dptr = (uint8_t *)&c;
2195         data.dsize = sizeof(c);
2196
2197         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2198                            data, NULL, NULL, &status, NULL, NULL);
2199         if (ret != 0 || status != 0) {
2200                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2201                 return -1;
2202         }
2203 #endif
2204
2205         /* also register locally */
2206         call = talloc(ctdb_db, struct ctdb_registered_call);
2207         call->fn = fn;
2208         call->id = id;
2209
2210         DLIST_ADD(ctdb_db->calls, call);        
2211         return 0;
2212 }
2213
2214
2215 struct traverse_state {
2216         bool done;
2217         uint32_t count;
2218         ctdb_traverse_func fn;
2219         void *private_data;
2220         bool listemptyrecords;
2221 };
2222
2223 /*
2224   called on each key during a ctdb_traverse
2225  */
2226 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2227 {
2228         struct traverse_state *state = (struct traverse_state *)p;
2229         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2230         TDB_DATA key;
2231
2232         if (data.dsize < sizeof(uint32_t) ||
2233             d->length != data.dsize) {
2234                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2235                 state->done = true;
2236                 return;
2237         }
2238
2239         key.dsize = d->keylen;
2240         key.dptr  = &d->data[0];
2241         data.dsize = d->datalen;
2242         data.dptr = &d->data[d->keylen];
2243
2244         if (key.dsize == 0 && data.dsize == 0) {
2245                 /* end of traverse */
2246                 state->done = true;
2247                 return;
2248         }
2249
2250         if (!state->listemptyrecords &&
2251             data.dsize == sizeof(struct ctdb_ltdb_header))
2252         {
2253                 /* empty records are deleted records in ctdb */
2254                 return;
2255         }
2256
2257         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2258                 state->done = true;
2259         }
2260
2261         state->count++;
2262 }
2263
2264 /**
2265  * start a cluster wide traverse, calling the supplied fn on each record
2266  * return the number of records traversed, or -1 on error
2267  *
2268  * Extendet variant with a flag to signal whether empty records should
2269  * be listed.
2270  */
2271 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2272                              ctdb_traverse_func fn,
2273                              bool withemptyrecords,
2274                              void *private_data)
2275 {
2276         TDB_DATA data;
2277         struct ctdb_traverse_start_ext t;
2278         int32_t status;
2279         int ret;
2280         uint64_t srvid = (getpid() | 0xFLL<<60);
2281         struct traverse_state state;
2282
2283         state.done = false;
2284         state.count = 0;
2285         state.private_data = private_data;
2286         state.fn = fn;
2287         state.listemptyrecords = withemptyrecords;
2288
2289         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2290         if (ret != 0) {
2291                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2292                 return -1;
2293         }
2294
2295         t.db_id = ctdb_db->db_id;
2296         t.srvid = srvid;
2297         t.reqid = 0;
2298         t.withemptyrecords = withemptyrecords;
2299
2300         data.dptr = (uint8_t *)&t;
2301         data.dsize = sizeof(t);
2302
2303         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2304                            data, NULL, NULL, &status, NULL, NULL);
2305         if (ret != 0 || status != 0) {
2306                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2307                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2308                 return -1;
2309         }
2310
2311         while (!state.done) {
2312                 event_loop_once(ctdb_db->ctdb->ev);
2313         }
2314
2315         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2316         if (ret != 0) {
2317                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2318                 return -1;
2319         }
2320
2321         return state.count;
2322 }
2323
2324 /**
2325  * start a cluster wide traverse, calling the supplied fn on each record
2326  * return the number of records traversed, or -1 on error
2327  *
2328  * Standard version which does not list the empty records:
2329  * These are considered deleted.
2330  */
2331 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2332 {
2333         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2334 }
2335
2336 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2337 /*
2338   called on each key during a catdb
2339  */
2340 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2341 {
2342         int i;
2343         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2344         FILE *f = c->f;
2345         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2346
2347         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2348         for (i=0;i<key.dsize;i++) {
2349                 if (ISASCII(key.dptr[i])) {
2350                         fprintf(f, "%c", key.dptr[i]);
2351                 } else {
2352                         fprintf(f, "\\%02X", key.dptr[i]);
2353                 }
2354         }
2355         fprintf(f, "\"\n");
2356
2357         fprintf(f, "dmaster: %u\n", h->dmaster);
2358         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2359
2360         if (c->printlmaster && ctdb->vnn_map != NULL) {
2361                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2362         }
2363
2364         if (c->printhash) {
2365                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2366         }
2367
2368         if (c->printrecordflags) {
2369                 fprintf(f, "flags: 0x%08x", h->flags);
2370                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2371                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2372                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2373                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2374                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2375                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2376                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2377                 fprintf(f, "\n");
2378         }
2379
2380         if (c->printdatasize) {
2381                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2382         } else {
2383                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2384                 for (i=sizeof(*h);i<data.dsize;i++) {
2385                         if (ISASCII(data.dptr[i])) {
2386                                 fprintf(f, "%c", data.dptr[i]);
2387                         } else {
2388                                 fprintf(f, "\\%02X", data.dptr[i]);
2389                         }
2390                 }
2391                 fprintf(f, "\"\n");
2392         }
2393
2394         fprintf(f, "\n");
2395
2396         return 0;
2397 }
2398
2399 /*
2400   convenience function to list all keys to stdout
2401  */
2402 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2403                  struct ctdb_dump_db_context *ctx)
2404 {
2405         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2406                                  ctx->printemptyrecords, ctx);
2407 }
2408
2409 /*
2410   get the pid of a ctdb daemon
2411  */
2412 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2413 {
2414         int ret;
2415         int32_t res;
2416
2417         ret = ctdb_control(ctdb, destnode, 0, 
2418                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2419                            NULL, NULL, &res, &timeout, NULL);
2420         if (ret != 0) {
2421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2422                 return -1;
2423         }
2424
2425         *pid = res;
2426
2427         return 0;
2428 }
2429
2430
2431 /*
2432   async freeze send control
2433  */
2434 struct ctdb_client_control_state *
2435 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2436 {
2437         return ctdb_control_send(ctdb, destnode, priority, 
2438                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2439                            mem_ctx, &timeout, NULL);
2440 }
2441
2442 /* 
2443    async freeze recv control
2444 */
2445 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2446 {
2447         int ret;
2448         int32_t res;
2449
2450         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2451         if ( (ret != 0) || (res != 0) ){
2452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2453                 return -1;
2454         }
2455
2456         return 0;
2457 }
2458
2459 /*
2460   freeze databases of a certain priority
2461  */
2462 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2463 {
2464         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2465         struct ctdb_client_control_state *state;
2466         int ret;
2467
2468         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2469         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2470         talloc_free(tmp_ctx);
2471
2472         return ret;
2473 }
2474
2475 /* Freeze all databases */
2476 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2477 {
2478         int i;
2479
2480         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2481                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2482                         return -1;
2483                 }
2484         }
2485         return 0;
2486 }
2487
2488 /*
2489   thaw databases of a certain priority
2490  */
2491 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2492 {
2493         int ret;
2494         int32_t res;
2495
2496         ret = ctdb_control(ctdb, destnode, priority, 
2497                            CTDB_CONTROL_THAW, 0, tdb_null, 
2498                            NULL, NULL, &res, &timeout, NULL);
2499         if (ret != 0 || res != 0) {
2500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2501                 return -1;
2502         }
2503
2504         return 0;
2505 }
2506
2507 /* thaw all databases */
2508 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2509 {
2510         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2511 }
2512
2513 /*
2514   get pnn of a node, or -1
2515  */
2516 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2517 {
2518         int ret;
2519         int32_t res;
2520
2521         ret = ctdb_control(ctdb, destnode, 0, 
2522                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2523                            NULL, NULL, &res, &timeout, NULL);
2524         if (ret != 0) {
2525                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2526                 return -1;
2527         }
2528
2529         return res;
2530 }
2531
2532 /*
2533   get the monitoring mode of a remote node
2534  */
2535 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2536 {
2537         int ret;
2538         int32_t res;
2539
2540         ret = ctdb_control(ctdb, destnode, 0, 
2541                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2542                            NULL, NULL, &res, &timeout, NULL);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2545                 return -1;
2546         }
2547
2548         *monmode = res;
2549
2550         return 0;
2551 }
2552
2553
2554 /*
2555  set the monitoring mode of a remote node to active
2556  */
2557 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2558 {
2559         int ret;
2560         
2561
2562         ret = ctdb_control(ctdb, destnode, 0, 
2563                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2564                            NULL, NULL,NULL, &timeout, NULL);
2565         if (ret != 0) {
2566                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2567                 return -1;
2568         }
2569
2570         
2571
2572         return 0;
2573 }
2574
2575 /*
2576   set the monitoring mode of a remote node to disable
2577  */
2578 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2579 {
2580         int ret;
2581         
2582
2583         ret = ctdb_control(ctdb, destnode, 0, 
2584                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2585                            NULL, NULL, NULL, &timeout, NULL);
2586         if (ret != 0) {
2587                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2588                 return -1;
2589         }
2590
2591         
2592
2593         return 0;
2594 }
2595
2596
2597
2598 /* 
2599   sent to a node to make it take over an ip address
2600 */
2601 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2602                           uint32_t destnode, struct ctdb_public_ip *ip)
2603 {
2604         TDB_DATA data;
2605         struct ctdb_public_ipv4 ipv4;
2606         int ret;
2607         int32_t res;
2608
2609         if (ip->addr.sa.sa_family == AF_INET) {
2610                 ipv4.pnn = ip->pnn;
2611                 ipv4.sin = ip->addr.ip;
2612
2613                 data.dsize = sizeof(ipv4);
2614                 data.dptr  = (uint8_t *)&ipv4;
2615
2616                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2617                            NULL, &res, &timeout, NULL);
2618         } else {
2619                 data.dsize = sizeof(*ip);
2620                 data.dptr  = (uint8_t *)ip;
2621
2622                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2623                            NULL, &res, &timeout, NULL);
2624         }
2625
2626         if (ret != 0 || res != 0) {
2627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2628                 return -1;
2629         }
2630
2631         return 0;       
2632 }
2633
2634
2635 /* 
2636   sent to a node to make it release an ip address
2637 */
2638 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2639                          uint32_t destnode, struct ctdb_public_ip *ip)
2640 {
2641         TDB_DATA data;
2642         struct ctdb_public_ipv4 ipv4;
2643         int ret;
2644         int32_t res;
2645
2646         if (ip->addr.sa.sa_family == AF_INET) {
2647                 ipv4.pnn = ip->pnn;
2648                 ipv4.sin = ip->addr.ip;
2649
2650                 data.dsize = sizeof(ipv4);
2651                 data.dptr  = (uint8_t *)&ipv4;
2652
2653                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2654                                    NULL, &res, &timeout, NULL);
2655         } else {
2656                 data.dsize = sizeof(*ip);
2657                 data.dptr  = (uint8_t *)ip;
2658
2659                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2660                                    NULL, &res, &timeout, NULL);
2661         }
2662
2663         if (ret != 0 || res != 0) {
2664                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2665                 return -1;
2666         }
2667
2668         return 0;       
2669 }
2670
2671
2672 /*
2673   get a tunable
2674  */
2675 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2676                           struct timeval timeout, 
2677                           uint32_t destnode,
2678                           const char *name, uint32_t *value)
2679 {
2680         struct ctdb_control_get_tunable *t;
2681         TDB_DATA data, outdata;
2682         int32_t res;
2683         int ret;
2684
2685         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2686         data.dptr  = talloc_size(ctdb, data.dsize);
2687         CTDB_NO_MEMORY(ctdb, data.dptr);
2688
2689         t = (struct ctdb_control_get_tunable *)data.dptr;
2690         t->length = strlen(name)+1;
2691         memcpy(t->name, name, t->length);
2692
2693         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2694                            &outdata, &res, &timeout, NULL);
2695         talloc_free(data.dptr);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2698                 return ret != 0 ? ret : res;
2699         }
2700
2701         if (outdata.dsize != sizeof(uint32_t)) {
2702                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2703                 talloc_free(outdata.dptr);
2704                 return -1;
2705         }
2706         
2707         *value = *(uint32_t *)outdata.dptr;
2708         talloc_free(outdata.dptr);
2709
2710         return 0;
2711 }
2712
2713 /*
2714   set a tunable
2715  */
2716 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2717                           struct timeval timeout, 
2718                           uint32_t destnode,
2719                           const char *name, uint32_t value)
2720 {
2721         struct ctdb_control_set_tunable *t;
2722         TDB_DATA data;
2723         int32_t res;
2724         int ret;
2725
2726         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2727         data.dptr  = talloc_size(ctdb, data.dsize);
2728         CTDB_NO_MEMORY(ctdb, data.dptr);
2729
2730         t = (struct ctdb_control_set_tunable *)data.dptr;
2731         t->length = strlen(name)+1;
2732         memcpy(t->name, name, t->length);
2733         t->value = value;
2734
2735         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2736                            NULL, &res, &timeout, NULL);
2737         talloc_free(data.dptr);
2738         if (ret != 0 || res != 0) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2740                 return -1;
2741         }
2742
2743         return 0;
2744 }
2745
2746 /*
2747   list tunables
2748  */
2749 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2750                             struct timeval timeout, 
2751                             uint32_t destnode,
2752                             TALLOC_CTX *mem_ctx,
2753                             const char ***list, uint32_t *count)
2754 {
2755         TDB_DATA outdata;
2756         int32_t res;
2757         int ret;
2758         struct ctdb_control_list_tunable *t;
2759         char *p, *s, *ptr;
2760
2761         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2762                            mem_ctx, &outdata, &res, &timeout, NULL);
2763         if (ret != 0 || res != 0) {
2764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2765                 return -1;
2766         }
2767
2768         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2769         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2770             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2771                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2772                 talloc_free(outdata.dptr);
2773                 return -1;              
2774         }
2775         
2776         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2777         CTDB_NO_MEMORY(ctdb, p);
2778
2779         talloc_free(outdata.dptr);
2780         
2781         (*list) = NULL;
2782         (*count) = 0;
2783
2784         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2785                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2786                 CTDB_NO_MEMORY(ctdb, *list);
2787                 (*list)[*count] = talloc_strdup(*list, s);
2788                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2789                 (*count)++;
2790         }
2791
2792         talloc_free(p);
2793
2794         return 0;
2795 }
2796
2797
2798 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2799                                    struct timeval timeout, uint32_t destnode,
2800                                    TALLOC_CTX *mem_ctx,
2801                                    uint32_t flags,
2802                                    struct ctdb_all_public_ips **ips)
2803 {
2804         int ret;
2805         TDB_DATA outdata;
2806         int32_t res;
2807
2808         ret = ctdb_control(ctdb, destnode, 0, 
2809                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2810                            mem_ctx, &outdata, &res, &timeout, NULL);
2811         if (ret == 0 && res == -1) {
2812                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2813                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2814         }
2815         if (ret != 0 || res != 0) {
2816           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2817                 return -1;
2818         }
2819
2820         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2821         talloc_free(outdata.dptr);
2822                     
2823         return 0;
2824 }
2825
2826 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2827                              struct timeval timeout, uint32_t destnode,
2828                              TALLOC_CTX *mem_ctx,
2829                              struct ctdb_all_public_ips **ips)
2830 {
2831         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2832                                               destnode, mem_ctx,
2833                                               0, ips);
2834 }
2835
2836 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2837                         struct timeval timeout, uint32_t destnode, 
2838                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2839 {
2840         int ret, i, len;
2841         TDB_DATA outdata;
2842         int32_t res;
2843         struct ctdb_all_public_ipsv4 *ipsv4;
2844
2845         ret = ctdb_control(ctdb, destnode, 0, 
2846                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2847                            mem_ctx, &outdata, &res, &timeout, NULL);
2848         if (ret != 0 || res != 0) {
2849                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2850                 return -1;
2851         }
2852
2853         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2854         len = offsetof(struct ctdb_all_public_ips, ips) +
2855                 ipsv4->num*sizeof(struct ctdb_public_ip);
2856         *ips = talloc_zero_size(mem_ctx, len);
2857         CTDB_NO_MEMORY(ctdb, *ips);
2858         (*ips)->num = ipsv4->num;
2859         for (i=0; i<ipsv4->num; i++) {
2860                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2861                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2862         }
2863
2864         talloc_free(outdata.dptr);
2865                     
2866         return 0;
2867 }
2868
2869 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2870                                  struct timeval timeout, uint32_t destnode,
2871                                  TALLOC_CTX *mem_ctx,
2872                                  const ctdb_sock_addr *addr,
2873                                  struct ctdb_control_public_ip_info **_info)
2874 {
2875         int ret;
2876         TDB_DATA indata;
2877         TDB_DATA outdata;
2878         int32_t res;
2879         struct ctdb_control_public_ip_info *info;
2880         uint32_t len;
2881         uint32_t i;
2882
2883         indata.dptr = discard_const_p(uint8_t, addr);
2884         indata.dsize = sizeof(*addr);
2885
2886         ret = ctdb_control(ctdb, destnode, 0,
2887                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2888                            mem_ctx, &outdata, &res, &timeout, NULL);
2889         if (ret != 0 || res != 0) {
2890                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2891                                 "failed ret:%d res:%d\n",
2892                                 ret, res));
2893                 return -1;
2894         }
2895
2896         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2897         if (len > outdata.dsize) {
2898                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2899                                 "returned invalid data with size %u > %u\n",
2900                                 (unsigned int)outdata.dsize,
2901                                 (unsigned int)len));
2902                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2903                 return -1;
2904         }
2905
2906         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2907         len += info->num*sizeof(struct ctdb_control_iface_info);
2908
2909         if (len > outdata.dsize) {
2910                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2911                                 "returned invalid data with size %u > %u\n",
2912                                 (unsigned int)outdata.dsize,
2913                                 (unsigned int)len));
2914                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2915                 return -1;
2916         }
2917
2918         /* make sure we null terminate the returned strings */
2919         for (i=0; i < info->num; i++) {
2920                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2921         }
2922
2923         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2924                                                                 outdata.dptr,
2925                                                                 outdata.dsize);
2926         talloc_free(outdata.dptr);
2927         if (*_info == NULL) {
2928                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2929                                 "talloc_memdup size %u failed\n",
2930                                 (unsigned int)outdata.dsize));
2931                 return -1;
2932         }
2933
2934         return 0;
2935 }
2936
2937 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2938                          struct timeval timeout, uint32_t destnode,
2939                          TALLOC_CTX *mem_ctx,
2940                          struct ctdb_control_get_ifaces **_ifaces)
2941 {
2942         int ret;
2943         TDB_DATA outdata;
2944         int32_t res;
2945         struct ctdb_control_get_ifaces *ifaces;
2946         uint32_t len;
2947         uint32_t i;
2948
2949         ret = ctdb_control(ctdb, destnode, 0,
2950                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2951                            mem_ctx, &outdata, &res, &timeout, NULL);
2952         if (ret != 0 || res != 0) {
2953                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2954                                 "failed ret:%d res:%d\n",
2955                                 ret, res));
2956                 return -1;
2957         }
2958
2959         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2960         if (len > outdata.dsize) {
2961                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2962                                 "returned invalid data with size %u > %u\n",
2963                                 (unsigned int)outdata.dsize,
2964                                 (unsigned int)len));
2965                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2966                 return -1;
2967         }
2968
2969         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2970         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2971
2972         if (len > outdata.dsize) {
2973                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2974                                 "returned invalid data with size %u > %u\n",
2975                                 (unsigned int)outdata.dsize,
2976                                 (unsigned int)len));
2977                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2978                 return -1;
2979         }
2980
2981         /* make sure we null terminate the returned strings */
2982         for (i=0; i < ifaces->num; i++) {
2983                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2984         }
2985
2986         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2987                                                                   outdata.dptr,
2988                                                                   outdata.dsize);
2989         talloc_free(outdata.dptr);
2990         if (*_ifaces == NULL) {
2991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2992                                 "talloc_memdup size %u failed\n",
2993                                 (unsigned int)outdata.dsize));
2994                 return -1;
2995         }
2996
2997         return 0;
2998 }
2999
3000 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
3001                              struct timeval timeout, uint32_t destnode,
3002                              TALLOC_CTX *mem_ctx,
3003                              const struct ctdb_control_iface_info *info)
3004 {
3005         int ret;
3006         TDB_DATA indata;
3007         int32_t res;
3008
3009         indata.dptr = discard_const_p(uint8_t, info);
3010         indata.dsize = sizeof(*info);
3011
3012         ret = ctdb_control(ctdb, destnode, 0,
3013                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
3014                            mem_ctx, NULL, &res, &timeout, NULL);
3015         if (ret != 0 || res != 0) {
3016                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
3017                                 "failed ret:%d res:%d\n",
3018                                 ret, res));
3019                 return -1;
3020         }
3021
3022         return 0;
3023 }
3024
3025 /*
3026   set/clear the permanent disabled bit on a remote node
3027  */
3028 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
3029                        uint32_t set, uint32_t clear)
3030 {
3031         int ret;
3032         TDB_DATA data;
3033         struct ctdb_node_map *nodemap=NULL;
3034         struct ctdb_node_flag_change c;
3035         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3036         uint32_t recmaster;
3037         uint32_t *nodes;
3038
3039
3040         /* find the recovery master */
3041         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
3042         if (ret != 0) {
3043                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3044                 talloc_free(tmp_ctx);
3045                 return ret;
3046         }
3047
3048
3049         /* read the node flags from the recmaster */
3050         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
3051         if (ret != 0) {
3052                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
3053                 talloc_free(tmp_ctx);
3054                 return -1;
3055         }
3056         if (destnode >= nodemap->num) {
3057                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
3058                 talloc_free(tmp_ctx);
3059                 return -1;
3060         }
3061
3062         c.pnn       = destnode;
3063         c.old_flags = nodemap->nodes[destnode].flags;
3064         c.new_flags = c.old_flags;
3065         c.new_flags |= set;
3066         c.new_flags &= ~clear;
3067
3068         data.dsize = sizeof(c);
3069         data.dptr = (unsigned char *)&c;
3070
3071         /* send the flags update to all connected nodes */
3072         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3073
3074         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3075                                         nodes, 0,
3076                                         timeout, false, data,
3077                                         NULL, NULL,
3078                                         NULL) != 0) {
3079                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3080
3081                 talloc_free(tmp_ctx);
3082                 return -1;
3083         }
3084
3085         talloc_free(tmp_ctx);
3086         return 0;
3087 }
3088
3089
3090 /*
3091   get all tunables
3092  */
3093 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3094                                struct timeval timeout, 
3095                                uint32_t destnode,
3096                                struct ctdb_tunable *tunables)
3097 {
3098         TDB_DATA outdata;
3099         int ret;
3100         int32_t res;
3101
3102         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3103                            &outdata, &res, &timeout, NULL);
3104         if (ret != 0 || res != 0) {
3105                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3106                 return -1;
3107         }
3108
3109         if (outdata.dsize != sizeof(*tunables)) {
3110                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3111                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3112                 return -1;              
3113         }
3114
3115         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3116         talloc_free(outdata.dptr);
3117         return 0;
3118 }
3119
3120 /*
3121   add a public address to a node
3122  */
3123 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3124                       struct timeval timeout, 
3125                       uint32_t destnode,
3126                       struct ctdb_control_ip_iface *pub)
3127 {
3128         TDB_DATA data;
3129         int32_t res;
3130         int ret;
3131
3132         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3133         data.dptr  = (unsigned char *)pub;
3134
3135         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3136                            NULL, &res, &timeout, NULL);
3137         if (ret != 0 || res != 0) {
3138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3139                 return -1;
3140         }
3141
3142         return 0;
3143 }
3144
3145 /*
3146   delete a public address from a node
3147  */
3148 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3149                       struct timeval timeout, 
3150                       uint32_t destnode,
3151                       struct ctdb_control_ip_iface *pub)
3152 {
3153         TDB_DATA data;
3154         int32_t res;
3155         int ret;
3156
3157         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3158         data.dptr  = (unsigned char *)pub;
3159
3160         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3161                            NULL, &res, &timeout, NULL);
3162         if (ret != 0 || res != 0) {
3163                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3164                 return -1;
3165         }
3166
3167         return 0;
3168 }
3169
3170 /*
3171   kill a tcp connection
3172  */
3173 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3174                       struct timeval timeout, 
3175                       uint32_t destnode,
3176                       struct ctdb_control_killtcp *killtcp)
3177 {
3178         TDB_DATA data;
3179         int32_t res;
3180         int ret;
3181
3182         data.dsize = sizeof(struct ctdb_control_killtcp);
3183         data.dptr  = (unsigned char *)killtcp;
3184
3185         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3186                            NULL, &res, &timeout, NULL);
3187         if (ret != 0 || res != 0) {
3188                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3189                 return -1;
3190         }
3191
3192         return 0;
3193 }
3194
3195 /*
3196   send a gratious arp
3197  */
3198 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3199                       struct timeval timeout, 
3200                       uint32_t destnode,
3201                       ctdb_sock_addr *addr,
3202                       const char *ifname)
3203 {
3204         TDB_DATA data;
3205         int32_t res;
3206         int ret, len;
3207         struct ctdb_control_gratious_arp *gratious_arp;
3208         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3209
3210
3211         len = strlen(ifname)+1;
3212         gratious_arp = talloc_size(tmp_ctx, 
3213                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3214         CTDB_NO_MEMORY(ctdb, gratious_arp);
3215
3216         gratious_arp->addr = *addr;
3217         gratious_arp->len = len;
3218         memcpy(&gratious_arp->iface[0], ifname, len);
3219
3220
3221         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3222         data.dptr  = (unsigned char *)gratious_arp;
3223
3224         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3225                            NULL, &res, &timeout, NULL);
3226         if (ret != 0 || res != 0) {
3227                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3228                 talloc_free(tmp_ctx);
3229                 return -1;
3230         }
3231
3232         talloc_free(tmp_ctx);
3233         return 0;
3234 }
3235
3236 /*
3237   get a list of all tcp tickles that a node knows about for a particular vnn
3238  */
3239 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3240                               struct timeval timeout, uint32_t destnode, 
3241                               TALLOC_CTX *mem_ctx, 
3242                               ctdb_sock_addr *addr,
3243                               struct ctdb_control_tcp_tickle_list **list)
3244 {
3245         int ret;
3246         TDB_DATA data, outdata;
3247         int32_t status;
3248
3249         data.dptr = (uint8_t*)addr;
3250         data.dsize = sizeof(ctdb_sock_addr);
3251
3252         ret = ctdb_control(ctdb, destnode, 0, 
3253                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3254                            mem_ctx, &outdata, &status, NULL, NULL);
3255         if (ret != 0 || status != 0) {
3256                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3257                 return -1;
3258         }
3259
3260         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3261
3262         return status;
3263 }
3264
3265 /*
3266   register a server id
3267  */
3268 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3269                       struct timeval timeout, 
3270                       struct ctdb_server_id *id)
3271 {
3272         TDB_DATA data;
3273         int32_t res;
3274         int ret;
3275
3276         data.dsize = sizeof(struct ctdb_server_id);
3277         data.dptr  = (unsigned char *)id;
3278
3279         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3280                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3281                         0, data, NULL,
3282                         NULL, &res, &timeout, NULL);
3283         if (ret != 0 || res != 0) {
3284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3285                 return -1;
3286         }
3287
3288         return 0;
3289 }
3290
3291 /*
3292   unregister a server id
3293  */
3294 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3295                       struct timeval timeout, 
3296                       struct ctdb_server_id *id)
3297 {
3298         TDB_DATA data;
3299         int32_t res;
3300         int ret;
3301
3302         data.dsize = sizeof(struct ctdb_server_id);
3303         data.dptr  = (unsigned char *)id;
3304
3305         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3306                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3307                         0, data, NULL,
3308                         NULL, &res, &timeout, NULL);
3309         if (ret != 0 || res != 0) {
3310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3311                 return -1;
3312         }
3313
3314         return 0;
3315 }
3316
3317
3318 /*
3319   check if a server id exists
3320
3321   if a server id does exist, return *status == 1, otherwise *status == 0
3322  */
3323 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3324                       struct timeval timeout, 
3325                       uint32_t destnode,
3326                       struct ctdb_server_id *id,
3327                       uint32_t *status)
3328 {
3329         TDB_DATA data;
3330         int32_t res;
3331         int ret;
3332
3333         data.dsize = sizeof(struct ctdb_server_id);
3334         data.dptr  = (unsigned char *)id;
3335
3336         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3337                         0, data, NULL,
3338                         NULL, &res, &timeout, NULL);
3339         if (ret != 0) {
3340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3341                 return -1;
3342         }
3343
3344         if (res) {
3345                 *status = 1;
3346         } else {
3347                 *status = 0;
3348         }
3349
3350         return 0;
3351 }
3352
3353 /*
3354    get the list of server ids that are registered on a node
3355 */
3356 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3357                 TALLOC_CTX *mem_ctx,
3358                 struct timeval timeout, uint32_t destnode, 
3359                 struct ctdb_server_id_list **svid_list)
3360 {
3361         int ret;
3362         TDB_DATA outdata;
3363         int32_t res;
3364
3365         ret = ctdb_control(ctdb, destnode, 0, 
3366                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3367                            mem_ctx, &outdata, &res, &timeout, NULL);
3368         if (ret != 0 || res != 0) {
3369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3370                 return -1;
3371         }
3372
3373         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3374                     
3375         return 0;
3376 }
3377
3378 /*
3379   initialise the ctdb daemon for client applications
3380
3381   NOTE: In current code the daemon does not fork. This is for testing purposes only
3382   and to simplify the code.
3383 */
3384 struct ctdb_context *ctdb_init(struct event_context *ev)
3385 {
3386         int ret;
3387         struct ctdb_context *ctdb;
3388
3389         ctdb = talloc_zero(ev, struct ctdb_context);
3390         if (ctdb == NULL) {
3391                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3392                 return NULL;
3393         }
3394         ctdb->ev  = ev;
3395         ctdb->idr = idr_init(ctdb);
3396         /* Wrap early to exercise code. */
3397         ctdb->lastid = INT_MAX-200;
3398         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3399
3400         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3401         if (ret != 0) {
3402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3403                 talloc_free(ctdb);
3404                 return NULL;
3405         }
3406
3407         ctdb->statistics.statistics_start_time = timeval_current();
3408
3409         return ctdb;
3410 }
3411
3412
3413 /*
3414   set some ctdb flags
3415 */
3416 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3417 {
3418         ctdb->flags |= flags;
3419 }
3420
3421 /*
3422   setup the local socket name
3423 */
3424 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3425 {
3426         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3427         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3428
3429         return 0;
3430 }
3431
3432 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3433 {
3434         return ctdb->daemon.name;
3435 }
3436
3437 /*
3438   return the pnn of this node
3439 */
3440 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3441 {
3442         return ctdb->pnn;
3443 }
3444
3445
3446 /*
3447   get the uptime of a remote node
3448  */
3449 struct ctdb_client_control_state *
3450 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3451 {
3452         return ctdb_control_send(ctdb, destnode, 0, 
3453                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3454                            mem_ctx, &timeout, NULL);
3455 }
3456
3457 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3458 {
3459         int ret;
3460         int32_t res;
3461         TDB_DATA outdata;
3462
3463         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3464         if (ret != 0 || res != 0) {
3465                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3466                 return -1;
3467         }
3468
3469         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3470
3471         return 0;
3472 }
3473
3474 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3475 {
3476         struct ctdb_client_control_state *state;
3477
3478         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3479         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3480 }
3481
3482 /*
3483   send a control to execute the "recovered" event script on a node
3484  */
3485 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3486 {
3487         int ret;
3488         int32_t status;
3489
3490         ret = ctdb_control(ctdb, destnode, 0, 
3491                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3492                            NULL, NULL, &status, &timeout, NULL);
3493         if (ret != 0 || status != 0) {
3494                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3495                 return -1;
3496         }
3497
3498         return 0;
3499 }
3500
3501 /* 
3502   callback for the async helpers used when sending the same control
3503   to multiple nodes in parallell.
3504 */
3505 static void async_callback(struct ctdb_client_control_state *state)
3506 {
3507         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3508         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3509         int ret;
3510         TDB_DATA outdata;
3511         int32_t res = -1;
3512         uint32_t destnode = state->c->hdr.destnode;
3513
3514         outdata.dsize = 0;
3515         outdata.dptr = NULL;
3516
3517         /* one more node has responded with recmode data */
3518         data->count--;
3519
3520         /* if we failed to push the db, then return an error and let
3521            the main loop try again.
3522         */
3523         if (state->state != CTDB_CONTROL_DONE) {
3524                 if ( !data->dont_log_errors) {
3525                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3526                 }
3527                 data->fail_count++;
3528                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3529                         res = -ETIME;
3530                 } else {
3531                         res = -1;
3532                 }
3533                 if (data->fail_callback) {
3534                         data->fail_callback(ctdb, destnode, res, outdata,
3535                                         data->callback_data);
3536                 }
3537                 return;
3538         }
3539         
3540         state->async.fn = NULL;
3541
3542         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3543         if ((ret != 0) || (res != 0)) {
3544                 if ( !data->dont_log_errors) {
3545                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3546                 }
3547                 data->fail_count++;
3548                 if (data->fail_callback) {
3549                         data->fail_callback(ctdb, destnode, res, outdata,
3550                                         data->callback_data);
3551                 }
3552         }
3553         if ((ret == 0) && (data->callback != NULL)) {
3554                 data->callback(ctdb, destnode, res, outdata,
3555                                         data->callback_data);
3556         }
3557 }
3558
3559
3560 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3561 {
3562         /* set up the callback functions */
3563         state->async.fn = async_callback;
3564         state->async.private_data = data;
3565         
3566         /* one more control to wait for to complete */
3567         data->count++;
3568 }
3569
3570
3571 /* wait for up to the maximum number of seconds allowed
3572    or until all nodes we expect a response from has replied
3573 */
3574 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3575 {
3576         while (data->count > 0) {
3577                 event_loop_once(ctdb->ev);
3578         }
3579         if (data->fail_count != 0) {
3580                 if (!data->dont_log_errors) {
3581                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3582                                  data->fail_count));
3583                 }
3584                 return -1;
3585         }
3586         return 0;
3587 }
3588
3589
3590 /* 
3591    perform a simple control on the listed nodes
3592    The control cannot return data
3593  */
3594 int ctdb_client_async_control(struct ctdb_context *ctdb,
3595                                 enum ctdb_controls opcode,
3596                                 uint32_t *nodes,
3597                                 uint64_t srvid,
3598                                 struct timeval timeout,
3599                                 bool dont_log_errors,
3600                                 TDB_DATA data,
3601                                 client_async_callback client_callback,
3602                                 client_async_callback fail_callback,
3603                                 void *callback_data)
3604 {
3605         struct client_async_data *async_data;
3606         struct ctdb_client_control_state *state;
3607         int j, num_nodes;
3608
3609         async_data = talloc_zero(ctdb, struct client_async_data);
3610         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3611         async_data->dont_log_errors = dont_log_errors;
3612         async_data->callback = client_callback;
3613         async_data->fail_callback = fail_callback;
3614         async_data->callback_data = callback_data;
3615         async_data->opcode        = opcode;
3616
3617         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3618
3619         /* loop over all nodes and send an async control to each of them */
3620         for (j=0; j<num_nodes; j++) {
3621                 uint32_t pnn = nodes[j];
3622
3623                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3624                                           0, data, async_data, &timeout, NULL);
3625                 if (state == NULL) {
3626                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3627                         talloc_free(async_data);
3628                         return -1;
3629                 }
3630                 
3631                 ctdb_client_async_add(async_data, state);
3632         }
3633
3634         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3635                 talloc_free(async_data);
3636                 return -1;
3637         }
3638
3639         talloc_free(async_data);
3640         return 0;
3641 }
3642
3643 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3644                                 struct ctdb_vnn_map *vnn_map,
3645                                 TALLOC_CTX *mem_ctx,
3646                                 bool include_self)
3647 {
3648         int i, j, num_nodes;
3649         uint32_t *nodes;
3650
3651         for (i=num_nodes=0;i<vnn_map->size;i++) {
3652                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3653                         continue;
3654                 }
3655                 num_nodes++;
3656         } 
3657
3658         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3659         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3660
3661         for (i=j=0;i<vnn_map->size;i++) {
3662                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3663                         continue;
3664                 }
3665                 nodes[j++] = vnn_map->map[i];
3666         } 
3667
3668         return nodes;
3669 }
3670
3671 /* Get list of nodes not including those with flags specified by mask.
3672  * If exclude_pnn is not -1 then exclude that pnn from the list.
3673  */
3674 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3675                         struct ctdb_node_map *node_map,
3676                         TALLOC_CTX *mem_ctx,
3677                         uint32_t mask,
3678                         int exclude_pnn)
3679 {
3680         int i, j, num_nodes;
3681         uint32_t *nodes;
3682
3683         for (i=num_nodes=0;i<node_map->num;i++) {
3684                 if (node_map->nodes[i].flags & mask) {
3685                         continue;
3686                 }
3687                 if (node_map->nodes[i].pnn == exclude_pnn) {
3688                         continue;
3689                 }
3690                 num_nodes++;
3691         } 
3692
3693         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3694         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3695
3696         for (i=j=0;i<node_map->num;i++) {
3697                 if (node_map->nodes[i].flags & mask) {
3698                         continue;
3699                 }
3700                 if (node_map->nodes[i].pnn == exclude_pnn) {
3701                         continue;
3702                 }
3703                 nodes[j++] = node_map->nodes[i].pnn;
3704         } 
3705
3706         return nodes;
3707 }
3708
3709 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3710                                 struct ctdb_node_map *node_map,
3711                                 TALLOC_CTX *mem_ctx,
3712                                 bool include_self)
3713 {
3714         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3715                              include_self ? -1 : ctdb->pnn);
3716 }
3717
3718 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3719                                 struct ctdb_node_map *node_map,
3720                                 TALLOC_CTX *mem_ctx,
3721                                 bool include_self)
3722 {
3723         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3724                              include_self ? -1 : ctdb->pnn);
3725 }
3726
3727 /* 
3728   this is used to test if a pnn lock exists and if it exists will return
3729   the number of connections that pnn has reported or -1 if that recovery
3730   daemon is not running.
3731 */
3732 int
3733 ctdb_read_pnn_lock(int fd, int32_t pnn)
3734 {
3735         struct flock lock;
3736         char c;
3737
3738         lock.l_type = F_WRLCK;
3739         lock.l_whence = SEEK_SET;
3740         lock.l_start = pnn;
3741         lock.l_len = 1;
3742         lock.l_pid = 0;
3743
3744         if (fcntl(fd, F_GETLK, &lock) != 0) {
3745                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3746                 return -1;
3747         }
3748
3749         if (lock.l_type == F_UNLCK) {
3750                 return -1;
3751         }
3752
3753         if (pread(fd, &c, 1, pnn) == -1) {
3754                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3755                 return -1;
3756         }
3757
3758         return c;
3759 }
3760
3761 /*
3762   get capabilities of a remote node
3763  */
3764 struct ctdb_client_control_state *
3765 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3766 {
3767         return ctdb_control_send(ctdb, destnode, 0, 
3768                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3769                            mem_ctx, &timeout, NULL);
3770 }
3771
3772 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3773 {
3774         int ret;
3775         int32_t res;
3776         TDB_DATA outdata;
3777
3778         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3779         if ( (ret != 0) || (res != 0) ) {
3780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3781                 return -1;
3782         }
3783
3784         if (capabilities) {
3785                 *capabilities = *((uint32_t *)outdata.dptr);
3786         }
3787
3788         return 0;
3789 }
3790
3791 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3792 {
3793         struct ctdb_client_control_state *state;
3794         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3795         int ret;
3796
3797         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3798         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3799         talloc_free(tmp_ctx);
3800         return ret;
3801 }
3802
3803 struct server_id {
3804         uint64_t pid;
3805         uint32_t task_id;
3806         uint32_t vnn;
3807         uint64_t unique_id;
3808 };
3809
3810 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3811 {
3812         struct server_id id;
3813
3814         id.pid = getpid();
3815         id.task_id = reqid;
3816         id.vnn = ctdb_get_pnn(ctdb);
3817         id.unique_id = id.vnn;
3818         id.unique_id = (id.unique_id << 32) | reqid;
3819
3820         return id;
3821 }
3822
3823 /* This is basically a copy from Samba's server_id.*.  However, a
3824  * dependency chain stops us from using Samba's version, so use a
3825  * renamed copy until a better solution is found. */
3826 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3827 {
3828         if (id1->pid != id2->pid) {
3829                 return false;
3830         }
3831
3832         if (id1->task_id != id2->task_id) {
3833                 return false;
3834         }
3835
3836         if (id1->vnn != id2->vnn) {
3837                 return false;
3838         }
3839
3840         if (id1->unique_id != id2->unique_id) {
3841                 return false;
3842         }
3843
3844         return true;
3845 }
3846
3847 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3848 {
3849         struct ctdb_server_id sid;
3850         int ret;
3851         uint32_t result;
3852
3853         sid.type = SERVER_TYPE_SAMBA;
3854         sid.pnn = id->vnn;
3855         sid.server_id = id->pid;
3856
3857         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3858                                         id->vnn, &sid, &result);
3859         if (ret != 0) {
3860                 /* If control times out, assume server_id exists. */
3861                 return true;
3862         }
3863
3864         if (result) {
3865                 return true;
3866         }
3867
3868         return false;
3869 }
3870
3871
3872 enum g_lock_type {
3873         G_LOCK_READ = 0,
3874         G_LOCK_WRITE = 1,
3875 };
3876
3877 struct g_lock_rec {
3878         enum g_lock_type type;
3879         struct server_id id;
3880 };
3881
3882 struct g_lock_recs {
3883         unsigned int num;
3884         struct g_lock_rec *lock;
3885 };
3886
3887 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3888                          struct g_lock_recs **locks)
3889 {
3890         struct g_lock_recs *recs;
3891
3892         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3893         if (recs == NULL) {
3894                 return false;
3895         }
3896
3897         if (data.dsize == 0) {
3898                 goto done;
3899         }
3900
3901         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3902                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3903                                   (unsigned long)data.dsize));
3904                 talloc_free(recs);
3905                 return false;
3906         }
3907
3908         recs->num = data.dsize / sizeof(struct g_lock_rec);
3909         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3910         if (recs->lock == NULL) {
3911                 talloc_free(recs);
3912                 return false;
3913         }
3914
3915 done:
3916         if (locks != NULL) {
3917                 *locks = recs;
3918         }
3919
3920         return true;
3921 }
3922
3923
3924 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3925                         struct ctdb_db_context *ctdb_db,
3926                         const char *keyname, uint32_t reqid)
3927 {
3928         TDB_DATA key, data;
3929         struct ctdb_record_handle *h;
3930         struct g_lock_recs *locks;
3931         struct server_id id;
3932         struct timeval t_start;
3933         int i;
3934
3935         key.dptr = (uint8_t *)discard_const(keyname);
3936         key.dsize = strlen(keyname) + 1;
3937
3938         t_start = timeval_current();
3939
3940 again:
3941         /* Keep trying for an hour. */
3942         if (timeval_elapsed(&t_start) > 3600) {
3943                 return false;
3944         }
3945
3946         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3947         if (h == NULL) {
3948                 return false;
3949         }
3950
3951         if (!g_lock_parse(h, data, &locks)) {
3952                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3953                 talloc_free(data.dptr);
3954                 talloc_free(h);
3955                 return false;
3956         }
3957
3958         talloc_free(data.dptr);
3959
3960         id = server_id_get(ctdb_db->ctdb, reqid);
3961
3962         i = 0;
3963         while (i < locks->num) {
3964                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3965                         /* Internal error */
3966                         talloc_free(h);
3967                         return false;
3968                 }
3969
3970                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3971                         if (i < locks->num-1) {
3972                                 locks->lock[i] = locks->lock[locks->num-1];
3973                         }
3974                         locks->num--;
3975                         continue;
3976                 }
3977
3978                 /* This entry is locked. */
3979                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3980                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3981                                    (unsigned long long)id.pid,
3982                                    id.task_id, id.vnn,
3983                                    (unsigned long long)id.unique_id));
3984                 talloc_free(h);
3985                 goto again;
3986         }
3987
3988         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3989                                      locks->num+1);
3990         if (locks->lock == NULL) {
3991                 talloc_free(h);
3992                 return false;
3993         }
3994
3995         locks->lock[locks->num].type = G_LOCK_WRITE;
3996         locks->lock[locks->num].id = id;
3997         locks->num++;
3998
3999         data.dptr = (uint8_t *)locks->lock;
4000         data.dsize = locks->num * sizeof(struct g_lock_rec);
4001
4002         if (ctdb_record_store(h, data) != 0) {
4003                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
4004                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4005                                   (unsigned long long)id.pid,
4006                                   id.task_id, id.vnn,
4007                                   (unsigned long long)id.unique_id));
4008                 talloc_free(h);
4009                 return false;
4010         }
4011
4012         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
4013                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4014                            (unsigned long long)id.pid,
4015                            id.task_id, id.vnn,
4016                            (unsigned long long)id.unique_id));
4017
4018         talloc_free(h);
4019         return true;
4020 }
4021
4022 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
4023                           struct ctdb_db_context *ctdb_db,
4024                           const char *keyname, uint32_t reqid)
4025 {
4026         TDB_DATA key, data;
4027         struct ctdb_record_handle *h;
4028         struct g_lock_recs *locks;
4029         struct server_id id;
4030         int i;
4031         bool found = false;
4032
4033         key.dptr = (uint8_t *)discard_const(keyname);
4034         key.dsize = strlen(keyname) + 1;
4035         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4036         if (h == NULL) {
4037                 return false;
4038         }
4039
4040         if (!g_lock_parse(h, data, &locks)) {
4041                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4042                 talloc_free(data.dptr);
4043                 talloc_free(h);
4044                 return false;
4045         }
4046
4047         talloc_free(data.dptr);
4048
4049         id = server_id_get(ctdb_db->ctdb, reqid);
4050
4051         for (i=0; i<locks->num; i++) {
4052                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4053                         if (i < locks->num-1) {
4054                                 locks->lock[i] = locks->lock[locks->num-1];
4055                         }
4056                         locks->num--;
4057                         found = true;
4058                         break;
4059                 }
4060         }
4061
4062         if (!found) {
4063                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4064                 talloc_free(h);
4065                 return false;
4066         }
4067
4068         data.dptr = (uint8_t *)locks->lock;
4069         data.dsize = locks->num * sizeof(struct g_lock_rec);
4070
4071         if (ctdb_record_store(h, data) != 0) {
4072                 talloc_free(h);
4073                 return false;
4074         }
4075
4076         talloc_free(h);
4077         return true;
4078 }
4079
4080
4081 struct ctdb_transaction_handle {
4082         struct ctdb_db_context *ctdb_db;
4083         struct ctdb_db_context *g_lock_db;
4084         char *lock_name;
4085         uint32_t reqid;
4086         /*
4087          * we store reads and writes done under a transaction:
4088          * - one list stores both reads and writes (m_all)
4089          * - the other just writes (m_write)
4090          */
4091         struct ctdb_marshall_buffer *m_all;
4092         struct ctdb_marshall_buffer *m_write;
4093 };
4094
4095 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4096 {
4097         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4098         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4099         return 0;
4100 }
4101
4102
4103 /**
4104  * start a transaction on a database
4105  */
4106 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4107                                                        TALLOC_CTX *mem_ctx)
4108 {
4109         struct ctdb_transaction_handle *h;
4110         struct ctdb_server_id id;
4111
4112         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4113         if (h == NULL) {
4114                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4115                 return NULL;
4116         }
4117
4118         h->ctdb_db = ctdb_db;
4119         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4120                                        (unsigned int)ctdb_db->db_id);
4121         if (h->lock_name == NULL) {
4122                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4123                 talloc_free(h);
4124                 return NULL;
4125         }
4126
4127         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4128                                    "g_lock.tdb", false, 0);
4129         if (!h->g_lock_db) {
4130                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4131                 talloc_free(h);
4132                 return NULL;
4133         }
4134
4135         id.type = SERVER_TYPE_SAMBA;
4136         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4137         id.server_id = getpid();
4138
4139         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4140                                          &id) != 0) {
4141                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4142                 talloc_free(h);
4143                 return NULL;
4144         }
4145
4146         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4147
4148         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4149                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4150                 talloc_free(h);
4151                 return NULL;
4152         }
4153
4154         talloc_set_destructor(h, ctdb_transaction_destructor);
4155         return h;
4156 }
4157
4158 /**
4159  * fetch a record inside a transaction
4160  */
4161 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4162                            TALLOC_CTX *mem_ctx,
4163                            TDB_DATA key, TDB_DATA *data)
4164 {
4165         struct ctdb_ltdb_header header;
4166         int ret;
4167
4168         ZERO_STRUCT(header);
4169
4170         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4171         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4172                 /* record doesn't exist yet */
4173                 *data = tdb_null;
4174                 ret = 0;
4175         }
4176
4177         if (ret != 0) {
4178                 return ret;
4179         }
4180
4181         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4182         if (h->m_all == NULL) {
4183                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4184                 return -1;
4185         }
4186
4187         return 0;
4188 }
4189
4190 /**
4191  * stores a record inside a transaction
4192  */
4193 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4194                            TDB_DATA key, TDB_DATA data)
4195 {
4196         TALLOC_CTX *tmp_ctx = talloc_new(h);
4197         struct ctdb_ltdb_header header;
4198         TDB_DATA olddata;
4199         int ret;
4200
4201         /* we need the header so we can update the RSN */
4202         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4203         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4204                 /* the record doesn't exist - create one with us as dmaster.
4205                    This is only safe because we are in a transaction and this
4206                    is a persistent database */
4207                 ZERO_STRUCT(header);
4208         } else if (ret != 0) {
4209                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4210                 talloc_free(tmp_ctx);
4211                 return ret;
4212         }
4213
4214         if (data.dsize == olddata.dsize &&
4215             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4216             header.rsn != 0) {
4217                 /* save writing the same data */
4218                 talloc_free(tmp_ctx);
4219                 return 0;
4220         }
4221
4222         header.dmaster = h->ctdb_db->ctdb->pnn;
4223         header.rsn++;
4224
4225         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4226         if (h->m_all == NULL) {
4227                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4228                 talloc_free(tmp_ctx);
4229                 return -1;
4230         }
4231
4232         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4233         if (h->m_write == NULL) {
4234                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4235                 talloc_free(tmp_ctx);
4236                 return -1;
4237         }
4238
4239         talloc_free(tmp_ctx);
4240         return 0;
4241 }
4242
4243 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4244 {
4245         const char *keyname = CTDB_DB_SEQNUM_KEY;
4246         TDB_DATA key, data;
4247         struct ctdb_ltdb_header header;
4248         int ret;
4249
4250         key.dptr = (uint8_t *)discard_const(keyname);
4251         key.dsize = strlen(keyname) + 1;
4252
4253         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4254         if (ret != 0) {
4255                 *seqnum = 0;
4256                 return 0;
4257         }
4258
4259         if (data.dsize == 0) {
4260                 *seqnum = 0;
4261                 return 0;
4262         }
4263
4264         if (data.dsize != sizeof(*seqnum)) {
4265                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4266                                   data.dsize));
4267                 talloc_free(data.dptr);
4268                 return -1;
4269         }
4270
4271         *seqnum = *(uint64_t *)data.dptr;
4272         talloc_free(data.dptr);
4273
4274         return 0;
4275 }
4276
4277
4278 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4279                                 uint64_t seqnum)
4280 {
4281         const char *keyname = CTDB_DB_SEQNUM_KEY;
4282         TDB_DATA key, data;
4283
4284         key.dptr = (uint8_t *)discard_const(keyname);
4285         key.dsize = strlen(keyname) + 1;
4286
4287         data.dptr = (uint8_t *)&seqnum;
4288         data.dsize = sizeof(seqnum);
4289
4290         return ctdb_transaction_store(h, key, data);
4291 }
4292
4293
4294 /**
4295  * commit a transaction
4296  */
4297 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4298 {
4299         int ret;
4300         uint64_t old_seqnum, new_seqnum;
4301         int32_t status;
4302         struct timeval timeout;
4303
4304         if (h->m_write == NULL) {
4305                 /* no changes were made */
4306                 talloc_free(h);
4307                 return 0;
4308         }
4309
4310         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4311         if (ret != 0) {
4312                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4313                 ret = -1;
4314                 goto done;
4315         }
4316
4317         new_seqnum = old_seqnum + 1;
4318         ret = ctdb_store_db_seqnum(h, new_seqnum);
4319         if (ret != 0) {
4320                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4321                 ret = -1;
4322                 goto done;
4323         }
4324
4325 again:
4326         timeout = timeval_current_ofs(3,0);
4327         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4328                            h->ctdb_db->db_id,
4329                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4330                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4331                            &status, &timeout, NULL);
4332         if (ret != 0 || status != 0) {
4333                 /*
4334                  * TRANS3_COMMIT control will only fail if recovery has been
4335                  * triggered.  Check if the database has been updated or not.
4336                  */
4337                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4338                 if (ret != 0) {
4339                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4340                         goto done;
4341                 }
4342
4343                 if (new_seqnum == old_seqnum) {
4344                         /* Database not yet updated, try again */
4345                         goto again;
4346                 }
4347
4348                 if (new_seqnum != (old_seqnum + 1)) {
4349                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4350                                           (long long unsigned)new_seqnum,
4351                                           (long long unsigned)old_seqnum));
4352                         ret = -1;
4353                         goto done;
4354                 }
4355         }
4356
4357         ret = 0;
4358
4359 done:
4360         talloc_free(h);
4361         return ret;
4362 }
4363
4364 /**
4365  * cancel a transaction
4366  */
4367 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4368 {
4369         talloc_free(h);
4370         return 0;
4371 }
4372
4373
4374 /*
4375   recovery daemon ping to main daemon
4376  */
4377 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4378 {
4379         int ret;
4380         int32_t res;
4381
4382         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4383                            ctdb, NULL, &res, NULL, NULL);
4384         if (ret != 0 || res != 0) {
4385                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4386                 return -1;
4387         }
4388
4389         return 0;
4390 }
4391
4392 /* When forking the main daemon and the child process needs to connect
4393  * back to the daemon as a client process, this function can be used
4394  * to change the ctdb context from daemon into client mode.  The child
4395  * process must be created using ctdb_fork() and not fork() -
4396  * ctdb_fork() does some necessary housekeeping.
4397  */
4398 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4399 {
4400         int ret;
4401         va_list ap;
4402
4403         /* Add extra information so we can identify this in the logs */
4404         va_start(ap, fmt);
4405         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4406         va_end(ap);
4407
4408         /* get a new event context */
4409         ctdb->ev = event_context_init(ctdb);
4410         tevent_loop_allow_nesting(ctdb->ev);
4411
4412         /* Connect to main CTDB daemon */
4413         ret = ctdb_socket_connect(ctdb);
4414         if (ret != 0) {
4415                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4416                 return -1;
4417         }
4418
4419         ctdb->can_send_controls = true;
4420
4421         return 0;
4422 }
4423
4424 /*
4425   get the status of running the monitor eventscripts: NULL means never run.
4426  */
4427 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4428                 struct timeval timeout, uint32_t destnode, 
4429                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4430                 struct ctdb_scripts_wire **scripts)
4431 {
4432         int ret;
4433         TDB_DATA outdata, indata;
4434         int32_t res;
4435         uint32_t uinttype = type;
4436
4437         indata.dptr = (uint8_t *)&uinttype;
4438         indata.dsize = sizeof(uinttype);
4439
4440         ret = ctdb_control(ctdb, destnode, 0, 
4441                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4442                            mem_ctx, &outdata, &res, &timeout, NULL);
4443         if (ret != 0 || res != 0) {
4444                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4445                 return -1;
4446         }
4447
4448         if (outdata.dsize == 0) {
4449                 *scripts = NULL;
4450         } else {
4451                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4452                 talloc_free(outdata.dptr);
4453         }
4454                     
4455         return 0;
4456 }
4457
4458 /*
4459   tell the main daemon how long it took to lock the reclock file
4460  */
4461 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4462 {
4463         int ret;
4464         int32_t res;
4465         TDB_DATA data;
4466
4467         data.dptr = (uint8_t *)&latency;
4468         data.dsize = sizeof(latency);
4469
4470         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4471                            ctdb, NULL, &res, NULL, NULL);
4472         if (ret != 0 || res != 0) {
4473                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4474                 return -1;
4475         }
4476
4477         return 0;
4478 }
4479
4480 /*
4481   get the name of the reclock file
4482  */
4483 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4484                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4485                          const char **name)
4486 {
4487         int ret;
4488         int32_t res;
4489         TDB_DATA data;
4490
4491         ret = ctdb_control(ctdb, destnode, 0, 
4492                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4493                            mem_ctx, &data, &res, &timeout, NULL);
4494         if (ret != 0 || res != 0) {
4495                 return -1;
4496         }
4497
4498         if (data.dsize == 0) {
4499                 *name = NULL;
4500         } else {
4501                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4502         }
4503         talloc_free(data.dptr);
4504
4505         return 0;
4506 }
4507
4508 /*
4509   set the reclock filename for a node
4510  */
4511 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4512 {
4513         int ret;
4514         TDB_DATA data;
4515         int32_t res;
4516
4517         if (reclock == NULL) {
4518                 data.dsize = 0;
4519                 data.dptr  = NULL;
4520         } else {
4521                 data.dsize = strlen(reclock) + 1;
4522                 data.dptr  = discard_const(reclock);
4523         }
4524
4525         ret = ctdb_control(ctdb, destnode, 0, 
4526                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4527                            NULL, NULL, &res, &timeout, NULL);
4528         if (ret != 0 || res != 0) {
4529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4530                 return -1;
4531         }
4532
4533         return 0;
4534 }
4535
4536 /*
4537   stop a node
4538  */
4539 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4540 {
4541         int ret;
4542         int32_t res;
4543
4544         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4545                            ctdb, NULL, &res, &timeout, NULL);
4546         if (ret != 0 || res != 0) {
4547                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4548                 return -1;
4549         }
4550
4551         return 0;
4552 }
4553
4554 /*
4555   continue a node
4556  */
4557 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4558 {
4559         int ret;
4560
4561         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4562                            ctdb, NULL, NULL, &timeout, NULL);
4563         if (ret != 0) {
4564                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4565                 return -1;
4566         }
4567
4568         return 0;
4569 }
4570
4571 /*
4572   set the natgw state for a node
4573  */
4574 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4575 {
4576         int ret;
4577         TDB_DATA data;
4578         int32_t res;
4579
4580         data.dsize = sizeof(natgwstate);
4581         data.dptr  = (uint8_t *)&natgwstate;
4582
4583         ret = ctdb_control(ctdb, destnode, 0, 
4584                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4585                            NULL, NULL, &res, &timeout, NULL);
4586         if (ret != 0 || res != 0) {
4587                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4588                 return -1;
4589         }
4590
4591         return 0;
4592 }
4593
4594 /*
4595   set the lmaster role for a node
4596  */
4597 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4598 {
4599         int ret;
4600         TDB_DATA data;
4601         int32_t res;
4602
4603         data.dsize = sizeof(lmasterrole);
4604         data.dptr  = (uint8_t *)&lmasterrole;
4605
4606         ret = ctdb_control(ctdb, destnode, 0, 
4607                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4608                            NULL, NULL, &res, &timeout, NULL);
4609         if (ret != 0 || res != 0) {
4610                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4611                 return -1;
4612         }
4613
4614         return 0;
4615 }
4616
4617 /*
4618   set the recmaster role for a node
4619  */
4620 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4621 {
4622         int ret;
4623         TDB_DATA data;
4624         int32_t res;
4625
4626         data.dsize = sizeof(recmasterrole);
4627         data.dptr  = (uint8_t *)&recmasterrole;
4628
4629         ret = ctdb_control(ctdb, destnode, 0, 
4630                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4631                            NULL, NULL, &res, &timeout, NULL);
4632         if (ret != 0 || res != 0) {
4633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4634                 return -1;
4635         }
4636
4637         return 0;
4638 }
4639
4640 /* enable an eventscript
4641  */
4642 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4643 {
4644         int ret;
4645         TDB_DATA data;
4646         int32_t res;
4647
4648         data.dsize = strlen(script) + 1;
4649         data.dptr  = discard_const(script);
4650
4651         ret = ctdb_control(ctdb, destnode, 0, 
4652                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4653                            NULL, NULL, &res, &timeout, NULL);
4654         if (ret != 0 || res != 0) {
4655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4656                 return -1;
4657         }
4658
4659         return 0;
4660 }
4661
4662 /* disable an eventscript
4663  */
4664 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4665 {
4666         int ret;
4667         TDB_DATA data;
4668         int32_t res;
4669
4670         data.dsize = strlen(script) + 1;
4671         data.dptr  = discard_const(script);
4672
4673         ret = ctdb_control(ctdb, destnode, 0, 
4674                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4675                            NULL, NULL, &res, &timeout, NULL);
4676         if (ret != 0 || res != 0) {
4677                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4678                 return -1;
4679         }
4680
4681         return 0;
4682 }
4683
4684
4685 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4686 {
4687         int ret;
4688         TDB_DATA data;
4689         int32_t res;
4690
4691         data.dsize = sizeof(*bantime);
4692         data.dptr  = (uint8_t *)bantime;
4693
4694         ret = ctdb_control(ctdb, destnode, 0, 
4695                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4696                            NULL, NULL, &res, &timeout, NULL);
4697         if (ret != 0 || res != 0) {
4698                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4699                 return -1;
4700         }
4701
4702         return 0;
4703 }
4704
4705
4706 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4707 {
4708         int ret;
4709         TDB_DATA outdata;
4710         int32_t res;
4711         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4712
4713         ret = ctdb_control(ctdb, destnode, 0, 
4714                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4715                            tmp_ctx, &outdata, &res, &timeout, NULL);
4716         if (ret != 0 || res != 0) {
4717                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4718                 talloc_free(tmp_ctx);
4719                 return -1;
4720         }
4721
4722         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4723         talloc_free(tmp_ctx);
4724
4725         return 0;
4726 }
4727
4728
4729 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4730 {
4731         int ret;
4732         int32_t res;
4733         TDB_DATA data;
4734         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4735
4736         data.dptr = (uint8_t*)db_prio;
4737         data.dsize = sizeof(*db_prio);
4738
4739         ret = ctdb_control(ctdb, destnode, 0, 
4740                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4741                            tmp_ctx, NULL, &res, &timeout, NULL);
4742         if (ret != 0 || res != 0) {
4743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4744                 talloc_free(tmp_ctx);
4745                 return -1;
4746         }
4747
4748         talloc_free(tmp_ctx);
4749
4750         return 0;
4751 }
4752
4753 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4754 {
4755         int ret;
4756         int32_t res;
4757         TDB_DATA data;
4758         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4759
4760         data.dptr = (uint8_t*)&db_id;
4761         data.dsize = sizeof(db_id);
4762
4763         ret = ctdb_control(ctdb, destnode, 0, 
4764                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4765                            tmp_ctx, NULL, &res, &timeout, NULL);
4766         if (ret != 0 || res < 0) {
4767                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4768                 talloc_free(tmp_ctx);
4769                 return -1;
4770         }
4771
4772         if (priority) {
4773                 *priority = res;
4774         }
4775
4776         talloc_free(tmp_ctx);
4777
4778         return 0;
4779 }
4780
4781 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4782 {
4783         int ret;
4784         TDB_DATA outdata;
4785         int32_t res;
4786
4787         ret = ctdb_control(ctdb, destnode, 0, 
4788                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4789                            mem_ctx, &outdata, &res, &timeout, NULL);
4790         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4791                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4792                 return -1;
4793         }
4794
4795         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4796         talloc_free(outdata.dptr);
4797                     
4798         return 0;
4799 }
4800
4801 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4802 {
4803         if (h == NULL) {
4804                 return NULL;
4805         }
4806
4807         return &h->header;
4808 }
4809
4810
4811 struct ctdb_client_control_state *
4812 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4813 {
4814         struct ctdb_client_control_state *handle;
4815         struct ctdb_marshall_buffer *m;
4816         struct ctdb_rec_data *rec;
4817         TDB_DATA outdata;
4818
4819         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4820         if (m == NULL) {
4821                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4822                 return NULL;
4823         }
4824
4825         m->db_id = ctdb_db->db_id;
4826
4827         rec = ctdb_marshall_record(m, 0, key, header, data);
4828         if (rec == NULL) {
4829                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4830                 talloc_free(m);
4831                 return NULL;
4832         }
4833         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4834         if (m == NULL) {
4835                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4836                 talloc_free(m);
4837                 return NULL;
4838         }
4839         m->count++;
4840         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4841
4842
4843         outdata.dptr = (uint8_t *)m;
4844         outdata.dsize = talloc_get_size(m);
4845
4846         handle = ctdb_control_send(ctdb, destnode, 0, 
4847                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4848                            mem_ctx, &timeout, NULL);
4849         talloc_free(m);
4850         return handle;
4851 }
4852
4853 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4854 {
4855         int ret;
4856         int32_t res;
4857
4858         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4859         if ( (ret != 0) || (res != 0) ){
4860                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4861                 return -1;
4862         }
4863
4864         return 0;
4865 }
4866
4867 int
4868 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4869 {
4870         struct ctdb_client_control_state *state;
4871
4872         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4873         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4874 }
4875
4876
4877
4878
4879
4880
4881 /*
4882   set a database to be readonly
4883  */
4884 struct ctdb_client_control_state *
4885 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4886 {
4887         TDB_DATA data;
4888
4889         data.dptr = (uint8_t *)&dbid;
4890         data.dsize = sizeof(dbid);
4891
4892         return ctdb_control_send(ctdb, destnode, 0, 
4893                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4894                            ctdb, NULL, NULL);
4895 }
4896
4897 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4898 {
4899         int ret;
4900         int32_t res;
4901
4902         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4903         if (ret != 0 || res != 0) {
4904           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4905                 return -1;
4906         }
4907
4908         return 0;
4909 }
4910
4911 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4912 {
4913         struct ctdb_client_control_state *state;
4914
4915         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4916         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4917 }
4918
4919 /*
4920   set a database to be sticky
4921  */
4922 struct ctdb_client_control_state *
4923 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4924 {
4925         TDB_DATA data;
4926
4927         data.dptr = (uint8_t *)&dbid;
4928         data.dsize = sizeof(dbid);
4929
4930         return ctdb_control_send(ctdb, destnode, 0, 
4931                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4932                            ctdb, NULL, NULL);
4933 }
4934
4935 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4936 {
4937         int ret;
4938         int32_t res;
4939
4940         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4941         if (ret != 0 || res != 0) {
4942                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4943                 return -1;
4944         }
4945
4946         return 0;
4947 }
4948
4949 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4950 {
4951         struct ctdb_client_control_state *state;
4952
4953         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4954         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4955 }