client: Add functions to parse g_lock.tdb records
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   (long unsigned int)num*sizeof(uint8_t),
538                                   outdata.dsize));
539                 talloc_free(outdata.dptr);
540                 return -1;
541         }
542
543         for (i=0; i<num; i++) {
544                 result[i] = outdata.dptr[i];
545         }
546
547         talloc_free(outdata.dptr);
548         return 0;
549 }
550
551 /*
552   send a message - from client context
553  */
554 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
555                       uint64_t srvid, TDB_DATA data)
556 {
557         struct ctdb_req_message *r;
558         int len, res;
559
560         len = offsetof(struct ctdb_req_message, data) + data.dsize;
561         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
562                                len, struct ctdb_req_message);
563         CTDB_NO_MEMORY(ctdb, r);
564
565         r->hdr.destnode  = pnn;
566         r->srvid         = srvid;
567         r->datalen       = data.dsize;
568         memcpy(&r->data[0], data.dptr, data.dsize);
569         
570         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
571         talloc_free(r);
572         return res;
573 }
574
575
576 /*
577   cancel a ctdb_fetch_lock operation, releasing the lock
578  */
579 static int fetch_lock_destructor(struct ctdb_record_handle *h)
580 {
581         ctdb_ltdb_unlock(h->ctdb_db, h->key);
582         return 0;
583 }
584
585 /*
586   force the migration of a record to this node
587  */
588 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
589 {
590         struct ctdb_call call;
591         ZERO_STRUCT(call);
592         call.call_id = CTDB_NULL_FUNC;
593         call.key = key;
594         call.flags = CTDB_IMMEDIATE_MIGRATION;
595         return ctdb_call(ctdb_db, &call);
596 }
597
598 /*
599   try to fetch a readonly copy of a record
600  */
601 static int
602 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
603 {
604         int ret;
605
606         struct ctdb_call call;
607         ZERO_STRUCT(call);
608
609         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
610         call.call_data.dptr = NULL;
611         call.call_data.dsize = 0;
612         call.key = key;
613         call.flags = CTDB_WANT_READONLY;
614         ret = ctdb_call(ctdb_db, &call);
615
616         if (ret != 0) {
617                 return -1;
618         }
619         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
620                 return -1;
621         }
622
623         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
624         if (*hdr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 return -1;
627         }
628
629         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
630         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
631         if (data->dptr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 talloc_free(hdr);
634                 return -1;
635         }
636
637         return 0;
638 }
639
640 /*
641   get a lock on a record, and return the records data. Blocks until it gets the lock
642  */
643 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                                            TDB_DATA key, TDB_DATA *data)
645 {
646         int ret;
647         struct ctdb_record_handle *h;
648
649         /*
650           procedure is as follows:
651
652           1) get the chain lock. 
653           2) check if we are dmaster
654           3) if we are the dmaster then return handle 
655           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
656              reply from ctdbd
657           5) when we get the reply, goto (1)
658          */
659
660         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
661         if (h == NULL) {
662                 return NULL;
663         }
664
665         h->ctdb_db = ctdb_db;
666         h->key     = key;
667         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
668         if (h->key.dptr == NULL) {
669                 talloc_free(h);
670                 return NULL;
671         }
672         h->data    = data;
673
674         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
675                  (const char *)key.dptr));
676
677 again:
678         /* step 1 - get the chain lock */
679         ret = ctdb_ltdb_lock(ctdb_db, key);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
682                 talloc_free(h);
683                 return NULL;
684         }
685
686         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
687
688         talloc_set_destructor(h, fetch_lock_destructor);
689
690         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
691
692         /* when torturing, ensure we test the remote path */
693         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
694             random() % 5 == 0) {
695                 h->header.dmaster = (uint32_t)-1;
696         }
697
698
699         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
700
701         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
702                 ctdb_ltdb_unlock(ctdb_db, key);
703                 ret = ctdb_client_force_migration(ctdb_db, key);
704                 if (ret != 0) {
705                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
706                         talloc_free(h);
707                         return NULL;
708                 }
709                 goto again;
710         }
711
712         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
713         return h;
714 }
715
716 /*
717   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
718  */
719 struct ctdb_record_handle *
720 ctdb_fetch_readonly_lock(
721         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
722         TDB_DATA key, TDB_DATA *data,
723         int read_only)
724 {
725         int ret;
726         struct ctdb_record_handle *h;
727         struct ctdb_ltdb_header *roheader = NULL;
728
729         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
730         if (h == NULL) {
731                 return NULL;
732         }
733
734         h->ctdb_db = ctdb_db;
735         h->key     = key;
736         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
737         if (h->key.dptr == NULL) {
738                 talloc_free(h);
739                 return NULL;
740         }
741         h->data    = data;
742
743         data->dptr = NULL;
744         data->dsize = 0;
745
746
747 again:
748         talloc_free(roheader);
749         roheader = NULL;
750
751         talloc_free(data->dptr);
752         data->dptr = NULL;
753         data->dsize = 0;
754
755         /* Lock the record/chain */
756         ret = ctdb_ltdb_lock(ctdb_db, key);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
759                 talloc_free(h);
760                 return NULL;
761         }
762
763         talloc_set_destructor(h, fetch_lock_destructor);
764
765         /* Check if record exists yet in the TDB */
766         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
767         if (ret != 0) {
768                 ctdb_ltdb_unlock(ctdb_db, key);
769                 ret = ctdb_client_force_migration(ctdb_db, key);
770                 if (ret != 0) {
771                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
772                         talloc_free(h);
773                         return NULL;
774                 }
775                 goto again;
776         }
777
778         /* if this is a request for read/write and we have delegations
779            we have to revoke all delegations first
780         */
781         if ((read_only == 0) 
782         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
783         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_force_migration(ctdb_db, key);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
788                         talloc_free(h);
789                         return NULL;
790                 }
791                 goto again;
792         }
793
794         /* if we are dmaster, just return the handle */
795         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
796                 return h;
797         }
798
799         if (read_only != 0) {
800                 TDB_DATA rodata = {NULL, 0};
801
802                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
803                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
804                         return h;
805                 }
806
807                 ctdb_ltdb_unlock(ctdb_db, key);
808                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
809                 if (ret != 0) {
810                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
811                         ret = ctdb_client_force_migration(ctdb_db, key);
812                         if (ret != 0) {
813                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
814                                 talloc_free(h);
815                                 return NULL;
816                         }
817
818                         goto again;
819                 }
820
821                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
822                         ret = ctdb_client_force_migration(ctdb_db, key);
823                         if (ret != 0) {
824                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
825                                 talloc_free(h);
826                                 return NULL;
827                         }
828
829                         goto again;
830                 }
831
832                 ret = ctdb_ltdb_lock(ctdb_db, key);
833                 if (ret != 0) {
834                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
835                         talloc_free(h);
836                         return NULL;
837                 }
838
839                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
840                 if (ret != 0) {
841                         ctdb_ltdb_unlock(ctdb_db, key);
842
843                         ret = ctdb_client_force_migration(ctdb_db, key);
844                         if (ret != 0) {
845                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
846                                 talloc_free(h);
847                                 return NULL;
848                         }
849
850                         goto again;
851                 }
852
853                 return h;
854         }
855
856         /* we are not dmaster and this was not a request for a readonly lock
857          * so unlock the record, migrate it and try again
858          */
859         ctdb_ltdb_unlock(ctdb_db, key);
860         ret = ctdb_client_force_migration(ctdb_db, key);
861         if (ret != 0) {
862                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
863                 talloc_free(h);
864                 return NULL;
865         }
866         goto again;
867 }
868
869 /*
870   store some data to the record that was locked with ctdb_fetch_lock()
871 */
872 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
873 {
874         if (h->ctdb_db->persistent) {
875                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
876                 return -1;
877         }
878
879         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
880 }
881
882 /*
883   non-locking fetch of a record
884  */
885 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
886                TDB_DATA key, TDB_DATA *data)
887 {
888         struct ctdb_call call;
889         int ret;
890
891         call.call_id = CTDB_FETCH_FUNC;
892         call.call_data.dptr = NULL;
893         call.call_data.dsize = 0;
894         call.key = key;
895
896         ret = ctdb_call(ctdb_db, &call);
897
898         if (ret == 0) {
899                 *data = call.reply_data;
900                 talloc_steal(mem_ctx, data->dptr);
901         }
902
903         return ret;
904 }
905
906
907
908 /*
909    called when a control completes or timesout to invoke the callback
910    function the user provided
911 */
912 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
913         struct timeval t, void *private_data)
914 {
915         struct ctdb_client_control_state *state;
916         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
917         int ret;
918
919         state = talloc_get_type(private_data, struct ctdb_client_control_state);
920         talloc_steal(tmp_ctx, state);
921
922         ret = ctdb_control_recv(state->ctdb, state, state,
923                         NULL, 
924                         NULL, 
925                         NULL);
926         if (ret != 0) {
927                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
928         }
929
930         talloc_free(tmp_ctx);
931 }
932
933 /*
934   called when a CTDB_REPLY_CONTROL packet comes in in the client
935
936   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
937   contains any reply data from the control
938 */
939 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
940                                       struct ctdb_req_header *hdr)
941 {
942         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
943         struct ctdb_client_control_state *state;
944
945         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
946         if (state == NULL) {
947                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
948                 return;
949         }
950
951         if (hdr->reqid != state->reqid) {
952                 /* we found a record  but it was the wrong one */
953                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
954                 return;
955         }
956
957         state->outdata.dptr = c->data;
958         state->outdata.dsize = c->datalen;
959         state->status = c->status;
960         if (c->errorlen) {
961                 state->errormsg = talloc_strndup(state, 
962                                                  (char *)&c->data[c->datalen], 
963                                                  c->errorlen);
964         }
965
966         /* state->outdata now uses resources from c so we dont want c
967            to just dissappear from under us while state is still alive
968         */
969         talloc_steal(state, c);
970
971         state->state = CTDB_CONTROL_DONE;
972
973         /* if we had a callback registered for this control, pull the response
974            and call the callback.
975         */
976         if (state->async.fn) {
977                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
978         }
979 }
980
981
982 /*
983   destroy a ctdb_control in client
984 */
985 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
986 {
987         ctdb_reqid_remove(state->ctdb, state->reqid);
988         return 0;
989 }
990
991
992 /* time out handler for ctdb_control */
993 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
994         struct timeval t, void *private_data)
995 {
996         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
997
998         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
999                          "dstnode:%u\n", state->reqid, state->c->opcode,
1000                          state->c->hdr.destnode));
1001
1002         state->state = CTDB_CONTROL_TIMEOUT;
1003
1004         /* if we had a callback registered for this control, pull the response
1005            and call the callback.
1006         */
1007         if (state->async.fn) {
1008                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1009         }
1010 }
1011
1012 /* async version of send control request */
1013 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1014                 uint32_t destnode, uint64_t srvid, 
1015                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1016                 TALLOC_CTX *mem_ctx,
1017                 struct timeval *timeout,
1018                 char **errormsg)
1019 {
1020         struct ctdb_client_control_state *state;
1021         size_t len;
1022         struct ctdb_req_control *c;
1023         int ret;
1024
1025         if (errormsg) {
1026                 *errormsg = NULL;
1027         }
1028
1029         /* if the domain socket is not yet open, open it */
1030         if (ctdb->daemon.sd==-1) {
1031                 ctdb_socket_connect(ctdb);
1032         }
1033
1034         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1035         CTDB_NO_MEMORY_NULL(ctdb, state);
1036
1037         state->ctdb       = ctdb;
1038         state->reqid      = ctdb_reqid_new(ctdb, state);
1039         state->state      = CTDB_CONTROL_WAIT;
1040         state->errormsg   = NULL;
1041
1042         talloc_set_destructor(state, ctdb_client_control_destructor);
1043
1044         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1045         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1046                                len, struct ctdb_req_control);
1047         state->c            = c;        
1048         CTDB_NO_MEMORY_NULL(ctdb, c);
1049         c->hdr.reqid        = state->reqid;
1050         c->hdr.destnode     = destnode;
1051         c->opcode           = opcode;
1052         c->client_id        = 0;
1053         c->flags            = flags;
1054         c->srvid            = srvid;
1055         c->datalen          = data.dsize;
1056         if (data.dsize) {
1057                 memcpy(&c->data[0], data.dptr, data.dsize);
1058         }
1059
1060         /* timeout */
1061         if (timeout && !timeval_is_zero(timeout)) {
1062                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1063         }
1064
1065         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1066         if (ret != 0) {
1067                 talloc_free(state);
1068                 return NULL;
1069         }
1070
1071         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1072                 talloc_free(state);
1073                 return NULL;
1074         }
1075
1076         return state;
1077 }
1078
1079
1080 /* async version of receive control reply */
1081 int ctdb_control_recv(struct ctdb_context *ctdb, 
1082                 struct ctdb_client_control_state *state, 
1083                 TALLOC_CTX *mem_ctx,
1084                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1085 {
1086         TALLOC_CTX *tmp_ctx;
1087
1088         if (status != NULL) {
1089                 *status = -1;
1090         }
1091         if (errormsg != NULL) {
1092                 *errormsg = NULL;
1093         }
1094
1095         if (state == NULL) {
1096                 return -1;
1097         }
1098
1099         /* prevent double free of state */
1100         tmp_ctx = talloc_new(ctdb);
1101         talloc_steal(tmp_ctx, state);
1102
1103         /* loop one event at a time until we either timeout or the control
1104            completes.
1105         */
1106         while (state->state == CTDB_CONTROL_WAIT) {
1107                 event_loop_once(ctdb->ev);
1108         }
1109
1110         if (state->state != CTDB_CONTROL_DONE) {
1111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1112                 if (state->async.fn) {
1113                         state->async.fn(state);
1114                 }
1115                 talloc_free(tmp_ctx);
1116                 return -1;
1117         }
1118
1119         if (state->errormsg) {
1120                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1121                 if (errormsg) {
1122                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1123                 }
1124                 if (state->async.fn) {
1125                         state->async.fn(state);
1126                 }
1127                 talloc_free(tmp_ctx);
1128                 return -1;
1129         }
1130
1131         if (outdata) {
1132                 *outdata = state->outdata;
1133                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1134         }
1135
1136         if (status) {
1137                 *status = state->status;
1138         }
1139
1140         if (state->async.fn) {
1141                 state->async.fn(state);
1142         }
1143
1144         talloc_free(tmp_ctx);
1145         return 0;
1146 }
1147
1148
1149
1150 /*
1151   send a ctdb control message
1152   timeout specifies how long we should wait for a reply.
1153   if timeout is NULL we wait indefinitely
1154  */
1155 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1156                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1157                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1158                  struct timeval *timeout,
1159                  char **errormsg)
1160 {
1161         struct ctdb_client_control_state *state;
1162
1163         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1164                         flags, data, mem_ctx,
1165                         timeout, errormsg);
1166
1167         /* FIXME: Error conditions in ctdb_control_send return NULL without
1168          * setting errormsg.  So, there is no way to distinguish between sucess
1169          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1170         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1171                 if (status != NULL) {
1172                         *status = 0;
1173                 }
1174                 return 0;
1175         }
1176
1177         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1178                         errormsg);
1179 }
1180
1181
1182
1183
1184 /*
1185   a process exists call. Returns 0 if process exists, -1 otherwise
1186  */
1187 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1188 {
1189         int ret;
1190         TDB_DATA data;
1191         int32_t status;
1192
1193         data.dptr = (uint8_t*)&pid;
1194         data.dsize = sizeof(pid);
1195
1196         ret = ctdb_control(ctdb, destnode, 0, 
1197                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1198                            NULL, NULL, &status, NULL, NULL);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1201                 return -1;
1202         }
1203
1204         return status;
1205 }
1206
1207 /*
1208   get remote statistics
1209  */
1210 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1211 {
1212         int ret;
1213         TDB_DATA data;
1214         int32_t res;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1218                            ctdb, &data, &res, NULL, NULL);
1219         if (ret != 0 || res != 0) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1221                 return -1;
1222         }
1223
1224         if (data.dsize != sizeof(struct ctdb_statistics)) {
1225                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1226                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1227                       return -1;
1228         }
1229
1230         *status = *(struct ctdb_statistics *)data.dptr;
1231         talloc_free(data.dptr);
1232                         
1233         return 0;
1234 }
1235
1236 /*
1237  * get db statistics
1238  */
1239 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1240                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1241 {
1242         int ret;
1243         TDB_DATA indata, outdata;
1244         int32_t res;
1245         struct ctdb_db_statistics *wire, *s;
1246         char *ptr;
1247         int i;
1248
1249         indata.dptr = (uint8_t *)&dbid;
1250         indata.dsize = sizeof(dbid);
1251
1252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1253                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1254         if (ret != 0 || res != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1256                 return -1;
1257         }
1258
1259         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1260                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1261                                  outdata.dsize,
1262                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1263                 return -1;
1264         }
1265
1266         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1267         if (s == NULL) {
1268                 talloc_free(outdata.dptr);
1269                 CTDB_NO_MEMORY(ctdb, s);
1270         }
1271
1272         wire = (struct ctdb_db_statistics *)outdata.dptr;
1273         *s = *wire;
1274         ptr = &wire->hot_keys_wire[0];
1275         for (i=0; i<wire->num_hot_keys; i++) {
1276                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1277                 if (s->hot_keys[i].key.dptr == NULL) {
1278                         talloc_free(outdata.dptr);
1279                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1280                 }
1281
1282                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1283                 ptr += wire->hot_keys[i].key.dsize;
1284         }
1285
1286         talloc_free(outdata.dptr);
1287         *dbstat = s;
1288         return 0;
1289 }
1290
1291 /*
1292   shutdown a remote ctdb node
1293  */
1294 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1295 {
1296         struct ctdb_client_control_state *state;
1297
1298         state = ctdb_control_send(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1300                            NULL, &timeout, NULL);
1301         if (state == NULL) {
1302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1303                 return -1;
1304         }
1305
1306         return 0;
1307 }
1308
1309 /*
1310   get vnn map from a remote node
1311  */
1312 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1313 {
1314         int ret;
1315         TDB_DATA outdata;
1316         int32_t res;
1317         struct ctdb_vnn_map_wire *map;
1318
1319         ret = ctdb_control(ctdb, destnode, 0, 
1320                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1321                            mem_ctx, &outdata, &res, &timeout, NULL);
1322         if (ret != 0 || res != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1324                 return -1;
1325         }
1326         
1327         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1328         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1329             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1330                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1331                 return -1;
1332         }
1333
1334         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1335         CTDB_NO_MEMORY(ctdb, *vnnmap);
1336         (*vnnmap)->generation = map->generation;
1337         (*vnnmap)->size       = map->size;
1338         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1339
1340         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1341         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1342         talloc_free(outdata.dptr);
1343                     
1344         return 0;
1345 }
1346
1347
1348 /*
1349   get the recovery mode of a remote node
1350  */
1351 struct ctdb_client_control_state *
1352 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_control_send(ctdb, destnode, 0, 
1355                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1356                            mem_ctx, &timeout, NULL);
1357 }
1358
1359 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1360 {
1361         int ret;
1362         int32_t res;
1363
1364         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1365         if (ret != 0) {
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1367                 return -1;
1368         }
1369
1370         if (recmode) {
1371                 *recmode = (uint32_t)res;
1372         }
1373
1374         return 0;
1375 }
1376
1377 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1378 {
1379         struct ctdb_client_control_state *state;
1380
1381         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1382         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1383 }
1384
1385
1386
1387
1388 /*
1389   set the recovery mode of a remote node
1390  */
1391 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1392 {
1393         int ret;
1394         TDB_DATA data;
1395         int32_t res;
1396
1397         data.dsize = sizeof(uint32_t);
1398         data.dptr = (unsigned char *)&recmode;
1399
1400         ret = ctdb_control(ctdb, destnode, 0, 
1401                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1402                            NULL, NULL, &res, &timeout, NULL);
1403         if (ret != 0 || res != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1405                 return -1;
1406         }
1407
1408         return 0;
1409 }
1410
1411
1412
1413 /*
1414   get the recovery master of a remote node
1415  */
1416 struct ctdb_client_control_state *
1417 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1418                         struct timeval timeout, uint32_t destnode)
1419 {
1420         return ctdb_control_send(ctdb, destnode, 0, 
1421                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1422                            mem_ctx, &timeout, NULL);
1423 }
1424
1425 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1431         if (ret != 0) {
1432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1433                 return -1;
1434         }
1435
1436         if (recmaster) {
1437                 *recmaster = (uint32_t)res;
1438         }
1439
1440         return 0;
1441 }
1442
1443 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1444 {
1445         struct ctdb_client_control_state *state;
1446
1447         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1448         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1449 }
1450
1451
1452 /*
1453   set the recovery master of a remote node
1454  */
1455 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1456 {
1457         int ret;
1458         TDB_DATA data;
1459         int32_t res;
1460
1461         ZERO_STRUCT(data);
1462         data.dsize = sizeof(uint32_t);
1463         data.dptr = (unsigned char *)&recmaster;
1464
1465         ret = ctdb_control(ctdb, destnode, 0, 
1466                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1467                            NULL, NULL, &res, &timeout, NULL);
1468         if (ret != 0 || res != 0) {
1469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1470                 return -1;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   get a list of databases off a remote node
1479  */
1480 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1481                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1482 {
1483         int ret;
1484         TDB_DATA outdata;
1485         int32_t res;
1486
1487         ret = ctdb_control(ctdb, destnode, 0, 
1488                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1489                            mem_ctx, &outdata, &res, &timeout, NULL);
1490         if (ret != 0 || res != 0) {
1491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1492                 return -1;
1493         }
1494
1495         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1496         talloc_free(outdata.dptr);
1497                     
1498         return 0;
1499 }
1500
1501 /*
1502   get a list of nodes (vnn and flags ) from a remote node
1503  */
1504 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1505                 struct timeval timeout, uint32_t destnode, 
1506                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1507 {
1508         int ret;
1509         TDB_DATA outdata;
1510         int32_t res;
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1514                            mem_ctx, &outdata, &res, &timeout, NULL);
1515         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1516                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1517                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1518         }
1519         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1521                 return -1;
1522         }
1523
1524         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1525         talloc_free(outdata.dptr);
1526                     
1527         return 0;
1528 }
1529
1530 /*
1531   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1532  */
1533 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1534                 struct timeval timeout, uint32_t destnode, 
1535                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1536 {
1537         int ret, i, len;
1538         TDB_DATA outdata;
1539         struct ctdb_node_mapv4 *nodemapv4;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1551
1552         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1553         (*nodemap) = talloc_zero_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, (*nodemap));
1555
1556         (*nodemap)->num = nodemapv4->num;
1557         for (i=0; i<nodemapv4->num; i++) {
1558                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1559                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1560                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1561                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1562         }
1563                 
1564         talloc_free(outdata.dptr);
1565                     
1566         return 0;
1567 }
1568
1569 /*
1570   drop the transport, reload the nodes file and restart the transport
1571  */
1572 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1573                     struct timeval timeout, uint32_t destnode)
1574 {
1575         int ret;
1576         int32_t res;
1577
1578         ret = ctdb_control(ctdb, destnode, 0, 
1579                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1580                            NULL, NULL, &res, &timeout, NULL);
1581         if (ret != 0 || res != 0) {
1582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1583                 return -1;
1584         }
1585
1586         return 0;
1587 }
1588
1589
1590 /*
1591   set vnn map on a node
1592  */
1593 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1594                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1595 {
1596         int ret;
1597         TDB_DATA data;
1598         int32_t res;
1599         struct ctdb_vnn_map_wire *map;
1600         size_t len;
1601
1602         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1603         map = talloc_size(mem_ctx, len);
1604         CTDB_NO_MEMORY(ctdb, map);
1605
1606         map->generation = vnnmap->generation;
1607         map->size = vnnmap->size;
1608         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1609         
1610         data.dsize = len;
1611         data.dptr  = (uint8_t *)map;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1615                            NULL, NULL, &res, &timeout, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1618                 return -1;
1619         }
1620
1621         talloc_free(map);
1622
1623         return 0;
1624 }
1625
1626
1627 /*
1628   async send for pull database
1629  */
1630 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1631         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1632         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1633 {
1634         TDB_DATA indata;
1635         struct ctdb_control_pulldb *pull;
1636         struct ctdb_client_control_state *state;
1637
1638         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1639         CTDB_NO_MEMORY_NULL(ctdb, pull);
1640
1641         pull->db_id   = dbid;
1642         pull->lmaster = lmaster;
1643
1644         indata.dsize = sizeof(struct ctdb_control_pulldb);
1645         indata.dptr  = (unsigned char *)pull;
1646
1647         state = ctdb_control_send(ctdb, destnode, 0, 
1648                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1649                                   mem_ctx, &timeout, NULL);
1650         talloc_free(pull);
1651
1652         return state;
1653 }
1654
1655 /*
1656   async recv for pull database
1657  */
1658 int ctdb_ctrl_pulldb_recv(
1659         struct ctdb_context *ctdb, 
1660         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1661         TDB_DATA *outdata)
1662 {
1663         int ret;
1664         int32_t res;
1665
1666         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1667         if ( (ret != 0) || (res != 0) ){
1668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1669                 return -1;
1670         }
1671
1672         return 0;
1673 }
1674
1675 /*
1676   pull all keys and records for a specific database on a node
1677  */
1678 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1679                 uint32_t dbid, uint32_t lmaster, 
1680                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1681                 TDB_DATA *outdata)
1682 {
1683         struct ctdb_client_control_state *state;
1684
1685         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1686                                       timeout);
1687         
1688         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1689 }
1690
1691
1692 /*
1693   change dmaster for all keys in the database to the new value
1694  */
1695 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1696                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1697 {
1698         int ret;
1699         TDB_DATA indata;
1700         int32_t res;
1701
1702         indata.dsize = 2*sizeof(uint32_t);
1703         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1704
1705         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1706         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1710                            NULL, NULL, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1713                 return -1;
1714         }
1715
1716         return 0;
1717 }
1718
1719 /*
1720   ping a node, return number of clients connected
1721  */
1722 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1723 {
1724         int ret;
1725         int32_t res;
1726
1727         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1728                            tdb_null, NULL, NULL, &res, NULL, NULL);
1729         if (ret != 0) {
1730                 return -1;
1731         }
1732         return res;
1733 }
1734
1735 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1736                            struct timeval timeout, 
1737                            uint32_t destnode,
1738                            uint32_t *runstate)
1739 {
1740         TDB_DATA outdata;
1741         int32_t res;
1742         int ret;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1745                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1746         if (ret != 0 || res != 0) {
1747                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1748                 return ret != 0 ? ret : res;
1749         }
1750
1751         if (outdata.dsize != sizeof(uint32_t)) {
1752                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1753                 talloc_free(outdata.dptr);
1754                 return -1;
1755         }
1756
1757         if (runstate != NULL) {
1758                 *runstate = *(uint32_t *)outdata.dptr;
1759         }
1760         talloc_free(outdata.dptr);
1761
1762         return 0;
1763 }
1764
1765 /*
1766   find the real path to a ltdb 
1767  */
1768 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1769                    const char **path)
1770 {
1771         int ret;
1772         int32_t res;
1773         TDB_DATA data;
1774
1775         data.dptr = (uint8_t *)&dbid;
1776         data.dsize = sizeof(dbid);
1777
1778         ret = ctdb_control(ctdb, destnode, 0, 
1779                            CTDB_CONTROL_GETDBPATH, 0, data, 
1780                            mem_ctx, &data, &res, &timeout, NULL);
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1786         if ((*path) == NULL) {
1787                 return -1;
1788         }
1789
1790         talloc_free(data.dptr);
1791
1792         return 0;
1793 }
1794
1795 /*
1796   find the name of a db 
1797  */
1798 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1799                    const char **name)
1800 {
1801         int ret;
1802         int32_t res;
1803         TDB_DATA data;
1804
1805         data.dptr = (uint8_t *)&dbid;
1806         data.dsize = sizeof(dbid);
1807
1808         ret = ctdb_control(ctdb, destnode, 0, 
1809                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1810                            mem_ctx, &data, &res, &timeout, NULL);
1811         if (ret != 0 || res != 0) {
1812                 return -1;
1813         }
1814
1815         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1816         if ((*name) == NULL) {
1817                 return -1;
1818         }
1819
1820         talloc_free(data.dptr);
1821
1822         return 0;
1823 }
1824
1825 /*
1826   get the health status of a db
1827  */
1828 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1829                           struct timeval timeout,
1830                           uint32_t destnode,
1831                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1832                           const char **reason)
1833 {
1834         int ret;
1835         int32_t res;
1836         TDB_DATA data;
1837
1838         data.dptr = (uint8_t *)&dbid;
1839         data.dsize = sizeof(dbid);
1840
1841         ret = ctdb_control(ctdb, destnode, 0,
1842                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1843                            mem_ctx, &data, &res, &timeout, NULL);
1844         if (ret != 0 || res != 0) {
1845                 return -1;
1846         }
1847
1848         if (data.dsize == 0) {
1849                 (*reason) = NULL;
1850                 return 0;
1851         }
1852
1853         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1854         if ((*reason) == NULL) {
1855                 return -1;
1856         }
1857
1858         talloc_free(data.dptr);
1859
1860         return 0;
1861 }
1862
1863 /*
1864  * get db sequence number
1865  */
1866 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1867                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1868 {
1869         int ret;
1870         int32_t res;
1871         TDB_DATA data, outdata;
1872
1873         data.dptr = (uint8_t *)&dbid;
1874         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1875
1876         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1877                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1878         if (ret != 0 || res != 0) {
1879                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1880                 return -1;
1881         }
1882
1883         if (outdata.dsize != sizeof(uint64_t)) {
1884                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1885                 talloc_free(outdata.dptr);
1886                 return -1;
1887         }
1888
1889         if (seqnum != NULL) {
1890                 *seqnum = *(uint64_t *)outdata.dptr;
1891         }
1892         talloc_free(outdata.dptr);
1893
1894         return 0;
1895 }
1896
1897 /*
1898   create a database
1899  */
1900 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1901                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1902 {
1903         int ret;
1904         int32_t res;
1905         TDB_DATA data;
1906         uint64_t tdb_flags = 0;
1907
1908         data.dptr = discard_const(name);
1909         data.dsize = strlen(name)+1;
1910
1911         /* Make sure that volatile databases use jenkins hash */
1912         if (!persistent) {
1913                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1914         }
1915
1916         ret = ctdb_control(ctdb, destnode, tdb_flags,
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1918                            0, data, 
1919                            mem_ctx, &data, &res, &timeout, NULL);
1920
1921         if (ret != 0 || res != 0) {
1922                 return -1;
1923         }
1924
1925         return 0;
1926 }
1927
1928 /*
1929   get debug level on a node
1930  */
1931 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1932 {
1933         int ret;
1934         int32_t res;
1935         TDB_DATA data;
1936
1937         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1938                            ctdb, &data, &res, NULL, NULL);
1939         if (ret != 0 || res != 0) {
1940                 return -1;
1941         }
1942         if (data.dsize != sizeof(int32_t)) {
1943                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1944                          (unsigned)data.dsize));
1945                 return -1;
1946         }
1947         *level = *(int32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949         return 0;
1950 }
1951
1952 /*
1953   set debug level on a node
1954  */
1955 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1956 {
1957         int ret;
1958         int32_t res;
1959         TDB_DATA data;
1960
1961         data.dptr = (uint8_t *)&level;
1962         data.dsize = sizeof(level);
1963
1964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1965                            NULL, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 return -1;
1968         }
1969         return 0;
1970 }
1971
1972
1973 /*
1974   get a list of connected nodes
1975  */
1976 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1977                                 struct timeval timeout,
1978                                 TALLOC_CTX *mem_ctx,
1979                                 uint32_t *num_nodes)
1980 {
1981         struct ctdb_node_map *map=NULL;
1982         int ret, i;
1983         uint32_t *nodes;
1984
1985         *num_nodes = 0;
1986
1987         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1988         if (ret != 0) {
1989                 return NULL;
1990         }
1991
1992         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1993         if (nodes == NULL) {
1994                 return NULL;
1995         }
1996
1997         for (i=0;i<map->num;i++) {
1998                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1999                         nodes[*num_nodes] = map->nodes[i].pnn;
2000                         (*num_nodes)++;
2001                 }
2002         }
2003
2004         return nodes;
2005 }
2006
2007
2008 /*
2009   reset remote status
2010  */
2011 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2012 {
2013         int ret;
2014         int32_t res;
2015
2016         ret = ctdb_control(ctdb, destnode, 0, 
2017                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2018                            NULL, NULL, &res, NULL, NULL);
2019         if (ret != 0 || res != 0) {
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2021                 return -1;
2022         }
2023         return 0;
2024 }
2025
2026 /*
2027   attach to a specific database - client call
2028 */
2029 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2030                                     struct timeval timeout,
2031                                     const char *name,
2032                                     bool persistent,
2033                                     uint32_t tdb_flags)
2034 {
2035         struct ctdb_db_context *ctdb_db;
2036         TDB_DATA data;
2037         int ret;
2038         int32_t res;
2039
2040         ctdb_db = ctdb_db_handle(ctdb, name);
2041         if (ctdb_db) {
2042                 return ctdb_db;
2043         }
2044
2045         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2046         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2047
2048         ctdb_db->ctdb = ctdb;
2049         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2050         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2051
2052         data.dptr = discard_const(name);
2053         data.dsize = strlen(name)+1;
2054
2055         /* CTDB has switched to using jenkins hash for volatile databases.
2056          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2057          * always set it.
2058          */
2059         if (!persistent) {
2060                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2061         }
2062
2063         /* tell ctdb daemon to attach */
2064         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2065                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2066                            0, data, ctdb_db, &data, &res, NULL, NULL);
2067         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2068                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2069                 talloc_free(ctdb_db);
2070                 return NULL;
2071         }
2072         
2073         ctdb_db->db_id = *(uint32_t *)data.dptr;
2074         talloc_free(data.dptr);
2075
2076         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2079                 talloc_free(ctdb_db);
2080                 return NULL;
2081         }
2082
2083         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2084         if (ctdb->valgrinding) {
2085                 tdb_flags |= TDB_NOMMAP;
2086         }
2087         tdb_flags |= TDB_DISALLOW_NESTING;
2088
2089         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2090         if (ctdb_db->ltdb == NULL) {
2091                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2092                 talloc_free(ctdb_db);
2093                 return NULL;
2094         }
2095
2096         ctdb_db->persistent = persistent;
2097
2098         DLIST_ADD(ctdb->db_list, ctdb_db);
2099
2100         /* add well known functions */
2101         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2102         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2103         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2104
2105         return ctdb_db;
2106 }
2107
2108
2109 /*
2110   setup a call for a database
2111  */
2112 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2113 {
2114         struct ctdb_registered_call *call;
2115
2116 #if 0
2117         TDB_DATA data;
2118         int32_t status;
2119         struct ctdb_control_set_call c;
2120         int ret;
2121
2122         /* this is no longer valid with the separate daemon architecture */
2123         c.db_id = ctdb_db->db_id;
2124         c.fn    = fn;
2125         c.id    = id;
2126
2127         data.dptr = (uint8_t *)&c;
2128         data.dsize = sizeof(c);
2129
2130         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2131                            data, NULL, NULL, &status, NULL, NULL);
2132         if (ret != 0 || status != 0) {
2133                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2134                 return -1;
2135         }
2136 #endif
2137
2138         /* also register locally */
2139         call = talloc(ctdb_db, struct ctdb_registered_call);
2140         call->fn = fn;
2141         call->id = id;
2142
2143         DLIST_ADD(ctdb_db->calls, call);        
2144         return 0;
2145 }
2146
2147
2148 struct traverse_state {
2149         bool done;
2150         uint32_t count;
2151         ctdb_traverse_func fn;
2152         void *private_data;
2153         bool listemptyrecords;
2154 };
2155
2156 /*
2157   called on each key during a ctdb_traverse
2158  */
2159 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2160 {
2161         struct traverse_state *state = (struct traverse_state *)p;
2162         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2163         TDB_DATA key;
2164
2165         if (data.dsize < sizeof(uint32_t) ||
2166             d->length != data.dsize) {
2167                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2168                 state->done = true;
2169                 return;
2170         }
2171
2172         key.dsize = d->keylen;
2173         key.dptr  = &d->data[0];
2174         data.dsize = d->datalen;
2175         data.dptr = &d->data[d->keylen];
2176
2177         if (key.dsize == 0 && data.dsize == 0) {
2178                 /* end of traverse */
2179                 state->done = true;
2180                 return;
2181         }
2182
2183         if (!state->listemptyrecords &&
2184             data.dsize == sizeof(struct ctdb_ltdb_header))
2185         {
2186                 /* empty records are deleted records in ctdb */
2187                 return;
2188         }
2189
2190         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2191                 state->done = true;
2192         }
2193
2194         state->count++;
2195 }
2196
2197 /**
2198  * start a cluster wide traverse, calling the supplied fn on each record
2199  * return the number of records traversed, or -1 on error
2200  *
2201  * Extendet variant with a flag to signal whether empty records should
2202  * be listed.
2203  */
2204 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2205                              ctdb_traverse_func fn,
2206                              bool withemptyrecords,
2207                              void *private_data)
2208 {
2209         TDB_DATA data;
2210         struct ctdb_traverse_start_ext t;
2211         int32_t status;
2212         int ret;
2213         uint64_t srvid = (getpid() | 0xFLL<<60);
2214         struct traverse_state state;
2215
2216         state.done = false;
2217         state.count = 0;
2218         state.private_data = private_data;
2219         state.fn = fn;
2220         state.listemptyrecords = withemptyrecords;
2221
2222         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2223         if (ret != 0) {
2224                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2225                 return -1;
2226         }
2227
2228         t.db_id = ctdb_db->db_id;
2229         t.srvid = srvid;
2230         t.reqid = 0;
2231         t.withemptyrecords = withemptyrecords;
2232
2233         data.dptr = (uint8_t *)&t;
2234         data.dsize = sizeof(t);
2235
2236         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2237                            data, NULL, NULL, &status, NULL, NULL);
2238         if (ret != 0 || status != 0) {
2239                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2240                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2241                 return -1;
2242         }
2243
2244         while (!state.done) {
2245                 event_loop_once(ctdb_db->ctdb->ev);
2246         }
2247
2248         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2251                 return -1;
2252         }
2253
2254         return state.count;
2255 }
2256
2257 /**
2258  * start a cluster wide traverse, calling the supplied fn on each record
2259  * return the number of records traversed, or -1 on error
2260  *
2261  * Standard version which does not list the empty records:
2262  * These are considered deleted.
2263  */
2264 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2265 {
2266         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2267 }
2268
2269 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2270 /*
2271   called on each key during a catdb
2272  */
2273 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2274 {
2275         int i;
2276         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2277         FILE *f = c->f;
2278         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2279
2280         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2281         for (i=0;i<key.dsize;i++) {
2282                 if (ISASCII(key.dptr[i])) {
2283                         fprintf(f, "%c", key.dptr[i]);
2284                 } else {
2285                         fprintf(f, "\\%02X", key.dptr[i]);
2286                 }
2287         }
2288         fprintf(f, "\"\n");
2289
2290         fprintf(f, "dmaster: %u\n", h->dmaster);
2291         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2292
2293         if (c->printlmaster && ctdb->vnn_map != NULL) {
2294                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2295         }
2296
2297         if (c->printhash) {
2298                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2299         }
2300
2301         if (c->printrecordflags) {
2302                 fprintf(f, "flags: 0x%08x", h->flags);
2303                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2304                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2305                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2306                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2307                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2308                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2309                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2310                 fprintf(f, "\n");
2311         }
2312
2313         if (c->printdatasize) {
2314                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2315         } else {
2316                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2317                 for (i=sizeof(*h);i<data.dsize;i++) {
2318                         if (ISASCII(data.dptr[i])) {
2319                                 fprintf(f, "%c", data.dptr[i]);
2320                         } else {
2321                                 fprintf(f, "\\%02X", data.dptr[i]);
2322                         }
2323                 }
2324                 fprintf(f, "\"\n");
2325         }
2326
2327         fprintf(f, "\n");
2328
2329         return 0;
2330 }
2331
2332 /*
2333   convenience function to list all keys to stdout
2334  */
2335 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2336                  struct ctdb_dump_db_context *ctx)
2337 {
2338         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2339                                  ctx->printemptyrecords, ctx);
2340 }
2341
2342 /*
2343   get the pid of a ctdb daemon
2344  */
2345 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2346 {
2347         int ret;
2348         int32_t res;
2349
2350         ret = ctdb_control(ctdb, destnode, 0, 
2351                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2352                            NULL, NULL, &res, &timeout, NULL);
2353         if (ret != 0) {
2354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2355                 return -1;
2356         }
2357
2358         *pid = res;
2359
2360         return 0;
2361 }
2362
2363
2364 /*
2365   async freeze send control
2366  */
2367 struct ctdb_client_control_state *
2368 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2369 {
2370         return ctdb_control_send(ctdb, destnode, priority, 
2371                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2372                            mem_ctx, &timeout, NULL);
2373 }
2374
2375 /* 
2376    async freeze recv control
2377 */
2378 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2384         if ( (ret != 0) || (res != 0) ){
2385                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2386                 return -1;
2387         }
2388
2389         return 0;
2390 }
2391
2392 /*
2393   freeze databases of a certain priority
2394  */
2395 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2396 {
2397         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2398         struct ctdb_client_control_state *state;
2399         int ret;
2400
2401         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2402         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2403         talloc_free(tmp_ctx);
2404
2405         return ret;
2406 }
2407
2408 /* Freeze all databases */
2409 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2410 {
2411         int i;
2412
2413         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2414                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2415                         return -1;
2416                 }
2417         }
2418         return 0;
2419 }
2420
2421 /*
2422   thaw databases of a certain priority
2423  */
2424 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2425 {
2426         int ret;
2427         int32_t res;
2428
2429         ret = ctdb_control(ctdb, destnode, priority, 
2430                            CTDB_CONTROL_THAW, 0, tdb_null, 
2431                            NULL, NULL, &res, &timeout, NULL);
2432         if (ret != 0 || res != 0) {
2433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2434                 return -1;
2435         }
2436
2437         return 0;
2438 }
2439
2440 /* thaw all databases */
2441 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2442 {
2443         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2444 }
2445
2446 /*
2447   get pnn of a node, or -1
2448  */
2449 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2450 {
2451         int ret;
2452         int32_t res;
2453
2454         ret = ctdb_control(ctdb, destnode, 0, 
2455                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2456                            NULL, NULL, &res, &timeout, NULL);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2459                 return -1;
2460         }
2461
2462         return res;
2463 }
2464
2465 /*
2466   get the monitoring mode of a remote node
2467  */
2468 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2469 {
2470         int ret;
2471         int32_t res;
2472
2473         ret = ctdb_control(ctdb, destnode, 0, 
2474                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2475                            NULL, NULL, &res, &timeout, NULL);
2476         if (ret != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2478                 return -1;
2479         }
2480
2481         *monmode = res;
2482
2483         return 0;
2484 }
2485
2486
2487 /*
2488  set the monitoring mode of a remote node to active
2489  */
2490 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2491 {
2492         int ret;
2493         
2494
2495         ret = ctdb_control(ctdb, destnode, 0, 
2496                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2497                            NULL, NULL,NULL, &timeout, NULL);
2498         if (ret != 0) {
2499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2500                 return -1;
2501         }
2502
2503         
2504
2505         return 0;
2506 }
2507
2508 /*
2509   set the monitoring mode of a remote node to disable
2510  */
2511 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2512 {
2513         int ret;
2514         
2515
2516         ret = ctdb_control(ctdb, destnode, 0, 
2517                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2518                            NULL, NULL, NULL, &timeout, NULL);
2519         if (ret != 0) {
2520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2521                 return -1;
2522         }
2523
2524         
2525
2526         return 0;
2527 }
2528
2529
2530
2531 /* 
2532   sent to a node to make it take over an ip address
2533 */
2534 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2535                           uint32_t destnode, struct ctdb_public_ip *ip)
2536 {
2537         TDB_DATA data;
2538         struct ctdb_public_ipv4 ipv4;
2539         int ret;
2540         int32_t res;
2541
2542         if (ip->addr.sa.sa_family == AF_INET) {
2543                 ipv4.pnn = ip->pnn;
2544                 ipv4.sin = ip->addr.ip;
2545
2546                 data.dsize = sizeof(ipv4);
2547                 data.dptr  = (uint8_t *)&ipv4;
2548
2549                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2550                            NULL, &res, &timeout, NULL);
2551         } else {
2552                 data.dsize = sizeof(*ip);
2553                 data.dptr  = (uint8_t *)ip;
2554
2555                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2556                            NULL, &res, &timeout, NULL);
2557         }
2558
2559         if (ret != 0 || res != 0) {
2560                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2561                 return -1;
2562         }
2563
2564         return 0;       
2565 }
2566
2567
2568 /* 
2569   sent to a node to make it release an ip address
2570 */
2571 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2572                          uint32_t destnode, struct ctdb_public_ip *ip)
2573 {
2574         TDB_DATA data;
2575         struct ctdb_public_ipv4 ipv4;
2576         int ret;
2577         int32_t res;
2578
2579         if (ip->addr.sa.sa_family == AF_INET) {
2580                 ipv4.pnn = ip->pnn;
2581                 ipv4.sin = ip->addr.ip;
2582
2583                 data.dsize = sizeof(ipv4);
2584                 data.dptr  = (uint8_t *)&ipv4;
2585
2586                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2587                                    NULL, &res, &timeout, NULL);
2588         } else {
2589                 data.dsize = sizeof(*ip);
2590                 data.dptr  = (uint8_t *)ip;
2591
2592                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2593                                    NULL, &res, &timeout, NULL);
2594         }
2595
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;       
2602 }
2603
2604
2605 /*
2606   get a tunable
2607  */
2608 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2609                           struct timeval timeout, 
2610                           uint32_t destnode,
2611                           const char *name, uint32_t *value)
2612 {
2613         struct ctdb_control_get_tunable *t;
2614         TDB_DATA data, outdata;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2619         data.dptr  = talloc_size(ctdb, data.dsize);
2620         CTDB_NO_MEMORY(ctdb, data.dptr);
2621
2622         t = (struct ctdb_control_get_tunable *)data.dptr;
2623         t->length = strlen(name)+1;
2624         memcpy(t->name, name, t->length);
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         talloc_free(data.dptr);
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2631                 return ret != 0 ? ret : res;
2632         }
2633
2634         if (outdata.dsize != sizeof(uint32_t)) {
2635                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2636                 talloc_free(outdata.dptr);
2637                 return -1;
2638         }
2639         
2640         *value = *(uint32_t *)outdata.dptr;
2641         talloc_free(outdata.dptr);
2642
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2650                           struct timeval timeout, 
2651                           uint32_t destnode,
2652                           const char *name, uint32_t value)
2653 {
2654         struct ctdb_control_set_tunable *t;
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2660         data.dptr  = talloc_size(ctdb, data.dsize);
2661         CTDB_NO_MEMORY(ctdb, data.dptr);
2662
2663         t = (struct ctdb_control_set_tunable *)data.dptr;
2664         t->length = strlen(name)+1;
2665         memcpy(t->name, name, t->length);
2666         t->value = value;
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2669                            NULL, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2673                 return -1;
2674         }
2675
2676         return 0;
2677 }
2678
2679 /*
2680   list tunables
2681  */
2682 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2683                             struct timeval timeout, 
2684                             uint32_t destnode,
2685                             TALLOC_CTX *mem_ctx,
2686                             const char ***list, uint32_t *count)
2687 {
2688         TDB_DATA outdata;
2689         int32_t res;
2690         int ret;
2691         struct ctdb_control_list_tunable *t;
2692         char *p, *s, *ptr;
2693
2694         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2698                 return -1;
2699         }
2700
2701         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2702         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2703             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2704                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2705                 talloc_free(outdata.dptr);
2706                 return -1;              
2707         }
2708         
2709         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2710         CTDB_NO_MEMORY(ctdb, p);
2711
2712         talloc_free(outdata.dptr);
2713         
2714         (*list) = NULL;
2715         (*count) = 0;
2716
2717         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2718                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2719                 CTDB_NO_MEMORY(ctdb, *list);
2720                 (*list)[*count] = talloc_strdup(*list, s);
2721                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2722                 (*count)++;
2723         }
2724
2725         talloc_free(p);
2726
2727         return 0;
2728 }
2729
2730
2731 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2732                                    struct timeval timeout, uint32_t destnode,
2733                                    TALLOC_CTX *mem_ctx,
2734                                    uint32_t flags,
2735                                    struct ctdb_all_public_ips **ips)
2736 {
2737         int ret;
2738         TDB_DATA outdata;
2739         int32_t res;
2740
2741         ret = ctdb_control(ctdb, destnode, 0, 
2742                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2743                            mem_ctx, &outdata, &res, &timeout, NULL);
2744         if (ret == 0 && res == -1) {
2745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2746                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2747         }
2748         if (ret != 0 || res != 0) {
2749           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2750                 return -1;
2751         }
2752
2753         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2754         talloc_free(outdata.dptr);
2755                     
2756         return 0;
2757 }
2758
2759 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2760                              struct timeval timeout, uint32_t destnode,
2761                              TALLOC_CTX *mem_ctx,
2762                              struct ctdb_all_public_ips **ips)
2763 {
2764         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2765                                               destnode, mem_ctx,
2766                                               0, ips);
2767 }
2768
2769 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2770                         struct timeval timeout, uint32_t destnode, 
2771                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2772 {
2773         int ret, i, len;
2774         TDB_DATA outdata;
2775         int32_t res;
2776         struct ctdb_all_public_ipsv4 *ipsv4;
2777
2778         ret = ctdb_control(ctdb, destnode, 0, 
2779                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2780                            mem_ctx, &outdata, &res, &timeout, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2783                 return -1;
2784         }
2785
2786         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2787         len = offsetof(struct ctdb_all_public_ips, ips) +
2788                 ipsv4->num*sizeof(struct ctdb_public_ip);
2789         *ips = talloc_zero_size(mem_ctx, len);
2790         CTDB_NO_MEMORY(ctdb, *ips);
2791         (*ips)->num = ipsv4->num;
2792         for (i=0; i<ipsv4->num; i++) {
2793                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2794                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2795         }
2796
2797         talloc_free(outdata.dptr);
2798                     
2799         return 0;
2800 }
2801
2802 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2803                                  struct timeval timeout, uint32_t destnode,
2804                                  TALLOC_CTX *mem_ctx,
2805                                  const ctdb_sock_addr *addr,
2806                                  struct ctdb_control_public_ip_info **_info)
2807 {
2808         int ret;
2809         TDB_DATA indata;
2810         TDB_DATA outdata;
2811         int32_t res;
2812         struct ctdb_control_public_ip_info *info;
2813         uint32_t len;
2814         uint32_t i;
2815
2816         indata.dptr = discard_const_p(uint8_t, addr);
2817         indata.dsize = sizeof(*addr);
2818
2819         ret = ctdb_control(ctdb, destnode, 0,
2820                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2821                            mem_ctx, &outdata, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2824                                 "failed ret:%d res:%d\n",
2825                                 ret, res));
2826                 return -1;
2827         }
2828
2829         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2830         if (len > outdata.dsize) {
2831                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2832                                 "returned invalid data with size %u > %u\n",
2833                                 (unsigned int)outdata.dsize,
2834                                 (unsigned int)len));
2835                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2836                 return -1;
2837         }
2838
2839         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2840         len += info->num*sizeof(struct ctdb_control_iface_info);
2841
2842         if (len > outdata.dsize) {
2843                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2844                                 "returned invalid data with size %u > %u\n",
2845                                 (unsigned int)outdata.dsize,
2846                                 (unsigned int)len));
2847                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2848                 return -1;
2849         }
2850
2851         /* make sure we null terminate the returned strings */
2852         for (i=0; i < info->num; i++) {
2853                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2854         }
2855
2856         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2857                                                                 outdata.dptr,
2858                                                                 outdata.dsize);
2859         talloc_free(outdata.dptr);
2860         if (*_info == NULL) {
2861                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2862                                 "talloc_memdup size %u failed\n",
2863                                 (unsigned int)outdata.dsize));
2864                 return -1;
2865         }
2866
2867         return 0;
2868 }
2869
2870 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2871                          struct timeval timeout, uint32_t destnode,
2872                          TALLOC_CTX *mem_ctx,
2873                          struct ctdb_control_get_ifaces **_ifaces)
2874 {
2875         int ret;
2876         TDB_DATA outdata;
2877         int32_t res;
2878         struct ctdb_control_get_ifaces *ifaces;
2879         uint32_t len;
2880         uint32_t i;
2881
2882         ret = ctdb_control(ctdb, destnode, 0,
2883                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2884                            mem_ctx, &outdata, &res, &timeout, NULL);
2885         if (ret != 0 || res != 0) {
2886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2887                                 "failed ret:%d res:%d\n",
2888                                 ret, res));
2889                 return -1;
2890         }
2891
2892         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2893         if (len > outdata.dsize) {
2894                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2895                                 "returned invalid data with size %u > %u\n",
2896                                 (unsigned int)outdata.dsize,
2897                                 (unsigned int)len));
2898                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2899                 return -1;
2900         }
2901
2902         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2903         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2904
2905         if (len > outdata.dsize) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2907                                 "returned invalid data with size %u > %u\n",
2908                                 (unsigned int)outdata.dsize,
2909                                 (unsigned int)len));
2910                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2911                 return -1;
2912         }
2913
2914         /* make sure we null terminate the returned strings */
2915         for (i=0; i < ifaces->num; i++) {
2916                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2917         }
2918
2919         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2920                                                                   outdata.dptr,
2921                                                                   outdata.dsize);
2922         talloc_free(outdata.dptr);
2923         if (*_ifaces == NULL) {
2924                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2925                                 "talloc_memdup size %u failed\n",
2926                                 (unsigned int)outdata.dsize));
2927                 return -1;
2928         }
2929
2930         return 0;
2931 }
2932
2933 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2934                              struct timeval timeout, uint32_t destnode,
2935                              TALLOC_CTX *mem_ctx,
2936                              const struct ctdb_control_iface_info *info)
2937 {
2938         int ret;
2939         TDB_DATA indata;
2940         int32_t res;
2941
2942         indata.dptr = discard_const_p(uint8_t, info);
2943         indata.dsize = sizeof(*info);
2944
2945         ret = ctdb_control(ctdb, destnode, 0,
2946                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2947                            mem_ctx, NULL, &res, &timeout, NULL);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2950                                 "failed ret:%d res:%d\n",
2951                                 ret, res));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   set/clear the permanent disabled bit on a remote node
2960  */
2961 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2962                        uint32_t set, uint32_t clear)
2963 {
2964         int ret;
2965         TDB_DATA data;
2966         struct ctdb_node_map *nodemap=NULL;
2967         struct ctdb_node_flag_change c;
2968         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2969         uint32_t recmaster;
2970         uint32_t *nodes;
2971
2972
2973         /* find the recovery master */
2974         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2977                 talloc_free(tmp_ctx);
2978                 return ret;
2979         }
2980
2981
2982         /* read the node flags from the recmaster */
2983         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2984         if (ret != 0) {
2985                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2986                 talloc_free(tmp_ctx);
2987                 return -1;
2988         }
2989         if (destnode >= nodemap->num) {
2990                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2991                 talloc_free(tmp_ctx);
2992                 return -1;
2993         }
2994
2995         c.pnn       = destnode;
2996         c.old_flags = nodemap->nodes[destnode].flags;
2997         c.new_flags = c.old_flags;
2998         c.new_flags |= set;
2999         c.new_flags &= ~clear;
3000
3001         data.dsize = sizeof(c);
3002         data.dptr = (unsigned char *)&c;
3003
3004         /* send the flags update to all connected nodes */
3005         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3006
3007         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3008                                         nodes, 0,
3009                                         timeout, false, data,
3010                                         NULL, NULL,
3011                                         NULL) != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3013
3014                 talloc_free(tmp_ctx);
3015                 return -1;
3016         }
3017
3018         talloc_free(tmp_ctx);
3019         return 0;
3020 }
3021
3022
3023 /*
3024   get all tunables
3025  */
3026 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3027                                struct timeval timeout, 
3028                                uint32_t destnode,
3029                                struct ctdb_tunable *tunables)
3030 {
3031         TDB_DATA outdata;
3032         int ret;
3033         int32_t res;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3036                            &outdata, &res, &timeout, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3039                 return -1;
3040         }
3041
3042         if (outdata.dsize != sizeof(*tunables)) {
3043                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3044                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3045                 return -1;              
3046         }
3047
3048         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3049         talloc_free(outdata.dptr);
3050         return 0;
3051 }
3052
3053 /*
3054   add a public address to a node
3055  */
3056 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3057                       struct timeval timeout, 
3058                       uint32_t destnode,
3059                       struct ctdb_control_ip_iface *pub)
3060 {
3061         TDB_DATA data;
3062         int32_t res;
3063         int ret;
3064
3065         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3066         data.dptr  = (unsigned char *)pub;
3067
3068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3069                            NULL, &res, &timeout, NULL);
3070         if (ret != 0 || res != 0) {
3071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3072                 return -1;
3073         }
3074
3075         return 0;
3076 }
3077
3078 /*
3079   delete a public address from a node
3080  */
3081 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3082                       struct timeval timeout, 
3083                       uint32_t destnode,
3084                       struct ctdb_control_ip_iface *pub)
3085 {
3086         TDB_DATA data;
3087         int32_t res;
3088         int ret;
3089
3090         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3091         data.dptr  = (unsigned char *)pub;
3092
3093         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3094                            NULL, &res, &timeout, NULL);
3095         if (ret != 0 || res != 0) {
3096                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3097                 return -1;
3098         }
3099
3100         return 0;
3101 }
3102
3103 /*
3104   kill a tcp connection
3105  */
3106 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3107                       struct timeval timeout, 
3108                       uint32_t destnode,
3109                       struct ctdb_control_killtcp *killtcp)
3110 {
3111         TDB_DATA data;
3112         int32_t res;
3113         int ret;
3114
3115         data.dsize = sizeof(struct ctdb_control_killtcp);
3116         data.dptr  = (unsigned char *)killtcp;
3117
3118         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3119                            NULL, &res, &timeout, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128 /*
3129   send a gratious arp
3130  */
3131 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3132                       struct timeval timeout, 
3133                       uint32_t destnode,
3134                       ctdb_sock_addr *addr,
3135                       const char *ifname)
3136 {
3137         TDB_DATA data;
3138         int32_t res;
3139         int ret, len;
3140         struct ctdb_control_gratious_arp *gratious_arp;
3141         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3142
3143
3144         len = strlen(ifname)+1;
3145         gratious_arp = talloc_size(tmp_ctx, 
3146                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3147         CTDB_NO_MEMORY(ctdb, gratious_arp);
3148
3149         gratious_arp->addr = *addr;
3150         gratious_arp->len = len;
3151         memcpy(&gratious_arp->iface[0], ifname, len);
3152
3153
3154         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3155         data.dptr  = (unsigned char *)gratious_arp;
3156
3157         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3158                            NULL, &res, &timeout, NULL);
3159         if (ret != 0 || res != 0) {
3160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3161                 talloc_free(tmp_ctx);
3162                 return -1;
3163         }
3164
3165         talloc_free(tmp_ctx);
3166         return 0;
3167 }
3168
3169 /*
3170   get a list of all tcp tickles that a node knows about for a particular vnn
3171  */
3172 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3173                               struct timeval timeout, uint32_t destnode, 
3174                               TALLOC_CTX *mem_ctx, 
3175                               ctdb_sock_addr *addr,
3176                               struct ctdb_control_tcp_tickle_list **list)
3177 {
3178         int ret;
3179         TDB_DATA data, outdata;
3180         int32_t status;
3181
3182         data.dptr = (uint8_t*)addr;
3183         data.dsize = sizeof(ctdb_sock_addr);
3184
3185         ret = ctdb_control(ctdb, destnode, 0, 
3186                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3187                            mem_ctx, &outdata, &status, NULL, NULL);
3188         if (ret != 0 || status != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3190                 return -1;
3191         }
3192
3193         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3194
3195         return status;
3196 }
3197
3198 /*
3199   register a server id
3200  */
3201 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3202                       struct timeval timeout, 
3203                       struct ctdb_server_id *id)
3204 {
3205         TDB_DATA data;
3206         int32_t res;
3207         int ret;
3208
3209         data.dsize = sizeof(struct ctdb_server_id);
3210         data.dptr  = (unsigned char *)id;
3211
3212         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3213                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3214                         0, data, NULL,
3215                         NULL, &res, &timeout, NULL);
3216         if (ret != 0 || res != 0) {
3217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3218                 return -1;
3219         }
3220
3221         return 0;
3222 }
3223
3224 /*
3225   unregister a server id
3226  */
3227 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3228                       struct timeval timeout, 
3229                       struct ctdb_server_id *id)
3230 {
3231         TDB_DATA data;
3232         int32_t res;
3233         int ret;
3234
3235         data.dsize = sizeof(struct ctdb_server_id);
3236         data.dptr  = (unsigned char *)id;
3237
3238         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3239                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3240                         0, data, NULL,
3241                         NULL, &res, &timeout, NULL);
3242         if (ret != 0 || res != 0) {
3243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3244                 return -1;
3245         }
3246
3247         return 0;
3248 }
3249
3250
3251 /*
3252   check if a server id exists
3253
3254   if a server id does exist, return *status == 1, otherwise *status == 0
3255  */
3256 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3257                       struct timeval timeout, 
3258                       uint32_t destnode,
3259                       struct ctdb_server_id *id,
3260                       uint32_t *status)
3261 {
3262         TDB_DATA data;
3263         int32_t res;
3264         int ret;
3265
3266         data.dsize = sizeof(struct ctdb_server_id);
3267         data.dptr  = (unsigned char *)id;
3268
3269         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3270                         0, data, NULL,
3271                         NULL, &res, &timeout, NULL);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3274                 return -1;
3275         }
3276
3277         if (res) {
3278                 *status = 1;
3279         } else {
3280                 *status = 0;
3281         }
3282
3283         return 0;
3284 }
3285
3286 /*
3287    get the list of server ids that are registered on a node
3288 */
3289 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3290                 TALLOC_CTX *mem_ctx,
3291                 struct timeval timeout, uint32_t destnode, 
3292                 struct ctdb_server_id_list **svid_list)
3293 {
3294         int ret;
3295         TDB_DATA outdata;
3296         int32_t res;
3297
3298         ret = ctdb_control(ctdb, destnode, 0, 
3299                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3300                            mem_ctx, &outdata, &res, &timeout, NULL);
3301         if (ret != 0 || res != 0) {
3302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3303                 return -1;
3304         }
3305
3306         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3307                     
3308         return 0;
3309 }
3310
3311 /*
3312   initialise the ctdb daemon for client applications
3313
3314   NOTE: In current code the daemon does not fork. This is for testing purposes only
3315   and to simplify the code.
3316 */
3317 struct ctdb_context *ctdb_init(struct event_context *ev)
3318 {
3319         int ret;
3320         struct ctdb_context *ctdb;
3321
3322         ctdb = talloc_zero(ev, struct ctdb_context);
3323         if (ctdb == NULL) {
3324                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3325                 return NULL;
3326         }
3327         ctdb->ev  = ev;
3328         ctdb->idr = idr_init(ctdb);
3329         /* Wrap early to exercise code. */
3330         ctdb->lastid = INT_MAX-200;
3331         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3332
3333         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3336                 talloc_free(ctdb);
3337                 return NULL;
3338         }
3339
3340         ctdb->statistics.statistics_start_time = timeval_current();
3341
3342         return ctdb;
3343 }
3344
3345
3346 /*
3347   set some ctdb flags
3348 */
3349 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3350 {
3351         ctdb->flags |= flags;
3352 }
3353
3354 /*
3355   setup the local socket name
3356 */
3357 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3358 {
3359         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3360         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3361
3362         return 0;
3363 }
3364
3365 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3366 {
3367         return ctdb->daemon.name;
3368 }
3369
3370 /*
3371   return the pnn of this node
3372 */
3373 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3374 {
3375         return ctdb->pnn;
3376 }
3377
3378
3379 /*
3380   get the uptime of a remote node
3381  */
3382 struct ctdb_client_control_state *
3383 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3384 {
3385         return ctdb_control_send(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3387                            mem_ctx, &timeout, NULL);
3388 }
3389
3390 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3391 {
3392         int ret;
3393         int32_t res;
3394         TDB_DATA outdata;
3395
3396         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3397         if (ret != 0 || res != 0) {
3398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3399                 return -1;
3400         }
3401
3402         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3403
3404         return 0;
3405 }
3406
3407 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3408 {
3409         struct ctdb_client_control_state *state;
3410
3411         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3412         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3413 }
3414
3415 /*
3416   send a control to execute the "recovered" event script on a node
3417  */
3418 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3419 {
3420         int ret;
3421         int32_t status;
3422
3423         ret = ctdb_control(ctdb, destnode, 0, 
3424                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3425                            NULL, NULL, &status, &timeout, NULL);
3426         if (ret != 0 || status != 0) {
3427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3428                 return -1;
3429         }
3430
3431         return 0;
3432 }
3433
3434 /* 
3435   callback for the async helpers used when sending the same control
3436   to multiple nodes in parallell.
3437 */
3438 static void async_callback(struct ctdb_client_control_state *state)
3439 {
3440         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3441         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res = -1;
3445         uint32_t destnode = state->c->hdr.destnode;
3446
3447         /* one more node has responded with recmode data */
3448         data->count--;
3449
3450         /* if we failed to push the db, then return an error and let
3451            the main loop try again.
3452         */
3453         if (state->state != CTDB_CONTROL_DONE) {
3454                 if ( !data->dont_log_errors) {
3455                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3456                 }
3457                 data->fail_count++;
3458                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3459                         res = -ETIME;
3460                 } else {
3461                         res = -1;
3462                 }
3463                 if (data->fail_callback) {
3464                         data->fail_callback(ctdb, destnode, res, outdata,
3465                                         data->callback_data);
3466                 }
3467                 return;
3468         }
3469         
3470         state->async.fn = NULL;
3471
3472         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3473         if ((ret != 0) || (res != 0)) {
3474                 if ( !data->dont_log_errors) {
3475                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3476                 }
3477                 data->fail_count++;
3478                 if (data->fail_callback) {
3479                         data->fail_callback(ctdb, destnode, res, outdata,
3480                                         data->callback_data);
3481                 }
3482         }
3483         if ((ret == 0) && (data->callback != NULL)) {
3484                 data->callback(ctdb, destnode, res, outdata,
3485                                         data->callback_data);
3486         }
3487 }
3488
3489
3490 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3491 {
3492         /* set up the callback functions */
3493         state->async.fn = async_callback;
3494         state->async.private_data = data;
3495         
3496         /* one more control to wait for to complete */
3497         data->count++;
3498 }
3499
3500
3501 /* wait for up to the maximum number of seconds allowed
3502    or until all nodes we expect a response from has replied
3503 */
3504 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3505 {
3506         while (data->count > 0) {
3507                 event_loop_once(ctdb->ev);
3508         }
3509         if (data->fail_count != 0) {
3510                 if (!data->dont_log_errors) {
3511                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3512                                  data->fail_count));
3513                 }
3514                 return -1;
3515         }
3516         return 0;
3517 }
3518
3519
3520 /* 
3521    perform a simple control on the listed nodes
3522    The control cannot return data
3523  */
3524 int ctdb_client_async_control(struct ctdb_context *ctdb,
3525                                 enum ctdb_controls opcode,
3526                                 uint32_t *nodes,
3527                                 uint64_t srvid,
3528                                 struct timeval timeout,
3529                                 bool dont_log_errors,
3530                                 TDB_DATA data,
3531                                 client_async_callback client_callback,
3532                                 client_async_callback fail_callback,
3533                                 void *callback_data)
3534 {
3535         struct client_async_data *async_data;
3536         struct ctdb_client_control_state *state;
3537         int j, num_nodes;
3538
3539         async_data = talloc_zero(ctdb, struct client_async_data);
3540         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3541         async_data->dont_log_errors = dont_log_errors;
3542         async_data->callback = client_callback;
3543         async_data->fail_callback = fail_callback;
3544         async_data->callback_data = callback_data;
3545         async_data->opcode        = opcode;
3546
3547         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3548
3549         /* loop over all nodes and send an async control to each of them */
3550         for (j=0; j<num_nodes; j++) {
3551                 uint32_t pnn = nodes[j];
3552
3553                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3554                                           0, data, async_data, &timeout, NULL);
3555                 if (state == NULL) {
3556                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3557                         talloc_free(async_data);
3558                         return -1;
3559                 }
3560                 
3561                 ctdb_client_async_add(async_data, state);
3562         }
3563
3564         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3565                 talloc_free(async_data);
3566                 return -1;
3567         }
3568
3569         talloc_free(async_data);
3570         return 0;
3571 }
3572
3573 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3574                                 struct ctdb_vnn_map *vnn_map,
3575                                 TALLOC_CTX *mem_ctx,
3576                                 bool include_self)
3577 {
3578         int i, j, num_nodes;
3579         uint32_t *nodes;
3580
3581         for (i=num_nodes=0;i<vnn_map->size;i++) {
3582                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3583                         continue;
3584                 }
3585                 num_nodes++;
3586         } 
3587
3588         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3589         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3590
3591         for (i=j=0;i<vnn_map->size;i++) {
3592                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3593                         continue;
3594                 }
3595                 nodes[j++] = vnn_map->map[i];
3596         } 
3597
3598         return nodes;
3599 }
3600
3601 /* Get list of nodes not including those with flags specified by mask.
3602  * If exclude_pnn is not -1 then exclude that pnn from the list.
3603  */
3604 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3605                         struct ctdb_node_map *node_map,
3606                         TALLOC_CTX *mem_ctx,
3607                         uint32_t mask,
3608                         int exclude_pnn)
3609 {
3610         int i, j, num_nodes;
3611         uint32_t *nodes;
3612
3613         for (i=num_nodes=0;i<node_map->num;i++) {
3614                 if (node_map->nodes[i].flags & mask) {
3615                         continue;
3616                 }
3617                 if (node_map->nodes[i].pnn == exclude_pnn) {
3618                         continue;
3619                 }
3620                 num_nodes++;
3621         } 
3622
3623         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3624         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3625
3626         for (i=j=0;i<node_map->num;i++) {
3627                 if (node_map->nodes[i].flags & mask) {
3628                         continue;
3629                 }
3630                 if (node_map->nodes[i].pnn == exclude_pnn) {
3631                         continue;
3632                 }
3633                 nodes[j++] = node_map->nodes[i].pnn;
3634         } 
3635
3636         return nodes;
3637 }
3638
3639 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3640                                 struct ctdb_node_map *node_map,
3641                                 TALLOC_CTX *mem_ctx,
3642                                 bool include_self)
3643 {
3644         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3645                              include_self ? -1 : ctdb->pnn);
3646 }
3647
3648 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3649                                 struct ctdb_node_map *node_map,
3650                                 TALLOC_CTX *mem_ctx,
3651                                 bool include_self)
3652 {
3653         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3654                              include_self ? -1 : ctdb->pnn);
3655 }
3656
3657 /* 
3658   this is used to test if a pnn lock exists and if it exists will return
3659   the number of connections that pnn has reported or -1 if that recovery
3660   daemon is not running.
3661 */
3662 int
3663 ctdb_read_pnn_lock(int fd, int32_t pnn)
3664 {
3665         struct flock lock;
3666         char c;
3667
3668         lock.l_type = F_WRLCK;
3669         lock.l_whence = SEEK_SET;
3670         lock.l_start = pnn;
3671         lock.l_len = 1;
3672         lock.l_pid = 0;
3673
3674         if (fcntl(fd, F_GETLK, &lock) != 0) {
3675                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3676                 return -1;
3677         }
3678
3679         if (lock.l_type == F_UNLCK) {
3680                 return -1;
3681         }
3682
3683         if (pread(fd, &c, 1, pnn) == -1) {
3684                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3685                 return -1;
3686         }
3687
3688         return c;
3689 }
3690
3691 /*
3692   get capabilities of a remote node
3693  */
3694 struct ctdb_client_control_state *
3695 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3696 {
3697         return ctdb_control_send(ctdb, destnode, 0, 
3698                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3699                            mem_ctx, &timeout, NULL);
3700 }
3701
3702 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3703 {
3704         int ret;
3705         int32_t res;
3706         TDB_DATA outdata;
3707
3708         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3709         if ( (ret != 0) || (res != 0) ) {
3710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3711                 return -1;
3712         }
3713
3714         if (capabilities) {
3715                 *capabilities = *((uint32_t *)outdata.dptr);
3716         }
3717
3718         return 0;
3719 }
3720
3721 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3722 {
3723         struct ctdb_client_control_state *state;
3724         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3725         int ret;
3726
3727         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3728         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3729         talloc_free(tmp_ctx);
3730         return ret;
3731 }
3732
3733 struct server_id {
3734         uint64_t pid;
3735         uint32_t task_id;
3736         uint32_t vnn;
3737         uint64_t unique_id;
3738 };
3739
3740 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3741 {
3742         struct server_id id;
3743
3744         id.pid = getpid();
3745         id.task_id = reqid;
3746         id.vnn = ctdb_get_pnn(ctdb);
3747         id.unique_id = id.vnn;
3748         id.unique_id = (id.unique_id << 32) | reqid;
3749
3750         return id;
3751 }
3752
3753 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3754 {
3755         if (id1->pid != id2->pid) {
3756                 return false;
3757         }
3758
3759         if (id1->task_id != id2->task_id) {
3760                 return false;
3761         }
3762
3763         if (id1->vnn != id2->vnn) {
3764                 return false;
3765         }
3766
3767         if (id1->unique_id != id2->unique_id) {
3768                 return false;
3769         }
3770
3771         return true;
3772 }
3773
3774 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3775 {
3776         struct ctdb_server_id sid;
3777         int ret;
3778         uint32_t result;
3779
3780         sid.type = SERVER_TYPE_SAMBA;
3781         sid.pnn = id->vnn;
3782         sid.server_id = id->pid;
3783
3784         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3785                                         id->vnn, &sid, &result);
3786         if (ret != 0) {
3787                 /* If control times out, assume server_id exists. */
3788                 return true;
3789         }
3790
3791         if (result) {
3792                 return true;
3793         }
3794
3795         return false;
3796 }
3797
3798
3799 enum g_lock_type {
3800         G_LOCK_READ = 0,
3801         G_LOCK_WRITE = 1,
3802 };
3803
3804 struct g_lock_rec {
3805         enum g_lock_type type;
3806         struct server_id id;
3807 };
3808
3809 struct g_lock_recs {
3810         unsigned int num;
3811         struct g_lock_rec *lock;
3812 };
3813
3814 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3815                          struct g_lock_recs **locks)
3816 {
3817         struct g_lock_recs *recs;
3818
3819         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3820         if (recs == NULL) {
3821                 return false;
3822         }
3823
3824         if (data.dsize == 0) {
3825                 goto done;
3826         }
3827
3828         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3829                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3830                                   data.dsize));
3831                 talloc_free(recs);
3832                 return false;
3833         }
3834
3835         recs->num = data.dsize / sizeof(struct g_lock_rec);
3836         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3837         if (recs->lock == NULL) {
3838                 talloc_free(recs);
3839                 return false;
3840         }
3841
3842 done:
3843         if (locks != NULL) {
3844                 *locks = recs;
3845         }
3846
3847         return true;
3848 }
3849
3850
3851 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3852                         struct ctdb_db_context *ctdb_db,
3853                         const char *keyname, uint32_t reqid)
3854 {
3855         TDB_DATA key, data;
3856         struct ctdb_record_handle *h;
3857         struct g_lock_recs *locks;
3858         struct server_id id;
3859         int i;
3860
3861         key.dptr = (uint8_t *)discard_const(keyname);
3862         key.dsize = strlen(keyname) + 1;
3863         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3864         if (h == NULL) {
3865                 return false;
3866         }
3867
3868         if (!g_lock_parse(h, data, &locks)) {
3869                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3870                 talloc_free(data.dptr);
3871                 talloc_free(h);
3872                 return false;
3873         }
3874
3875         talloc_free(data.dptr);
3876
3877         id = server_id_get(ctdb_db->ctdb, reqid);
3878
3879         i = 0;
3880         while (i < locks->num) {
3881                 if (server_id_equal(&locks->lock[i].id, &id)) {
3882                         /* Internal error */
3883                         talloc_free(h);
3884                         return false;
3885                 }
3886
3887                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3888                         if (i < locks->num-1) {
3889                                 locks->lock[i] = locks->lock[locks->num-1];
3890                         }
3891                         locks->num--;
3892                         continue;
3893                 }
3894
3895                 /* This entry is locked. */
3896                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3897                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3898                                    (unsigned long long)id.pid,
3899                                    id.task_id, id.vnn,
3900                                    (unsigned long long)id.unique_id));
3901                 talloc_free(h);
3902                 return false;
3903         }
3904
3905         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3906                                      locks->num+1);
3907         if (locks->lock == NULL) {
3908                 talloc_free(h);
3909                 return false;
3910         }
3911
3912         locks->lock[locks->num].type = G_LOCK_WRITE;
3913         locks->lock[locks->num].id = id;
3914         locks->num++;
3915
3916         data.dptr = (uint8_t *)locks->lock;
3917         data.dsize = locks->num * sizeof(struct g_lock_rec);
3918
3919         if (ctdb_record_store(h, data) != 0) {
3920                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3921                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3922                                   (unsigned long long)id.pid,
3923                                   id.task_id, id.vnn,
3924                                   (unsigned long long)id.unique_id));
3925                 talloc_free(h);
3926                 return false;
3927         }
3928
3929         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3930                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3931                            (unsigned long long)id.pid,
3932                            id.task_id, id.vnn,
3933                            (unsigned long long)id.unique_id));
3934
3935         talloc_free(h);
3936         return true;
3937 }
3938
3939 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3940                           struct ctdb_db_context *ctdb_db,
3941                           const char *keyname, uint32_t reqid)
3942 {
3943         TDB_DATA key, data;
3944         struct ctdb_record_handle *h;
3945         struct g_lock_recs *locks;
3946         struct server_id id;
3947         int i;
3948         bool found = false;
3949
3950         key.dptr = (uint8_t *)discard_const(keyname);
3951         key.dsize = strlen(keyname) + 1;
3952         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3953         if (h == NULL) {
3954                 return false;
3955         }
3956
3957         if (!g_lock_parse(h, data, &locks)) {
3958                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3959                 talloc_free(data.dptr);
3960                 talloc_free(h);
3961                 return false;
3962         }
3963
3964         talloc_free(data.dptr);
3965
3966         id = server_id_get(ctdb_db->ctdb, reqid);
3967
3968         for (i=0; i<locks->num; i++) {
3969                 if (server_id_equal(&locks->lock[i].id, &id)) {
3970                         if (i < locks->num-1) {
3971                                 locks->lock[i] = locks->lock[locks->num-1];
3972                         }
3973                         locks->num--;
3974                         found = true;
3975                         break;
3976                 }
3977         }
3978
3979         if (!found) {
3980                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3981                 talloc_free(h);
3982                 return false;
3983         }
3984
3985         data.dptr = (uint8_t *)locks->lock;
3986         data.dsize = locks->num * sizeof(struct g_lock_rec);
3987
3988         if (ctdb_record_store(h, data) != 0) {
3989                 talloc_free(h);
3990                 return false;
3991         }
3992
3993         talloc_free(h);
3994         return true;
3995 }
3996
3997 /**
3998  * check whether a transaction is active on a given db on a given node
3999  */
4000 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
4001                                      uint32_t destnode,
4002                                      uint32_t db_id)
4003 {
4004         int32_t status;
4005         int ret;
4006         TDB_DATA indata;
4007
4008         indata.dptr = (uint8_t *)&db_id;
4009         indata.dsize = sizeof(db_id);
4010
4011         ret = ctdb_control(ctdb, destnode, 0,
4012                            CTDB_CONTROL_TRANS2_ACTIVE,
4013                            0, indata, NULL, NULL, &status,
4014                            NULL, NULL);
4015
4016         if (ret != 0) {
4017                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
4018                 return -1;
4019         }
4020
4021         return status;
4022 }
4023
4024
4025 struct ctdb_transaction_handle {
4026         struct ctdb_db_context *ctdb_db;
4027         bool in_replay;
4028         /*
4029          * we store the reads and writes done under a transaction:
4030          * - one list stores both reads and writes (m_all),
4031          * - the other just writes (m_write)
4032          */
4033         struct ctdb_marshall_buffer *m_all;
4034         struct ctdb_marshall_buffer *m_write;
4035 };
4036
4037 /* start a transaction on a database */
4038 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4039 {
4040         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4041         return 0;
4042 }
4043
4044 /* start a transaction on a database */
4045 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
4046 {
4047         struct ctdb_record_handle *rh;
4048         TDB_DATA key;
4049         TDB_DATA data;
4050         struct ctdb_ltdb_header header;
4051         TALLOC_CTX *tmp_ctx;
4052         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
4053         int ret;
4054         struct ctdb_db_context *ctdb_db = h->ctdb_db;
4055         pid_t pid;
4056         int32_t status;
4057
4058         key.dptr = discard_const(keyname);
4059         key.dsize = strlen(keyname);
4060
4061         if (!ctdb_db->persistent) {
4062                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
4063                 return -1;
4064         }
4065
4066 again:
4067         tmp_ctx = talloc_new(h);
4068
4069         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
4070         if (rh == NULL) {
4071                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
4072                 talloc_free(tmp_ctx);
4073                 return -1;
4074         }
4075
4076         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
4077                                               CTDB_CURRENT_NODE,
4078                                               ctdb_db->db_id);
4079         if (status == 1) {
4080                 unsigned long int usec = (1000 + random()) % 100000;
4081                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
4082                                     "on db_id[0x%08x]. waiting for %lu "
4083                                     "microseconds\n",
4084                                     ctdb_db->db_id, usec));
4085                 talloc_free(tmp_ctx);
4086                 usleep(usec);
4087                 goto again;
4088         }
4089
4090         /*
4091          * store the pid in the database:
4092          * it is not enough that the node is dmaster...
4093          */
4094         pid = getpid();
4095         data.dptr = (unsigned char *)&pid;
4096         data.dsize = sizeof(pid_t);
4097         rh->header.rsn++;
4098         rh->header.dmaster = ctdb_db->ctdb->pnn;
4099         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
4100         if (ret != 0) {
4101                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
4102                                   "transaction record\n"));
4103                 talloc_free(tmp_ctx);
4104                 return -1;
4105         }
4106
4107         talloc_free(rh);
4108
4109         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
4110         if (ret != 0) {
4111                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
4112                 talloc_free(tmp_ctx);
4113                 return -1;
4114         }
4115
4116         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
4117         if (ret != 0) {
4118                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
4119                                  "lock record inside transaction\n"));
4120                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4121                 talloc_free(tmp_ctx);
4122                 goto again;
4123         }
4124
4125         if (header.dmaster != ctdb_db->ctdb->pnn) {
4126                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
4127                                    "transaction lock record\n"));
4128                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4129                 talloc_free(tmp_ctx);
4130                 goto again;
4131         }
4132
4133         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
4134                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
4135                                     "the transaction lock record\n"));
4136                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4137                 talloc_free(tmp_ctx);
4138                 goto again;
4139         }
4140
4141         talloc_free(tmp_ctx);
4142
4143         return 0;
4144 }
4145
4146
4147 /* start a transaction on a database */
4148 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4149                                                        TALLOC_CTX *mem_ctx)
4150 {
4151         struct ctdb_transaction_handle *h;
4152         int ret;
4153
4154         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4155         if (h == NULL) {
4156                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
4157                 return NULL;
4158         }
4159
4160         h->ctdb_db = ctdb_db;
4161
4162         ret = ctdb_transaction_fetch_start(h);
4163         if (ret != 0) {
4164                 talloc_free(h);
4165                 return NULL;
4166         }
4167
4168         talloc_set_destructor(h, ctdb_transaction_destructor);
4169
4170         return h;
4171 }
4172
4173
4174
4175 /*
4176   fetch a record inside a transaction
4177  */
4178 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
4179                            TALLOC_CTX *mem_ctx, 
4180                            TDB_DATA key, TDB_DATA *data)
4181 {
4182         struct ctdb_ltdb_header header;
4183         int ret;
4184
4185         ZERO_STRUCT(header);
4186
4187         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4188         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4189                 /* record doesn't exist yet */
4190                 *data = tdb_null;
4191                 ret = 0;
4192         }
4193         
4194         if (ret != 0) {
4195                 return ret;
4196         }
4197
4198         if (!h->in_replay) {
4199                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4200                 if (h->m_all == NULL) {
4201                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4202                         return -1;
4203                 }
4204         }
4205
4206         return 0;
4207 }
4208
4209 /*
4210   stores a record inside a transaction
4211  */
4212 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
4213                            TDB_DATA key, TDB_DATA data)
4214 {
4215         TALLOC_CTX *tmp_ctx = talloc_new(h);
4216         struct ctdb_ltdb_header header;
4217         TDB_DATA olddata;
4218         int ret;
4219
4220         ZERO_STRUCT(header);
4221
4222         /* we need the header so we can update the RSN */
4223         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4224         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4225                 /* the record doesn't exist - create one with us as dmaster.
4226                    This is only safe because we are in a transaction and this
4227                    is a persistent database */
4228                 ZERO_STRUCT(header);
4229         } else if (ret != 0) {
4230                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4231                 talloc_free(tmp_ctx);
4232                 return ret;
4233         }
4234
4235         if (data.dsize == olddata.dsize &&
4236             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
4237                 /* save writing the same data */
4238                 talloc_free(tmp_ctx);
4239                 return 0;
4240         }
4241
4242         header.dmaster = h->ctdb_db->ctdb->pnn;
4243         header.rsn++;
4244
4245         if (!h->in_replay) {
4246                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4247                 if (h->m_all == NULL) {
4248                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4249                         talloc_free(tmp_ctx);
4250                         return -1;
4251                 }
4252         }               
4253
4254         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4255         if (h->m_write == NULL) {
4256                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4257                 talloc_free(tmp_ctx);
4258                 return -1;
4259         }
4260         
4261         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
4262
4263         talloc_free(tmp_ctx);
4264         
4265         return ret;
4266 }
4267
4268 /*
4269   replay a transaction
4270  */
4271 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
4272 {
4273         int ret, i;
4274         struct ctdb_rec_data *rec = NULL;
4275
4276         h->in_replay = true;
4277         talloc_free(h->m_write);
4278         h->m_write = NULL;
4279
4280         ret = ctdb_transaction_fetch_start(h);
4281         if (ret != 0) {
4282                 return ret;
4283         }
4284
4285         for (i=0;i<h->m_all->count;i++) {
4286                 TDB_DATA key, data;
4287
4288                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
4289                 if (rec == NULL) {
4290                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
4291                         goto failed;
4292                 }
4293
4294                 if (rec->reqid == 0) {
4295                         /* its a store */
4296                         if (ctdb_transaction_store(h, key, data) != 0) {
4297                                 goto failed;
4298                         }
4299                 } else {
4300                         TDB_DATA data2;
4301                         TALLOC_CTX *tmp_ctx = talloc_new(h);
4302
4303                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
4304                                 talloc_free(tmp_ctx);
4305                                 goto failed;
4306                         }
4307                         if (data2.dsize != data.dsize ||
4308                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
4309                                 /* the record has changed on us - we have to give up */
4310                                 talloc_free(tmp_ctx);
4311                                 goto failed;
4312                         }
4313                         talloc_free(tmp_ctx);
4314                 }
4315         }
4316         
4317         return 0;
4318
4319 failed:
4320         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4321         return -1;
4322 }
4323
4324
4325 /*
4326   commit a transaction
4327  */
4328 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4329 {
4330         int ret, retries=0;
4331         int32_t status;
4332         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
4333         struct timeval timeout;
4334         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
4335
4336         talloc_set_destructor(h, NULL);
4337
4338         /* our commit strategy is quite complex.
4339
4340            - we first try to commit the changes to all other nodes
4341
4342            - if that works, then we commit locally and we are done
4343
4344            - if a commit on another node fails, then we need to cancel
4345              the transaction, then restart the transaction (thus
4346              opening a window of time for a pending recovery to
4347              complete), then replay the transaction, checking all the
4348              reads and writes (checking that reads give the same data,
4349              and writes succeed). Then we retry the transaction to the
4350              other nodes
4351         */
4352
4353 again:
4354         if (h->m_write == NULL) {
4355                 /* no changes were made */
4356                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4357                 talloc_free(h);
4358                 return 0;
4359         }
4360
4361         /* tell ctdbd to commit to the other nodes */
4362         timeout = timeval_current_ofs(1, 0);
4363         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4364                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4365                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4366                            &timeout, NULL);
4367         if (ret != 0 || status != 0) {
4368                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4369                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4370                                      ", retrying after 1 second...\n",
4371                                      (retries==0)?"":"retry "));
4372                 sleep(1);
4373
4374                 if (ret != 0) {
4375                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4376                 } else {
4377                         /* work out what error code we will give if we 
4378                            have to fail the operation */
4379                         switch ((enum ctdb_trans2_commit_error)status) {
4380                         case CTDB_TRANS2_COMMIT_SUCCESS:
4381                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4382                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4383                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4384                                 break;
4385                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4386                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4387                                 break;
4388                         }
4389                 }
4390
4391                 if (++retries == 100) {
4392                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4393                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4394                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4395                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4396                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4397                         talloc_free(h);
4398                         return -1;
4399                 }               
4400
4401                 if (ctdb_replay_transaction(h) != 0) {
4402                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4403                                           "transaction on db 0x%08x, "
4404                                           "failure control =%u\n",
4405                                           h->ctdb_db->db_id,
4406                                           (unsigned)failure_control));
4407                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4408                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4409                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4410                         talloc_free(h);
4411                         return -1;
4412                 }
4413                 goto again;
4414         } else {
4415                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4416         }
4417
4418         /* do the real commit locally */
4419         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4420         if (ret != 0) {
4421                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4422                                   "on db id 0x%08x locally, "
4423                                   "failure_control=%u\n",
4424                                   h->ctdb_db->db_id,
4425                                   (unsigned)failure_control));
4426                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4427                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4428                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4429                 talloc_free(h);
4430                 return ret;
4431         }
4432
4433         /* tell ctdbd that we are finished with our local commit */
4434         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4435                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4436                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4437         talloc_free(h);
4438         return 0;
4439 }
4440
4441 /*
4442   recovery daemon ping to main daemon
4443  */
4444 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4445 {
4446         int ret;
4447         int32_t res;
4448
4449         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4450                            ctdb, NULL, &res, NULL, NULL);
4451         if (ret != 0 || res != 0) {
4452                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4453                 return -1;
4454         }
4455
4456         return 0;
4457 }
4458
4459 /* When forking the main daemon and the child process needs to connect
4460  * back to the daemon as a client process, this function can be used
4461  * to change the ctdb context from daemon into client mode.  The child
4462  * process must be created using ctdb_fork() and not fork() -
4463  * ctdb_fork() does some necessary housekeeping.
4464  */
4465 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4466 {
4467         int ret;
4468         va_list ap;
4469
4470         /* Add extra information so we can identify this in the logs */
4471         va_start(ap, fmt);
4472         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4473         va_end(ap);
4474
4475         /* get a new event context */
4476         ctdb->ev = event_context_init(ctdb);
4477         tevent_loop_allow_nesting(ctdb->ev);
4478
4479         /* Connect to main CTDB daemon */
4480         ret = ctdb_socket_connect(ctdb);
4481         if (ret != 0) {
4482                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4483                 return -1;
4484         }
4485
4486         ctdb->can_send_controls = true;
4487
4488         return 0;
4489 }
4490
4491 /*
4492   get the status of running the monitor eventscripts: NULL means never run.
4493  */
4494 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4495                 struct timeval timeout, uint32_t destnode, 
4496                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4497                 struct ctdb_scripts_wire **scripts)
4498 {
4499         int ret;
4500         TDB_DATA outdata, indata;
4501         int32_t res;
4502         uint32_t uinttype = type;
4503
4504         indata.dptr = (uint8_t *)&uinttype;
4505         indata.dsize = sizeof(uinttype);
4506
4507         ret = ctdb_control(ctdb, destnode, 0, 
4508                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4509                            mem_ctx, &outdata, &res, &timeout, NULL);
4510         if (ret != 0 || res != 0) {
4511                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4512                 return -1;
4513         }
4514
4515         if (outdata.dsize == 0) {
4516                 *scripts = NULL;
4517         } else {
4518                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4519                 talloc_free(outdata.dptr);
4520         }
4521                     
4522         return 0;
4523 }
4524
4525 /*
4526   tell the main daemon how long it took to lock the reclock file
4527  */
4528 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4529 {
4530         int ret;
4531         int32_t res;
4532         TDB_DATA data;
4533
4534         data.dptr = (uint8_t *)&latency;
4535         data.dsize = sizeof(latency);
4536
4537         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4538                            ctdb, NULL, &res, NULL, NULL);
4539         if (ret != 0 || res != 0) {
4540                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4541                 return -1;
4542         }
4543
4544         return 0;
4545 }
4546
4547 /*
4548   get the name of the reclock file
4549  */
4550 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4551                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4552                          const char **name)
4553 {
4554         int ret;
4555         int32_t res;
4556         TDB_DATA data;
4557
4558         ret = ctdb_control(ctdb, destnode, 0, 
4559                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4560                            mem_ctx, &data, &res, &timeout, NULL);
4561         if (ret != 0 || res != 0) {
4562                 return -1;
4563         }
4564
4565         if (data.dsize == 0) {
4566                 *name = NULL;
4567         } else {
4568                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4569         }
4570         talloc_free(data.dptr);
4571
4572         return 0;
4573 }
4574
4575 /*
4576   set the reclock filename for a node
4577  */
4578 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4579 {
4580         int ret;
4581         TDB_DATA data;
4582         int32_t res;
4583
4584         if (reclock == NULL) {
4585                 data.dsize = 0;
4586                 data.dptr  = NULL;
4587         } else {
4588                 data.dsize = strlen(reclock) + 1;
4589                 data.dptr  = discard_const(reclock);
4590         }
4591
4592         ret = ctdb_control(ctdb, destnode, 0, 
4593                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4594                            NULL, NULL, &res, &timeout, NULL);
4595         if (ret != 0 || res != 0) {
4596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4597                 return -1;
4598         }
4599
4600         return 0;
4601 }
4602
4603 /*
4604   stop a node
4605  */
4606 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4607 {
4608         int ret;
4609         int32_t res;
4610
4611         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4612                            ctdb, NULL, &res, &timeout, NULL);
4613         if (ret != 0 || res != 0) {
4614                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4615                 return -1;
4616         }
4617
4618         return 0;
4619 }
4620
4621 /*
4622   continue a node
4623  */
4624 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4625 {
4626         int ret;
4627
4628         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4629                            ctdb, NULL, NULL, &timeout, NULL);
4630         if (ret != 0) {
4631                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4632                 return -1;
4633         }
4634
4635         return 0;
4636 }
4637
4638 /*
4639   set the natgw state for a node
4640  */
4641 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4642 {
4643         int ret;
4644         TDB_DATA data;
4645         int32_t res;
4646
4647         data.dsize = sizeof(natgwstate);
4648         data.dptr  = (uint8_t *)&natgwstate;
4649
4650         ret = ctdb_control(ctdb, destnode, 0, 
4651                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4652                            NULL, NULL, &res, &timeout, NULL);
4653         if (ret != 0 || res != 0) {
4654                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4655                 return -1;
4656         }
4657
4658         return 0;
4659 }
4660
4661 /*
4662   set the lmaster role for a node
4663  */
4664 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4665 {
4666         int ret;
4667         TDB_DATA data;
4668         int32_t res;
4669
4670         data.dsize = sizeof(lmasterrole);
4671         data.dptr  = (uint8_t *)&lmasterrole;
4672
4673         ret = ctdb_control(ctdb, destnode, 0, 
4674                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4675                            NULL, NULL, &res, &timeout, NULL);
4676         if (ret != 0 || res != 0) {
4677                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4678                 return -1;
4679         }
4680
4681         return 0;
4682 }
4683
4684 /*
4685   set the recmaster role for a node
4686  */
4687 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4688 {
4689         int ret;
4690         TDB_DATA data;
4691         int32_t res;
4692
4693         data.dsize = sizeof(recmasterrole);
4694         data.dptr  = (uint8_t *)&recmasterrole;
4695
4696         ret = ctdb_control(ctdb, destnode, 0, 
4697                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4698                            NULL, NULL, &res, &timeout, NULL);
4699         if (ret != 0 || res != 0) {
4700                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4701                 return -1;
4702         }
4703
4704         return 0;
4705 }
4706
4707 /* enable an eventscript
4708  */
4709 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4710 {
4711         int ret;
4712         TDB_DATA data;
4713         int32_t res;
4714
4715         data.dsize = strlen(script) + 1;
4716         data.dptr  = discard_const(script);
4717
4718         ret = ctdb_control(ctdb, destnode, 0, 
4719                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4720                            NULL, NULL, &res, &timeout, NULL);
4721         if (ret != 0 || res != 0) {
4722                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4723                 return -1;
4724         }
4725
4726         return 0;
4727 }
4728
4729 /* disable an eventscript
4730  */
4731 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4732 {
4733         int ret;
4734         TDB_DATA data;
4735         int32_t res;
4736
4737         data.dsize = strlen(script) + 1;
4738         data.dptr  = discard_const(script);
4739
4740         ret = ctdb_control(ctdb, destnode, 0, 
4741                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4742                            NULL, NULL, &res, &timeout, NULL);
4743         if (ret != 0 || res != 0) {
4744                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4745                 return -1;
4746         }
4747
4748         return 0;
4749 }
4750
4751
4752 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4753 {
4754         int ret;
4755         TDB_DATA data;
4756         int32_t res;
4757
4758         data.dsize = sizeof(*bantime);
4759         data.dptr  = (uint8_t *)bantime;
4760
4761         ret = ctdb_control(ctdb, destnode, 0, 
4762                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4763                            NULL, NULL, &res, &timeout, NULL);
4764         if (ret != 0 || res != 0) {
4765                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4766                 return -1;
4767         }
4768
4769         return 0;
4770 }
4771
4772
4773 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4774 {
4775         int ret;
4776         TDB_DATA outdata;
4777         int32_t res;
4778         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4779
4780         ret = ctdb_control(ctdb, destnode, 0, 
4781                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4782                            tmp_ctx, &outdata, &res, &timeout, NULL);
4783         if (ret != 0 || res != 0) {
4784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4785                 talloc_free(tmp_ctx);
4786                 return -1;
4787         }
4788
4789         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4790         talloc_free(tmp_ctx);
4791
4792         return 0;
4793 }
4794
4795
4796 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4797 {
4798         int ret;
4799         int32_t res;
4800         TDB_DATA data;
4801         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4802
4803         data.dptr = (uint8_t*)db_prio;
4804         data.dsize = sizeof(*db_prio);
4805
4806         ret = ctdb_control(ctdb, destnode, 0, 
4807                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4808                            tmp_ctx, NULL, &res, &timeout, NULL);
4809         if (ret != 0 || res != 0) {
4810                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4811                 talloc_free(tmp_ctx);
4812                 return -1;
4813         }
4814
4815         talloc_free(tmp_ctx);
4816
4817         return 0;
4818 }
4819
4820 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4821 {
4822         int ret;
4823         int32_t res;
4824         TDB_DATA data;
4825         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4826
4827         data.dptr = (uint8_t*)&db_id;
4828         data.dsize = sizeof(db_id);
4829
4830         ret = ctdb_control(ctdb, destnode, 0, 
4831                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4832                            tmp_ctx, NULL, &res, &timeout, NULL);
4833         if (ret != 0 || res < 0) {
4834                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4835                 talloc_free(tmp_ctx);
4836                 return -1;
4837         }
4838
4839         if (priority) {
4840                 *priority = res;
4841         }
4842
4843         talloc_free(tmp_ctx);
4844
4845         return 0;
4846 }
4847
4848 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4849 {
4850         int ret;
4851         TDB_DATA outdata;
4852         int32_t res;
4853
4854         ret = ctdb_control(ctdb, destnode, 0, 
4855                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4856                            mem_ctx, &outdata, &res, &timeout, NULL);
4857         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4858                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4859                 return -1;
4860         }
4861
4862         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4863         talloc_free(outdata.dptr);
4864                     
4865         return 0;
4866 }
4867
4868 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4869 {
4870         if (h == NULL) {
4871                 return NULL;
4872         }
4873
4874         return &h->header;
4875 }
4876
4877
4878 struct ctdb_client_control_state *
4879 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4880 {
4881         struct ctdb_client_control_state *handle;
4882         struct ctdb_marshall_buffer *m;
4883         struct ctdb_rec_data *rec;
4884         TDB_DATA outdata;
4885
4886         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4887         if (m == NULL) {
4888                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4889                 return NULL;
4890         }
4891
4892         m->db_id = ctdb_db->db_id;
4893
4894         rec = ctdb_marshall_record(m, 0, key, header, data);
4895         if (rec == NULL) {
4896                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4897                 talloc_free(m);
4898                 return NULL;
4899         }
4900         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4901         if (m == NULL) {
4902                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4903                 talloc_free(m);
4904                 return NULL;
4905         }
4906         m->count++;
4907         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4908
4909
4910         outdata.dptr = (uint8_t *)m;
4911         outdata.dsize = talloc_get_size(m);
4912
4913         handle = ctdb_control_send(ctdb, destnode, 0, 
4914                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4915                            mem_ctx, &timeout, NULL);
4916         talloc_free(m);
4917         return handle;
4918 }
4919
4920 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4921 {
4922         int ret;
4923         int32_t res;
4924
4925         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4926         if ( (ret != 0) || (res != 0) ){
4927                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4928                 return -1;
4929         }
4930
4931         return 0;
4932 }
4933
4934 int
4935 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4936 {
4937         struct ctdb_client_control_state *state;
4938
4939         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4940         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4941 }
4942
4943
4944
4945
4946
4947
4948 /*
4949   set a database to be readonly
4950  */
4951 struct ctdb_client_control_state *
4952 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4953 {
4954         TDB_DATA data;
4955
4956         data.dptr = (uint8_t *)&dbid;
4957         data.dsize = sizeof(dbid);
4958
4959         return ctdb_control_send(ctdb, destnode, 0, 
4960                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4961                            ctdb, NULL, NULL);
4962 }
4963
4964 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4965 {
4966         int ret;
4967         int32_t res;
4968
4969         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4970         if (ret != 0 || res != 0) {
4971           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4972                 return -1;
4973         }
4974
4975         return 0;
4976 }
4977
4978 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4979 {
4980         struct ctdb_client_control_state *state;
4981
4982         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4983         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4984 }
4985
4986 /*
4987   set a database to be sticky
4988  */
4989 struct ctdb_client_control_state *
4990 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4991 {
4992         TDB_DATA data;
4993
4994         data.dptr = (uint8_t *)&dbid;
4995         data.dsize = sizeof(dbid);
4996
4997         return ctdb_control_send(ctdb, destnode, 0, 
4998                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4999                            ctdb, NULL, NULL);
5000 }
5001
5002 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
5003 {
5004         int ret;
5005         int32_t res;
5006
5007         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
5008         if (ret != 0 || res != 0) {
5009                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
5010                 return -1;
5011         }
5012
5013         return 0;
5014 }
5015
5016 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
5017 {
5018         struct ctdb_client_control_state *state;
5019
5020         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
5021         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
5022 }