ReadOnly: Add clientside functions to send the UPDATE_RECORD control
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return 0;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
391
392         return state;
393 }
394
395 /*
396   make a ctdb call to the local daemon - async send. Called from client context.
397
398   This constructs a ctdb_call request and queues it for processing. 
399   This call never blocks.
400 */
401 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
402                                               struct ctdb_call *call)
403 {
404         struct ctdb_client_call_state *state;
405         struct ctdb_context *ctdb = ctdb_db->ctdb;
406         struct ctdb_ltdb_header header;
407         TDB_DATA data;
408         int ret;
409         size_t len;
410         struct ctdb_req_call *c;
411
412         /* if the domain socket is not yet open, open it */
413         if (ctdb->daemon.sd==-1) {
414                 ctdb_socket_connect(ctdb);
415         }
416
417         ret = ctdb_ltdb_lock(ctdb_db, call->key);
418         if (ret != 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
420                 return NULL;
421         }
422
423         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424
425         if (ret == 0 && header.dmaster == ctdb->pnn) {
426                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
427                 talloc_free(data.dptr);
428                 ctdb_ltdb_unlock(ctdb_db, call->key);
429                 return state;
430         }
431
432         ctdb_ltdb_unlock(ctdb_db, call->key);
433         talloc_free(data.dptr);
434
435         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
436         if (state == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
438                 return NULL;
439         }
440         state->call = talloc_zero(state, struct ctdb_call);
441         if (state->call == NULL) {
442                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
443                 return NULL;
444         }
445
446         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
447         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
448         if (c == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
450                 return NULL;
451         }
452
453         state->reqid     = ctdb_reqid_new(ctdb, state);
454         state->ctdb_db = ctdb_db;
455         talloc_set_destructor(state, ctdb_client_call_destructor);
456
457         c->hdr.reqid     = state->reqid;
458         c->flags         = call->flags;
459         c->db_id         = ctdb_db->db_id;
460         c->callid        = call->call_id;
461         c->hopcount      = 0;
462         c->keylen        = call->key.dsize;
463         c->calldatalen   = call->call_data.dsize;
464         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
465         memcpy(&c->data[call->key.dsize], 
466                call->call_data.dptr, call->call_data.dsize);
467         *(state->call)              = *call;
468         state->call->call_data.dptr = &c->data[call->key.dsize];
469         state->call->key.dptr       = &c->data[0];
470
471         state->state  = CTDB_CALL_WAIT;
472
473
474         ctdb_client_queue_pkt(ctdb, &c->hdr);
475
476         return state;
477 }
478
479
480 /*
481   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
482 */
483 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
484 {
485         struct ctdb_client_call_state *state;
486
487         state = ctdb_call_send(ctdb_db, call);
488         return ctdb_call_recv(state, call);
489 }
490
491
492 /*
493   tell the daemon what messaging srvid we will use, and register the message
494   handler function in the client
495 */
496 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
497                              ctdb_msg_fn_t handler,
498                              void *private_data)
499                                     
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
513 }
514
515 /*
516   tell the daemon we no longer want a srvid
517 */
518 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
519 {
520         int res;
521         int32_t status;
522         
523         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
524                            tdb_null, NULL, NULL, &status, NULL, NULL);
525         if (res != 0 || status != 0) {
526                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
527                 return -1;
528         }
529
530         /* also need to register the handler with our own ctdb structure */
531         ctdb_deregister_message_handler(ctdb, srvid, private_data);
532         return 0;
533 }
534
535
536 /*
537   send a message - from client context
538  */
539 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
540                       uint64_t srvid, TDB_DATA data)
541 {
542         struct ctdb_req_message *r;
543         int len, res;
544
545         len = offsetof(struct ctdb_req_message, data) + data.dsize;
546         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
547                                len, struct ctdb_req_message);
548         CTDB_NO_MEMORY(ctdb, r);
549
550         r->hdr.destnode  = pnn;
551         r->srvid         = srvid;
552         r->datalen       = data.dsize;
553         memcpy(&r->data[0], data.dptr, data.dsize);
554         
555         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
556         if (res != 0) {
557                 return res;
558         }
559
560         talloc_free(r);
561         return 0;
562 }
563
564
565 /*
566   cancel a ctdb_fetch_lock operation, releasing the lock
567  */
568 static int fetch_lock_destructor(struct ctdb_record_handle *h)
569 {
570         ctdb_ltdb_unlock(h->ctdb_db, h->key);
571         return 0;
572 }
573
574 /*
575   force the migration of a record to this node
576  */
577 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
578 {
579         struct ctdb_call call;
580         ZERO_STRUCT(call);
581         call.call_id = CTDB_NULL_FUNC;
582         call.key = key;
583         call.flags = CTDB_IMMEDIATE_MIGRATION;
584         return ctdb_call(ctdb_db, &call);
585 }
586
587 /*
588   get a lock on a record, and return the records data. Blocks until it gets the lock
589  */
590 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
591                                            TDB_DATA key, TDB_DATA *data)
592 {
593         int ret;
594         struct ctdb_record_handle *h;
595
596         /*
597           procedure is as follows:
598
599           1) get the chain lock. 
600           2) check if we are dmaster
601           3) if we are the dmaster then return handle 
602           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
603              reply from ctdbd
604           5) when we get the reply, goto (1)
605          */
606
607         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
608         if (h == NULL) {
609                 return NULL;
610         }
611
612         h->ctdb_db = ctdb_db;
613         h->key     = key;
614         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
615         if (h->key.dptr == NULL) {
616                 talloc_free(h);
617                 return NULL;
618         }
619         h->data    = data;
620
621         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
622                  (const char *)key.dptr));
623
624 again:
625         /* step 1 - get the chain lock */
626         ret = ctdb_ltdb_lock(ctdb_db, key);
627         if (ret != 0) {
628                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
629                 talloc_free(h);
630                 return NULL;
631         }
632
633         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
634
635         talloc_set_destructor(h, fetch_lock_destructor);
636
637         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
638
639         /* when torturing, ensure we test the remote path */
640         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
641             random() % 5 == 0) {
642                 h->header.dmaster = (uint32_t)-1;
643         }
644
645
646         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
647
648         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
649                 ctdb_ltdb_unlock(ctdb_db, key);
650                 ret = ctdb_client_force_migration(ctdb_db, key);
651                 if (ret != 0) {
652                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
653                         talloc_free(h);
654                         return NULL;
655                 }
656                 goto again;
657         }
658
659         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
660         return h;
661 }
662
663 /*
664   store some data to the record that was locked with ctdb_fetch_lock()
665 */
666 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
667 {
668         if (h->ctdb_db->persistent) {
669                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
670                 return -1;
671         }
672
673         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
674 }
675
676 /*
677   non-locking fetch of a record
678  */
679 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
680                TDB_DATA key, TDB_DATA *data)
681 {
682         struct ctdb_call call;
683         int ret;
684
685         call.call_id = CTDB_FETCH_FUNC;
686         call.call_data.dptr = NULL;
687         call.call_data.dsize = 0;
688
689         ret = ctdb_call(ctdb_db, &call);
690
691         if (ret == 0) {
692                 *data = call.reply_data;
693                 talloc_steal(mem_ctx, data->dptr);
694         }
695
696         return ret;
697 }
698
699
700
701 /*
702    called when a control completes or timesout to invoke the callback
703    function the user provided
704 */
705 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
706         struct timeval t, void *private_data)
707 {
708         struct ctdb_client_control_state *state;
709         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
710         int ret;
711
712         state = talloc_get_type(private_data, struct ctdb_client_control_state);
713         talloc_steal(tmp_ctx, state);
714
715         ret = ctdb_control_recv(state->ctdb, state, state,
716                         NULL, 
717                         NULL, 
718                         NULL);
719
720         talloc_free(tmp_ctx);
721 }
722
723 /*
724   called when a CTDB_REPLY_CONTROL packet comes in in the client
725
726   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
727   contains any reply data from the control
728 */
729 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
730                                       struct ctdb_req_header *hdr)
731 {
732         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
733         struct ctdb_client_control_state *state;
734
735         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
736         if (state == NULL) {
737                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
738                 return;
739         }
740
741         if (hdr->reqid != state->reqid) {
742                 /* we found a record  but it was the wrong one */
743                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
744                 return;
745         }
746
747         state->outdata.dptr = c->data;
748         state->outdata.dsize = c->datalen;
749         state->status = c->status;
750         if (c->errorlen) {
751                 state->errormsg = talloc_strndup(state, 
752                                                  (char *)&c->data[c->datalen], 
753                                                  c->errorlen);
754         }
755
756         /* state->outdata now uses resources from c so we dont want c
757            to just dissappear from under us while state is still alive
758         */
759         talloc_steal(state, c);
760
761         state->state = CTDB_CONTROL_DONE;
762
763         /* if we had a callback registered for this control, pull the response
764            and call the callback.
765         */
766         if (state->async.fn) {
767                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
768         }
769 }
770
771
772 /*
773   destroy a ctdb_control in client
774 */
775 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
776 {
777         ctdb_reqid_remove(state->ctdb, state->reqid);
778         return 0;
779 }
780
781
782 /* time out handler for ctdb_control */
783 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
784         struct timeval t, void *private_data)
785 {
786         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
787
788         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
789                          "dstnode:%u\n", state->reqid, state->c->opcode,
790                          state->c->hdr.destnode));
791
792         state->state = CTDB_CONTROL_TIMEOUT;
793
794         /* if we had a callback registered for this control, pull the response
795            and call the callback.
796         */
797         if (state->async.fn) {
798                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
799         }
800 }
801
802 /* async version of send control request */
803 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
804                 uint32_t destnode, uint64_t srvid, 
805                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
806                 TALLOC_CTX *mem_ctx,
807                 struct timeval *timeout,
808                 char **errormsg)
809 {
810         struct ctdb_client_control_state *state;
811         size_t len;
812         struct ctdb_req_control *c;
813         int ret;
814
815         if (errormsg) {
816                 *errormsg = NULL;
817         }
818
819         /* if the domain socket is not yet open, open it */
820         if (ctdb->daemon.sd==-1) {
821                 ctdb_socket_connect(ctdb);
822         }
823
824         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
825         CTDB_NO_MEMORY_NULL(ctdb, state);
826
827         state->ctdb       = ctdb;
828         state->reqid      = ctdb_reqid_new(ctdb, state);
829         state->state      = CTDB_CONTROL_WAIT;
830         state->errormsg   = NULL;
831
832         talloc_set_destructor(state, ctdb_control_destructor);
833
834         len = offsetof(struct ctdb_req_control, data) + data.dsize;
835         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
836                                len, struct ctdb_req_control);
837         state->c            = c;        
838         CTDB_NO_MEMORY_NULL(ctdb, c);
839         c->hdr.reqid        = state->reqid;
840         c->hdr.destnode     = destnode;
841         c->opcode           = opcode;
842         c->client_id        = 0;
843         c->flags            = flags;
844         c->srvid            = srvid;
845         c->datalen          = data.dsize;
846         if (data.dsize) {
847                 memcpy(&c->data[0], data.dptr, data.dsize);
848         }
849
850         /* timeout */
851         if (timeout && !timeval_is_zero(timeout)) {
852                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
853         }
854
855         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
856         if (ret != 0) {
857                 talloc_free(state);
858                 return NULL;
859         }
860
861         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
862                 talloc_free(state);
863                 return NULL;
864         }
865
866         return state;
867 }
868
869
870 /* async version of receive control reply */
871 int ctdb_control_recv(struct ctdb_context *ctdb, 
872                 struct ctdb_client_control_state *state, 
873                 TALLOC_CTX *mem_ctx,
874                 TDB_DATA *outdata, int32_t *status, char **errormsg)
875 {
876         TALLOC_CTX *tmp_ctx;
877
878         if (status != NULL) {
879                 *status = -1;
880         }
881         if (errormsg != NULL) {
882                 *errormsg = NULL;
883         }
884
885         if (state == NULL) {
886                 return -1;
887         }
888
889         /* prevent double free of state */
890         tmp_ctx = talloc_new(ctdb);
891         talloc_steal(tmp_ctx, state);
892
893         /* loop one event at a time until we either timeout or the control
894            completes.
895         */
896         while (state->state == CTDB_CONTROL_WAIT) {
897                 event_loop_once(ctdb->ev);
898         }
899
900         if (state->state != CTDB_CONTROL_DONE) {
901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
902                 if (state->async.fn) {
903                         state->async.fn(state);
904                 }
905                 talloc_free(tmp_ctx);
906                 return -1;
907         }
908
909         if (state->errormsg) {
910                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
911                 if (errormsg) {
912                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
913                 }
914                 if (state->async.fn) {
915                         state->async.fn(state);
916                 }
917                 talloc_free(tmp_ctx);
918                 return -1;
919         }
920
921         if (outdata) {
922                 *outdata = state->outdata;
923                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
924         }
925
926         if (status) {
927                 *status = state->status;
928         }
929
930         if (state->async.fn) {
931                 state->async.fn(state);
932         }
933
934         talloc_free(tmp_ctx);
935         return 0;
936 }
937
938
939
940 /*
941   send a ctdb control message
942   timeout specifies how long we should wait for a reply.
943   if timeout is NULL we wait indefinitely
944  */
945 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
946                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
947                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
948                  struct timeval *timeout,
949                  char **errormsg)
950 {
951         struct ctdb_client_control_state *state;
952
953         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
954                         flags, data, mem_ctx,
955                         timeout, errormsg);
956         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
957                         errormsg);
958 }
959
960
961
962
963 /*
964   a process exists call. Returns 0 if process exists, -1 otherwise
965  */
966 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
967 {
968         int ret;
969         TDB_DATA data;
970         int32_t status;
971
972         data.dptr = (uint8_t*)&pid;
973         data.dsize = sizeof(pid);
974
975         ret = ctdb_control(ctdb, destnode, 0, 
976                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
977                            NULL, NULL, &status, NULL, NULL);
978         if (ret != 0) {
979                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
980                 return -1;
981         }
982
983         return status;
984 }
985
986 /*
987   get remote statistics
988  */
989 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
990 {
991         int ret;
992         TDB_DATA data;
993         int32_t res;
994
995         ret = ctdb_control(ctdb, destnode, 0, 
996                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
997                            ctdb, &data, &res, NULL, NULL);
998         if (ret != 0 || res != 0) {
999                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1000                 return -1;
1001         }
1002
1003         if (data.dsize != sizeof(struct ctdb_statistics)) {
1004                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1005                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1006                       return -1;
1007         }
1008
1009         *status = *(struct ctdb_statistics *)data.dptr;
1010         talloc_free(data.dptr);
1011                         
1012         return 0;
1013 }
1014
1015 /*
1016   shutdown a remote ctdb node
1017  */
1018 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1019 {
1020         struct ctdb_client_control_state *state;
1021
1022         state = ctdb_control_send(ctdb, destnode, 0, 
1023                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1024                            NULL, &timeout, NULL);
1025         if (state == NULL) {
1026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1027                 return -1;
1028         }
1029
1030         return 0;
1031 }
1032
1033 /*
1034   get vnn map from a remote node
1035  */
1036 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1037 {
1038         int ret;
1039         TDB_DATA outdata;
1040         int32_t res;
1041         struct ctdb_vnn_map_wire *map;
1042
1043         ret = ctdb_control(ctdb, destnode, 0, 
1044                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1045                            mem_ctx, &outdata, &res, &timeout, NULL);
1046         if (ret != 0 || res != 0) {
1047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1048                 return -1;
1049         }
1050         
1051         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1052         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1053             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1054                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1055                 return -1;
1056         }
1057
1058         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1059         CTDB_NO_MEMORY(ctdb, *vnnmap);
1060         (*vnnmap)->generation = map->generation;
1061         (*vnnmap)->size       = map->size;
1062         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1063
1064         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1065         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1066         talloc_free(outdata.dptr);
1067                     
1068         return 0;
1069 }
1070
1071
1072 /*
1073   get the recovery mode of a remote node
1074  */
1075 struct ctdb_client_control_state *
1076 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1077 {
1078         return ctdb_control_send(ctdb, destnode, 0, 
1079                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1080                            mem_ctx, &timeout, NULL);
1081 }
1082
1083 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1084 {
1085         int ret;
1086         int32_t res;
1087
1088         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1089         if (ret != 0) {
1090                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1091                 return -1;
1092         }
1093
1094         if (recmode) {
1095                 *recmode = (uint32_t)res;
1096         }
1097
1098         return 0;
1099 }
1100
1101 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1102 {
1103         struct ctdb_client_control_state *state;
1104
1105         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1106         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1107 }
1108
1109
1110
1111
1112 /*
1113   set the recovery mode of a remote node
1114  */
1115 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1116 {
1117         int ret;
1118         TDB_DATA data;
1119         int32_t res;
1120
1121         data.dsize = sizeof(uint32_t);
1122         data.dptr = (unsigned char *)&recmode;
1123
1124         ret = ctdb_control(ctdb, destnode, 0, 
1125                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1126                            NULL, NULL, &res, &timeout, NULL);
1127         if (ret != 0 || res != 0) {
1128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1129                 return -1;
1130         }
1131
1132         return 0;
1133 }
1134
1135
1136
1137 /*
1138   get the recovery master of a remote node
1139  */
1140 struct ctdb_client_control_state *
1141 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1142                         struct timeval timeout, uint32_t destnode)
1143 {
1144         return ctdb_control_send(ctdb, destnode, 0, 
1145                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1146                            mem_ctx, &timeout, NULL);
1147 }
1148
1149 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1150 {
1151         int ret;
1152         int32_t res;
1153
1154         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1155         if (ret != 0) {
1156                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1157                 return -1;
1158         }
1159
1160         if (recmaster) {
1161                 *recmaster = (uint32_t)res;
1162         }
1163
1164         return 0;
1165 }
1166
1167 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1168 {
1169         struct ctdb_client_control_state *state;
1170
1171         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1172         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1173 }
1174
1175
1176 /*
1177   set the recovery master of a remote node
1178  */
1179 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1180 {
1181         int ret;
1182         TDB_DATA data;
1183         int32_t res;
1184
1185         ZERO_STRUCT(data);
1186         data.dsize = sizeof(uint32_t);
1187         data.dptr = (unsigned char *)&recmaster;
1188
1189         ret = ctdb_control(ctdb, destnode, 0, 
1190                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1191                            NULL, NULL, &res, &timeout, NULL);
1192         if (ret != 0 || res != 0) {
1193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1194                 return -1;
1195         }
1196
1197         return 0;
1198 }
1199
1200
1201 /*
1202   get a list of databases off a remote node
1203  */
1204 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1205                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1206 {
1207         int ret;
1208         TDB_DATA outdata;
1209         int32_t res;
1210
1211         ret = ctdb_control(ctdb, destnode, 0, 
1212                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1213                            mem_ctx, &outdata, &res, &timeout, NULL);
1214         if (ret != 0 || res != 0) {
1215                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1216                 return -1;
1217         }
1218
1219         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1220         talloc_free(outdata.dptr);
1221                     
1222         return 0;
1223 }
1224
1225 /*
1226   get a list of nodes (vnn and flags ) from a remote node
1227  */
1228 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1229                 struct timeval timeout, uint32_t destnode, 
1230                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1231 {
1232         int ret;
1233         TDB_DATA outdata;
1234         int32_t res;
1235
1236         ret = ctdb_control(ctdb, destnode, 0, 
1237                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1238                            mem_ctx, &outdata, &res, &timeout, NULL);
1239         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1240                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1241                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1242         }
1243         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1245                 return -1;
1246         }
1247
1248         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1249         talloc_free(outdata.dptr);
1250                     
1251         return 0;
1252 }
1253
1254 /*
1255   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1256  */
1257 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1258                 struct timeval timeout, uint32_t destnode, 
1259                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1260 {
1261         int ret, i, len;
1262         TDB_DATA outdata;
1263         struct ctdb_node_mapv4 *nodemapv4;
1264         int32_t res;
1265
1266         ret = ctdb_control(ctdb, destnode, 0, 
1267                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1268                            mem_ctx, &outdata, &res, &timeout, NULL);
1269         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1271                 return -1;
1272         }
1273
1274         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1275
1276         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1277         (*nodemap) = talloc_zero_size(mem_ctx, len);
1278         CTDB_NO_MEMORY(ctdb, (*nodemap));
1279
1280         (*nodemap)->num = nodemapv4->num;
1281         for (i=0; i<nodemapv4->num; i++) {
1282                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1283                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1284                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1285                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1286         }
1287                 
1288         talloc_free(outdata.dptr);
1289                     
1290         return 0;
1291 }
1292
1293 /*
1294   drop the transport, reload the nodes file and restart the transport
1295  */
1296 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1297                     struct timeval timeout, uint32_t destnode)
1298 {
1299         int ret;
1300         int32_t res;
1301
1302         ret = ctdb_control(ctdb, destnode, 0, 
1303                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1304                            NULL, NULL, &res, &timeout, NULL);
1305         if (ret != 0 || res != 0) {
1306                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1307                 return -1;
1308         }
1309
1310         return 0;
1311 }
1312
1313
1314 /*
1315   set vnn map on a node
1316  */
1317 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1318                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1319 {
1320         int ret;
1321         TDB_DATA data;
1322         int32_t res;
1323         struct ctdb_vnn_map_wire *map;
1324         size_t len;
1325
1326         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1327         map = talloc_size(mem_ctx, len);
1328         CTDB_NO_MEMORY(ctdb, map);
1329
1330         map->generation = vnnmap->generation;
1331         map->size = vnnmap->size;
1332         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1333         
1334         data.dsize = len;
1335         data.dptr  = (uint8_t *)map;
1336
1337         ret = ctdb_control(ctdb, destnode, 0, 
1338                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1339                            NULL, NULL, &res, &timeout, NULL);
1340         if (ret != 0 || res != 0) {
1341                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1342                 return -1;
1343         }
1344
1345         talloc_free(map);
1346
1347         return 0;
1348 }
1349
1350
1351 /*
1352   async send for pull database
1353  */
1354 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1355         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1356         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1357 {
1358         TDB_DATA indata;
1359         struct ctdb_control_pulldb *pull;
1360         struct ctdb_client_control_state *state;
1361
1362         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1363         CTDB_NO_MEMORY_NULL(ctdb, pull);
1364
1365         pull->db_id   = dbid;
1366         pull->lmaster = lmaster;
1367
1368         indata.dsize = sizeof(struct ctdb_control_pulldb);
1369         indata.dptr  = (unsigned char *)pull;
1370
1371         state = ctdb_control_send(ctdb, destnode, 0, 
1372                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1373                                   mem_ctx, &timeout, NULL);
1374         talloc_free(pull);
1375
1376         return state;
1377 }
1378
1379 /*
1380   async recv for pull database
1381  */
1382 int ctdb_ctrl_pulldb_recv(
1383         struct ctdb_context *ctdb, 
1384         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1385         TDB_DATA *outdata)
1386 {
1387         int ret;
1388         int32_t res;
1389
1390         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1391         if ( (ret != 0) || (res != 0) ){
1392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1393                 return -1;
1394         }
1395
1396         return 0;
1397 }
1398
1399 /*
1400   pull all keys and records for a specific database on a node
1401  */
1402 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1403                 uint32_t dbid, uint32_t lmaster, 
1404                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1405                 TDB_DATA *outdata)
1406 {
1407         struct ctdb_client_control_state *state;
1408
1409         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1410                                       timeout);
1411         
1412         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1413 }
1414
1415
1416 /*
1417   change dmaster for all keys in the database to the new value
1418  */
1419 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1420                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1421 {
1422         int ret;
1423         TDB_DATA indata;
1424         int32_t res;
1425
1426         indata.dsize = 2*sizeof(uint32_t);
1427         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1428
1429         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1430         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, 
1433                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1434                            NULL, NULL, &res, &timeout, NULL);
1435         if (ret != 0 || res != 0) {
1436                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1437                 return -1;
1438         }
1439
1440         return 0;
1441 }
1442
1443 /*
1444   ping a node, return number of clients connected
1445  */
1446 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1447 {
1448         int ret;
1449         int32_t res;
1450
1451         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1452                            tdb_null, NULL, NULL, &res, NULL, NULL);
1453         if (ret != 0) {
1454                 return -1;
1455         }
1456         return res;
1457 }
1458
1459 /*
1460   find the real path to a ltdb 
1461  */
1462 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1463                    const char **path)
1464 {
1465         int ret;
1466         int32_t res;
1467         TDB_DATA data;
1468
1469         data.dptr = (uint8_t *)&dbid;
1470         data.dsize = sizeof(dbid);
1471
1472         ret = ctdb_control(ctdb, destnode, 0, 
1473                            CTDB_CONTROL_GETDBPATH, 0, data, 
1474                            mem_ctx, &data, &res, &timeout, NULL);
1475         if (ret != 0 || res != 0) {
1476                 return -1;
1477         }
1478
1479         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1480         if ((*path) == NULL) {
1481                 return -1;
1482         }
1483
1484         talloc_free(data.dptr);
1485
1486         return 0;
1487 }
1488
1489 /*
1490   find the name of a db 
1491  */
1492 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1493                    const char **name)
1494 {
1495         int ret;
1496         int32_t res;
1497         TDB_DATA data;
1498
1499         data.dptr = (uint8_t *)&dbid;
1500         data.dsize = sizeof(dbid);
1501
1502         ret = ctdb_control(ctdb, destnode, 0, 
1503                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1504                            mem_ctx, &data, &res, &timeout, NULL);
1505         if (ret != 0 || res != 0) {
1506                 return -1;
1507         }
1508
1509         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1510         if ((*name) == NULL) {
1511                 return -1;
1512         }
1513
1514         talloc_free(data.dptr);
1515
1516         return 0;
1517 }
1518
1519 /*
1520   get the health status of a db
1521  */
1522 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1523                           struct timeval timeout,
1524                           uint32_t destnode,
1525                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1526                           const char **reason)
1527 {
1528         int ret;
1529         int32_t res;
1530         TDB_DATA data;
1531
1532         data.dptr = (uint8_t *)&dbid;
1533         data.dsize = sizeof(dbid);
1534
1535         ret = ctdb_control(ctdb, destnode, 0,
1536                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1537                            mem_ctx, &data, &res, &timeout, NULL);
1538         if (ret != 0 || res != 0) {
1539                 return -1;
1540         }
1541
1542         if (data.dsize == 0) {
1543                 (*reason) = NULL;
1544                 return 0;
1545         }
1546
1547         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1548         if ((*reason) == NULL) {
1549                 return -1;
1550         }
1551
1552         talloc_free(data.dptr);
1553
1554         return 0;
1555 }
1556
1557 /*
1558   create a database
1559  */
1560 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1561                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1562 {
1563         int ret;
1564         int32_t res;
1565         TDB_DATA data;
1566
1567         data.dptr = discard_const(name);
1568         data.dsize = strlen(name)+1;
1569
1570         ret = ctdb_control(ctdb, destnode, 0, 
1571                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1572                            0, data, 
1573                            mem_ctx, &data, &res, &timeout, NULL);
1574
1575         if (ret != 0 || res != 0) {
1576                 return -1;
1577         }
1578
1579         return 0;
1580 }
1581
1582 /*
1583   get debug level on a node
1584  */
1585 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1586 {
1587         int ret;
1588         int32_t res;
1589         TDB_DATA data;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1592                            ctdb, &data, &res, NULL, NULL);
1593         if (ret != 0 || res != 0) {
1594                 return -1;
1595         }
1596         if (data.dsize != sizeof(int32_t)) {
1597                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1598                          (unsigned)data.dsize));
1599                 return -1;
1600         }
1601         *level = *(int32_t *)data.dptr;
1602         talloc_free(data.dptr);
1603         return 0;
1604 }
1605
1606 /*
1607   set debug level on a node
1608  */
1609 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1610 {
1611         int ret;
1612         int32_t res;
1613         TDB_DATA data;
1614
1615         data.dptr = (uint8_t *)&level;
1616         data.dsize = sizeof(level);
1617
1618         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1619                            NULL, NULL, &res, NULL, NULL);
1620         if (ret != 0 || res != 0) {
1621                 return -1;
1622         }
1623         return 0;
1624 }
1625
1626
1627 /*
1628   get a list of connected nodes
1629  */
1630 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1631                                 struct timeval timeout,
1632                                 TALLOC_CTX *mem_ctx,
1633                                 uint32_t *num_nodes)
1634 {
1635         struct ctdb_node_map *map=NULL;
1636         int ret, i;
1637         uint32_t *nodes;
1638
1639         *num_nodes = 0;
1640
1641         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1642         if (ret != 0) {
1643                 return NULL;
1644         }
1645
1646         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1647         if (nodes == NULL) {
1648                 return NULL;
1649         }
1650
1651         for (i=0;i<map->num;i++) {
1652                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1653                         nodes[*num_nodes] = map->nodes[i].pnn;
1654                         (*num_nodes)++;
1655                 }
1656         }
1657
1658         return nodes;
1659 }
1660
1661
1662 /*
1663   reset remote status
1664  */
1665 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1666 {
1667         int ret;
1668         int32_t res;
1669
1670         ret = ctdb_control(ctdb, destnode, 0, 
1671                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1672                            NULL, NULL, &res, NULL, NULL);
1673         if (ret != 0 || res != 0) {
1674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1675                 return -1;
1676         }
1677         return 0;
1678 }
1679
1680 /*
1681   this is the dummy null procedure that all databases support
1682 */
1683 static int ctdb_null_func(struct ctdb_call_info *call)
1684 {
1685         return 0;
1686 }
1687
1688 /*
1689   this is a plain fetch procedure that all databases support
1690 */
1691 static int ctdb_fetch_func(struct ctdb_call_info *call)
1692 {
1693         call->reply_data = &call->record_data;
1694         return 0;
1695 }
1696
1697 /*
1698   this is a plain fetch procedure that all databases support
1699   this returns the full record including the ltdb header
1700 */
1701 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1702 {
1703         call->reply_data = talloc(call, TDB_DATA);
1704         if (call->reply_data == NULL) {
1705                 return -1;
1706         }
1707         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1708         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1709         if (call->reply_data->dptr == NULL) {
1710                 return -1;
1711         }
1712         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1713         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1714
1715         return 0;
1716 }
1717
1718 /*
1719   attach to a specific database - client call
1720 */
1721 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1722 {
1723         struct ctdb_db_context *ctdb_db;
1724         TDB_DATA data;
1725         int ret;
1726         int32_t res;
1727
1728         ctdb_db = ctdb_db_handle(ctdb, name);
1729         if (ctdb_db) {
1730                 return ctdb_db;
1731         }
1732
1733         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1734         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1735
1736         ctdb_db->ctdb = ctdb;
1737         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1738         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1739
1740         data.dptr = discard_const(name);
1741         data.dsize = strlen(name)+1;
1742
1743         /* tell ctdb daemon to attach */
1744         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1745                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1746                            0, data, ctdb_db, &data, &res, NULL, NULL);
1747         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1748                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1749                 talloc_free(ctdb_db);
1750                 return NULL;
1751         }
1752         
1753         ctdb_db->db_id = *(uint32_t *)data.dptr;
1754         talloc_free(data.dptr);
1755
1756         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1757         if (ret != 0) {
1758                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1759                 talloc_free(ctdb_db);
1760                 return NULL;
1761         }
1762
1763         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1764         if (ctdb->valgrinding) {
1765                 tdb_flags |= TDB_NOMMAP;
1766         }
1767         tdb_flags |= TDB_DISALLOW_NESTING;
1768
1769         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1770         if (ctdb_db->ltdb == NULL) {
1771                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1772                 talloc_free(ctdb_db);
1773                 return NULL;
1774         }
1775
1776         ctdb_db->persistent = persistent;
1777
1778         DLIST_ADD(ctdb->db_list, ctdb_db);
1779
1780         /* add well known functions */
1781         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1782         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1783         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1784
1785         return ctdb_db;
1786 }
1787
1788
1789 /*
1790   setup a call for a database
1791  */
1792 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1793 {
1794         struct ctdb_registered_call *call;
1795
1796 #if 0
1797         TDB_DATA data;
1798         int32_t status;
1799         struct ctdb_control_set_call c;
1800         int ret;
1801
1802         /* this is no longer valid with the separate daemon architecture */
1803         c.db_id = ctdb_db->db_id;
1804         c.fn    = fn;
1805         c.id    = id;
1806
1807         data.dptr = (uint8_t *)&c;
1808         data.dsize = sizeof(c);
1809
1810         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1811                            data, NULL, NULL, &status, NULL, NULL);
1812         if (ret != 0 || status != 0) {
1813                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1814                 return -1;
1815         }
1816 #endif
1817
1818         /* also register locally */
1819         call = talloc(ctdb_db, struct ctdb_registered_call);
1820         call->fn = fn;
1821         call->id = id;
1822
1823         DLIST_ADD(ctdb_db->calls, call);        
1824         return 0;
1825 }
1826
1827
1828 struct traverse_state {
1829         bool done;
1830         uint32_t count;
1831         ctdb_traverse_func fn;
1832         void *private_data;
1833 };
1834
1835 /*
1836   called on each key during a ctdb_traverse
1837  */
1838 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1839 {
1840         struct traverse_state *state = (struct traverse_state *)p;
1841         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1842         TDB_DATA key;
1843
1844         if (data.dsize < sizeof(uint32_t) ||
1845             d->length != data.dsize) {
1846                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1847                 state->done = True;
1848                 return;
1849         }
1850
1851         key.dsize = d->keylen;
1852         key.dptr  = &d->data[0];
1853         data.dsize = d->datalen;
1854         data.dptr = &d->data[d->keylen];
1855
1856         if (key.dsize == 0 && data.dsize == 0) {
1857                 /* end of traverse */
1858                 state->done = True;
1859                 return;
1860         }
1861
1862         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1863                 /* empty records are deleted records in ctdb */
1864                 return;
1865         }
1866
1867         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1868                 state->done = True;
1869         }
1870
1871         state->count++;
1872 }
1873
1874
1875 /*
1876   start a cluster wide traverse, calling the supplied fn on each record
1877   return the number of records traversed, or -1 on error
1878  */
1879 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1880 {
1881         TDB_DATA data;
1882         struct ctdb_traverse_start t;
1883         int32_t status;
1884         int ret;
1885         uint64_t srvid = (getpid() | 0xFLL<<60);
1886         struct traverse_state state;
1887
1888         state.done = False;
1889         state.count = 0;
1890         state.private_data = private_data;
1891         state.fn = fn;
1892
1893         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1894         if (ret != 0) {
1895                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1896                 return -1;
1897         }
1898
1899         t.db_id = ctdb_db->db_id;
1900         t.srvid = srvid;
1901         t.reqid = 0;
1902
1903         data.dptr = (uint8_t *)&t;
1904         data.dsize = sizeof(t);
1905
1906         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1907                            data, NULL, NULL, &status, NULL, NULL);
1908         if (ret != 0 || status != 0) {
1909                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1910                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1911                 return -1;
1912         }
1913
1914         while (!state.done) {
1915                 event_loop_once(ctdb_db->ctdb->ev);
1916         }
1917
1918         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1919         if (ret != 0) {
1920                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1921                 return -1;
1922         }
1923
1924         return state.count;
1925 }
1926
1927 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1928 /*
1929   called on each key during a catdb
1930  */
1931 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1932 {
1933         int i;
1934         FILE *f = (FILE *)p;
1935         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1936
1937         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1938         for (i=0;i<key.dsize;i++) {
1939                 if (ISASCII(key.dptr[i])) {
1940                         fprintf(f, "%c", key.dptr[i]);
1941                 } else {
1942                         fprintf(f, "\\%02X", key.dptr[i]);
1943                 }
1944         }
1945         fprintf(f, "\"\n");
1946
1947         fprintf(f, "dmaster: %u\n", h->dmaster);
1948         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1949
1950         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1951         for (i=sizeof(*h);i<data.dsize;i++) {
1952                 if (ISASCII(data.dptr[i])) {
1953                         fprintf(f, "%c", data.dptr[i]);
1954                 } else {
1955                         fprintf(f, "\\%02X", data.dptr[i]);
1956                 }
1957         }
1958         fprintf(f, "\"\n");
1959
1960         fprintf(f, "\n");
1961
1962         return 0;
1963 }
1964
1965 /*
1966   convenience function to list all keys to stdout
1967  */
1968 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1969 {
1970         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1971 }
1972
1973 /*
1974   get the pid of a ctdb daemon
1975  */
1976 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1977 {
1978         int ret;
1979         int32_t res;
1980
1981         ret = ctdb_control(ctdb, destnode, 0, 
1982                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1983                            NULL, NULL, &res, &timeout, NULL);
1984         if (ret != 0) {
1985                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1986                 return -1;
1987         }
1988
1989         *pid = res;
1990
1991         return 0;
1992 }
1993
1994
1995 /*
1996   async freeze send control
1997  */
1998 struct ctdb_client_control_state *
1999 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2000 {
2001         return ctdb_control_send(ctdb, destnode, priority, 
2002                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2003                            mem_ctx, &timeout, NULL);
2004 }
2005
2006 /* 
2007    async freeze recv control
2008 */
2009 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2010 {
2011         int ret;
2012         int32_t res;
2013
2014         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2015         if ( (ret != 0) || (res != 0) ){
2016                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2017                 return -1;
2018         }
2019
2020         return 0;
2021 }
2022
2023 /*
2024   freeze databases of a certain priority
2025  */
2026 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2027 {
2028         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2029         struct ctdb_client_control_state *state;
2030         int ret;
2031
2032         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2033         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2034         talloc_free(tmp_ctx);
2035
2036         return ret;
2037 }
2038
2039 /* Freeze all databases */
2040 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2041 {
2042         int i;
2043
2044         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2045                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2046                         return -1;
2047                 }
2048         }
2049         return 0;
2050 }
2051
2052 /*
2053   thaw databases of a certain priority
2054  */
2055 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2056 {
2057         int ret;
2058         int32_t res;
2059
2060         ret = ctdb_control(ctdb, destnode, priority, 
2061                            CTDB_CONTROL_THAW, 0, tdb_null, 
2062                            NULL, NULL, &res, &timeout, NULL);
2063         if (ret != 0 || res != 0) {
2064                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2065                 return -1;
2066         }
2067
2068         return 0;
2069 }
2070
2071 /* thaw all databases */
2072 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2073 {
2074         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2075 }
2076
2077 /*
2078   get pnn of a node, or -1
2079  */
2080 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2081 {
2082         int ret;
2083         int32_t res;
2084
2085         ret = ctdb_control(ctdb, destnode, 0, 
2086                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2087                            NULL, NULL, &res, &timeout, NULL);
2088         if (ret != 0) {
2089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2090                 return -1;
2091         }
2092
2093         return res;
2094 }
2095
2096 /*
2097   get the monitoring mode of a remote node
2098  */
2099 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2100 {
2101         int ret;
2102         int32_t res;
2103
2104         ret = ctdb_control(ctdb, destnode, 0, 
2105                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2106                            NULL, NULL, &res, &timeout, NULL);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2109                 return -1;
2110         }
2111
2112         *monmode = res;
2113
2114         return 0;
2115 }
2116
2117
2118 /*
2119  set the monitoring mode of a remote node to active
2120  */
2121 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2122 {
2123         int ret;
2124         
2125
2126         ret = ctdb_control(ctdb, destnode, 0, 
2127                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2128                            NULL, NULL,NULL, &timeout, NULL);
2129         if (ret != 0) {
2130                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2131                 return -1;
2132         }
2133
2134         
2135
2136         return 0;
2137 }
2138
2139 /*
2140   set the monitoring mode of a remote node to disable
2141  */
2142 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2143 {
2144         int ret;
2145         
2146
2147         ret = ctdb_control(ctdb, destnode, 0, 
2148                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2149                            NULL, NULL, NULL, &timeout, NULL);
2150         if (ret != 0) {
2151                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2152                 return -1;
2153         }
2154
2155         
2156
2157         return 0;
2158 }
2159
2160
2161
2162 /* 
2163   sent to a node to make it take over an ip address
2164 */
2165 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2166                           uint32_t destnode, struct ctdb_public_ip *ip)
2167 {
2168         TDB_DATA data;
2169         struct ctdb_public_ipv4 ipv4;
2170         int ret;
2171         int32_t res;
2172
2173         if (ip->addr.sa.sa_family == AF_INET) {
2174                 ipv4.pnn = ip->pnn;
2175                 ipv4.sin = ip->addr.ip;
2176
2177                 data.dsize = sizeof(ipv4);
2178                 data.dptr  = (uint8_t *)&ipv4;
2179
2180                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2181                            NULL, &res, &timeout, NULL);
2182         } else {
2183                 data.dsize = sizeof(*ip);
2184                 data.dptr  = (uint8_t *)ip;
2185
2186                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2187                            NULL, &res, &timeout, NULL);
2188         }
2189
2190         if (ret != 0 || res != 0) {
2191                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2192                 return -1;
2193         }
2194
2195         return 0;       
2196 }
2197
2198
2199 /* 
2200   sent to a node to make it release an ip address
2201 */
2202 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2203                          uint32_t destnode, struct ctdb_public_ip *ip)
2204 {
2205         TDB_DATA data;
2206         struct ctdb_public_ipv4 ipv4;
2207         int ret;
2208         int32_t res;
2209
2210         if (ip->addr.sa.sa_family == AF_INET) {
2211                 ipv4.pnn = ip->pnn;
2212                 ipv4.sin = ip->addr.ip;
2213
2214                 data.dsize = sizeof(ipv4);
2215                 data.dptr  = (uint8_t *)&ipv4;
2216
2217                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2218                                    NULL, &res, &timeout, NULL);
2219         } else {
2220                 data.dsize = sizeof(*ip);
2221                 data.dptr  = (uint8_t *)ip;
2222
2223                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2224                                    NULL, &res, &timeout, NULL);
2225         }
2226
2227         if (ret != 0 || res != 0) {
2228                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2229                 return -1;
2230         }
2231
2232         return 0;       
2233 }
2234
2235
2236 /*
2237   get a tunable
2238  */
2239 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2240                           struct timeval timeout, 
2241                           uint32_t destnode,
2242                           const char *name, uint32_t *value)
2243 {
2244         struct ctdb_control_get_tunable *t;
2245         TDB_DATA data, outdata;
2246         int32_t res;
2247         int ret;
2248
2249         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2250         data.dptr  = talloc_size(ctdb, data.dsize);
2251         CTDB_NO_MEMORY(ctdb, data.dptr);
2252
2253         t = (struct ctdb_control_get_tunable *)data.dptr;
2254         t->length = strlen(name)+1;
2255         memcpy(t->name, name, t->length);
2256
2257         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2258                            &outdata, &res, &timeout, NULL);
2259         talloc_free(data.dptr);
2260         if (ret != 0 || res != 0) {
2261                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2262                 return -1;
2263         }
2264
2265         if (outdata.dsize != sizeof(uint32_t)) {
2266                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2267                 talloc_free(outdata.dptr);
2268                 return -1;
2269         }
2270         
2271         *value = *(uint32_t *)outdata.dptr;
2272         talloc_free(outdata.dptr);
2273
2274         return 0;
2275 }
2276
2277 /*
2278   set a tunable
2279  */
2280 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2281                           struct timeval timeout, 
2282                           uint32_t destnode,
2283                           const char *name, uint32_t value)
2284 {
2285         struct ctdb_control_set_tunable *t;
2286         TDB_DATA data;
2287         int32_t res;
2288         int ret;
2289
2290         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2291         data.dptr  = talloc_size(ctdb, data.dsize);
2292         CTDB_NO_MEMORY(ctdb, data.dptr);
2293
2294         t = (struct ctdb_control_set_tunable *)data.dptr;
2295         t->length = strlen(name)+1;
2296         memcpy(t->name, name, t->length);
2297         t->value = value;
2298
2299         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2300                            NULL, &res, &timeout, NULL);
2301         talloc_free(data.dptr);
2302         if (ret != 0 || res != 0) {
2303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2304                 return -1;
2305         }
2306
2307         return 0;
2308 }
2309
2310 /*
2311   list tunables
2312  */
2313 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2314                             struct timeval timeout, 
2315                             uint32_t destnode,
2316                             TALLOC_CTX *mem_ctx,
2317                             const char ***list, uint32_t *count)
2318 {
2319         TDB_DATA outdata;
2320         int32_t res;
2321         int ret;
2322         struct ctdb_control_list_tunable *t;
2323         char *p, *s, *ptr;
2324
2325         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2326                            mem_ctx, &outdata, &res, &timeout, NULL);
2327         if (ret != 0 || res != 0) {
2328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2329                 return -1;
2330         }
2331
2332         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2333         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2334             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2335                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2336                 talloc_free(outdata.dptr);
2337                 return -1;              
2338         }
2339         
2340         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2341         CTDB_NO_MEMORY(ctdb, p);
2342
2343         talloc_free(outdata.dptr);
2344         
2345         (*list) = NULL;
2346         (*count) = 0;
2347
2348         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2349                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2350                 CTDB_NO_MEMORY(ctdb, *list);
2351                 (*list)[*count] = talloc_strdup(*list, s);
2352                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2353                 (*count)++;
2354         }
2355
2356         talloc_free(p);
2357
2358         return 0;
2359 }
2360
2361
2362 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2363                                    struct timeval timeout, uint32_t destnode,
2364                                    TALLOC_CTX *mem_ctx,
2365                                    uint32_t flags,
2366                                    struct ctdb_all_public_ips **ips)
2367 {
2368         int ret;
2369         TDB_DATA outdata;
2370         int32_t res;
2371
2372         ret = ctdb_control(ctdb, destnode, 0, 
2373                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2374                            mem_ctx, &outdata, &res, &timeout, NULL);
2375         if (ret == 0 && res == -1) {
2376                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2377                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2378         }
2379         if (ret != 0 || res != 0) {
2380           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2381                 return -1;
2382         }
2383
2384         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2385         talloc_free(outdata.dptr);
2386                     
2387         return 0;
2388 }
2389
2390 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2391                              struct timeval timeout, uint32_t destnode,
2392                              TALLOC_CTX *mem_ctx,
2393                              struct ctdb_all_public_ips **ips)
2394 {
2395         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2396                                               destnode, mem_ctx,
2397                                               0, ips);
2398 }
2399
2400 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2401                         struct timeval timeout, uint32_t destnode, 
2402                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2403 {
2404         int ret, i, len;
2405         TDB_DATA outdata;
2406         int32_t res;
2407         struct ctdb_all_public_ipsv4 *ipsv4;
2408
2409         ret = ctdb_control(ctdb, destnode, 0, 
2410                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2411                            mem_ctx, &outdata, &res, &timeout, NULL);
2412         if (ret != 0 || res != 0) {
2413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2414                 return -1;
2415         }
2416
2417         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2418         len = offsetof(struct ctdb_all_public_ips, ips) +
2419                 ipsv4->num*sizeof(struct ctdb_public_ip);
2420         *ips = talloc_zero_size(mem_ctx, len);
2421         CTDB_NO_MEMORY(ctdb, *ips);
2422         (*ips)->num = ipsv4->num;
2423         for (i=0; i<ipsv4->num; i++) {
2424                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2425                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2426         }
2427
2428         talloc_free(outdata.dptr);
2429                     
2430         return 0;
2431 }
2432
2433 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2434                                  struct timeval timeout, uint32_t destnode,
2435                                  TALLOC_CTX *mem_ctx,
2436                                  const ctdb_sock_addr *addr,
2437                                  struct ctdb_control_public_ip_info **_info)
2438 {
2439         int ret;
2440         TDB_DATA indata;
2441         TDB_DATA outdata;
2442         int32_t res;
2443         struct ctdb_control_public_ip_info *info;
2444         uint32_t len;
2445         uint32_t i;
2446
2447         indata.dptr = discard_const_p(uint8_t, addr);
2448         indata.dsize = sizeof(*addr);
2449
2450         ret = ctdb_control(ctdb, destnode, 0,
2451                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2452                            mem_ctx, &outdata, &res, &timeout, NULL);
2453         if (ret != 0 || res != 0) {
2454                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2455                                 "failed ret:%d res:%d\n",
2456                                 ret, res));
2457                 return -1;
2458         }
2459
2460         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2461         if (len > outdata.dsize) {
2462                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2463                                 "returned invalid data with size %u > %u\n",
2464                                 (unsigned int)outdata.dsize,
2465                                 (unsigned int)len));
2466                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2467                 return -1;
2468         }
2469
2470         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2471         len += info->num*sizeof(struct ctdb_control_iface_info);
2472
2473         if (len > outdata.dsize) {
2474                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2475                                 "returned invalid data with size %u > %u\n",
2476                                 (unsigned int)outdata.dsize,
2477                                 (unsigned int)len));
2478                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2479                 return -1;
2480         }
2481
2482         /* make sure we null terminate the returned strings */
2483         for (i=0; i < info->num; i++) {
2484                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2485         }
2486
2487         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2488                                                                 outdata.dptr,
2489                                                                 outdata.dsize);
2490         talloc_free(outdata.dptr);
2491         if (*_info == NULL) {
2492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2493                                 "talloc_memdup size %u failed\n",
2494                                 (unsigned int)outdata.dsize));
2495                 return -1;
2496         }
2497
2498         return 0;
2499 }
2500
2501 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2502                          struct timeval timeout, uint32_t destnode,
2503                          TALLOC_CTX *mem_ctx,
2504                          struct ctdb_control_get_ifaces **_ifaces)
2505 {
2506         int ret;
2507         TDB_DATA outdata;
2508         int32_t res;
2509         struct ctdb_control_get_ifaces *ifaces;
2510         uint32_t len;
2511         uint32_t i;
2512
2513         ret = ctdb_control(ctdb, destnode, 0,
2514                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2515                            mem_ctx, &outdata, &res, &timeout, NULL);
2516         if (ret != 0 || res != 0) {
2517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2518                                 "failed ret:%d res:%d\n",
2519                                 ret, res));
2520                 return -1;
2521         }
2522
2523         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2524         if (len > outdata.dsize) {
2525                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2526                                 "returned invalid data with size %u > %u\n",
2527                                 (unsigned int)outdata.dsize,
2528                                 (unsigned int)len));
2529                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2530                 return -1;
2531         }
2532
2533         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2534         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2535
2536         if (len > outdata.dsize) {
2537                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2538                                 "returned invalid data with size %u > %u\n",
2539                                 (unsigned int)outdata.dsize,
2540                                 (unsigned int)len));
2541                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2542                 return -1;
2543         }
2544
2545         /* make sure we null terminate the returned strings */
2546         for (i=0; i < ifaces->num; i++) {
2547                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2548         }
2549
2550         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2551                                                                   outdata.dptr,
2552                                                                   outdata.dsize);
2553         talloc_free(outdata.dptr);
2554         if (*_ifaces == NULL) {
2555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2556                                 "talloc_memdup size %u failed\n",
2557                                 (unsigned int)outdata.dsize));
2558                 return -1;
2559         }
2560
2561         return 0;
2562 }
2563
2564 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2565                              struct timeval timeout, uint32_t destnode,
2566                              TALLOC_CTX *mem_ctx,
2567                              const struct ctdb_control_iface_info *info)
2568 {
2569         int ret;
2570         TDB_DATA indata;
2571         int32_t res;
2572
2573         indata.dptr = discard_const_p(uint8_t, info);
2574         indata.dsize = sizeof(*info);
2575
2576         ret = ctdb_control(ctdb, destnode, 0,
2577                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2578                            mem_ctx, NULL, &res, &timeout, NULL);
2579         if (ret != 0 || res != 0) {
2580                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2581                                 "failed ret:%d res:%d\n",
2582                                 ret, res));
2583                 return -1;
2584         }
2585
2586         return 0;
2587 }
2588
2589 /*
2590   set/clear the permanent disabled bit on a remote node
2591  */
2592 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2593                        uint32_t set, uint32_t clear)
2594 {
2595         int ret;
2596         TDB_DATA data;
2597         struct ctdb_node_map *nodemap=NULL;
2598         struct ctdb_node_flag_change c;
2599         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2600         uint32_t recmaster;
2601         uint32_t *nodes;
2602
2603
2604         /* find the recovery master */
2605         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2606         if (ret != 0) {
2607                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2608                 talloc_free(tmp_ctx);
2609                 return ret;
2610         }
2611
2612
2613         /* read the node flags from the recmaster */
2614         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2615         if (ret != 0) {
2616                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2617                 talloc_free(tmp_ctx);
2618                 return -1;
2619         }
2620         if (destnode >= nodemap->num) {
2621                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2622                 talloc_free(tmp_ctx);
2623                 return -1;
2624         }
2625
2626         c.pnn       = destnode;
2627         c.old_flags = nodemap->nodes[destnode].flags;
2628         c.new_flags = c.old_flags;
2629         c.new_flags |= set;
2630         c.new_flags &= ~clear;
2631
2632         data.dsize = sizeof(c);
2633         data.dptr = (unsigned char *)&c;
2634
2635         /* send the flags update to all connected nodes */
2636         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2637
2638         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2639                                         nodes, 0,
2640                                         timeout, false, data,
2641                                         NULL, NULL,
2642                                         NULL) != 0) {
2643                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2644
2645                 talloc_free(tmp_ctx);
2646                 return -1;
2647         }
2648
2649         talloc_free(tmp_ctx);
2650         return 0;
2651 }
2652
2653
2654 /*
2655   get all tunables
2656  */
2657 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2658                                struct timeval timeout, 
2659                                uint32_t destnode,
2660                                struct ctdb_tunable *tunables)
2661 {
2662         TDB_DATA outdata;
2663         int ret;
2664         int32_t res;
2665
2666         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2667                            &outdata, &res, &timeout, NULL);
2668         if (ret != 0 || res != 0) {
2669                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2670                 return -1;
2671         }
2672
2673         if (outdata.dsize != sizeof(*tunables)) {
2674                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2675                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2676                 return -1;              
2677         }
2678
2679         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2680         talloc_free(outdata.dptr);
2681         return 0;
2682 }
2683
2684 /*
2685   add a public address to a node
2686  */
2687 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2688                       struct timeval timeout, 
2689                       uint32_t destnode,
2690                       struct ctdb_control_ip_iface *pub)
2691 {
2692         TDB_DATA data;
2693         int32_t res;
2694         int ret;
2695
2696         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2697         data.dptr  = (unsigned char *)pub;
2698
2699         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2700                            NULL, &res, &timeout, NULL);
2701         if (ret != 0 || res != 0) {
2702                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2703                 return -1;
2704         }
2705
2706         return 0;
2707 }
2708
2709 /*
2710   delete a public address from a node
2711  */
2712 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2713                       struct timeval timeout, 
2714                       uint32_t destnode,
2715                       struct ctdb_control_ip_iface *pub)
2716 {
2717         TDB_DATA data;
2718         int32_t res;
2719         int ret;
2720
2721         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2722         data.dptr  = (unsigned char *)pub;
2723
2724         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2725                            NULL, &res, &timeout, NULL);
2726         if (ret != 0 || res != 0) {
2727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2728                 return -1;
2729         }
2730
2731         return 0;
2732 }
2733
2734 /*
2735   kill a tcp connection
2736  */
2737 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2738                       struct timeval timeout, 
2739                       uint32_t destnode,
2740                       struct ctdb_control_killtcp *killtcp)
2741 {
2742         TDB_DATA data;
2743         int32_t res;
2744         int ret;
2745
2746         data.dsize = sizeof(struct ctdb_control_killtcp);
2747         data.dptr  = (unsigned char *)killtcp;
2748
2749         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2750                            NULL, &res, &timeout, NULL);
2751         if (ret != 0 || res != 0) {
2752                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2753                 return -1;
2754         }
2755
2756         return 0;
2757 }
2758
2759 /*
2760   send a gratious arp
2761  */
2762 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2763                       struct timeval timeout, 
2764                       uint32_t destnode,
2765                       ctdb_sock_addr *addr,
2766                       const char *ifname)
2767 {
2768         TDB_DATA data;
2769         int32_t res;
2770         int ret, len;
2771         struct ctdb_control_gratious_arp *gratious_arp;
2772         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2773
2774
2775         len = strlen(ifname)+1;
2776         gratious_arp = talloc_size(tmp_ctx, 
2777                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2778         CTDB_NO_MEMORY(ctdb, gratious_arp);
2779
2780         gratious_arp->addr = *addr;
2781         gratious_arp->len = len;
2782         memcpy(&gratious_arp->iface[0], ifname, len);
2783
2784
2785         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2786         data.dptr  = (unsigned char *)gratious_arp;
2787
2788         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2789                            NULL, &res, &timeout, NULL);
2790         if (ret != 0 || res != 0) {
2791                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2792                 talloc_free(tmp_ctx);
2793                 return -1;
2794         }
2795
2796         talloc_free(tmp_ctx);
2797         return 0;
2798 }
2799
2800 /*
2801   get a list of all tcp tickles that a node knows about for a particular vnn
2802  */
2803 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2804                               struct timeval timeout, uint32_t destnode, 
2805                               TALLOC_CTX *mem_ctx, 
2806                               ctdb_sock_addr *addr,
2807                               struct ctdb_control_tcp_tickle_list **list)
2808 {
2809         int ret;
2810         TDB_DATA data, outdata;
2811         int32_t status;
2812
2813         data.dptr = (uint8_t*)addr;
2814         data.dsize = sizeof(ctdb_sock_addr);
2815
2816         ret = ctdb_control(ctdb, destnode, 0, 
2817                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2818                            mem_ctx, &outdata, &status, NULL, NULL);
2819         if (ret != 0 || status != 0) {
2820                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2821                 return -1;
2822         }
2823
2824         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2825
2826         return status;
2827 }
2828
2829 /*
2830   register a server id
2831  */
2832 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2833                       struct timeval timeout, 
2834                       struct ctdb_server_id *id)
2835 {
2836         TDB_DATA data;
2837         int32_t res;
2838         int ret;
2839
2840         data.dsize = sizeof(struct ctdb_server_id);
2841         data.dptr  = (unsigned char *)id;
2842
2843         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2844                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2845                         0, data, NULL,
2846                         NULL, &res, &timeout, NULL);
2847         if (ret != 0 || res != 0) {
2848                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2849                 return -1;
2850         }
2851
2852         return 0;
2853 }
2854
2855 /*
2856   unregister a server id
2857  */
2858 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2859                       struct timeval timeout, 
2860                       struct ctdb_server_id *id)
2861 {
2862         TDB_DATA data;
2863         int32_t res;
2864         int ret;
2865
2866         data.dsize = sizeof(struct ctdb_server_id);
2867         data.dptr  = (unsigned char *)id;
2868
2869         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2870                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2871                         0, data, NULL,
2872                         NULL, &res, &timeout, NULL);
2873         if (ret != 0 || res != 0) {
2874                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2875                 return -1;
2876         }
2877
2878         return 0;
2879 }
2880
2881
2882 /*
2883   check if a server id exists
2884
2885   if a server id does exist, return *status == 1, otherwise *status == 0
2886  */
2887 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2888                       struct timeval timeout, 
2889                       uint32_t destnode,
2890                       struct ctdb_server_id *id,
2891                       uint32_t *status)
2892 {
2893         TDB_DATA data;
2894         int32_t res;
2895         int ret;
2896
2897         data.dsize = sizeof(struct ctdb_server_id);
2898         data.dptr  = (unsigned char *)id;
2899
2900         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2901                         0, data, NULL,
2902                         NULL, &res, &timeout, NULL);
2903         if (ret != 0) {
2904                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2905                 return -1;
2906         }
2907
2908         if (res) {
2909                 *status = 1;
2910         } else {
2911                 *status = 0;
2912         }
2913
2914         return 0;
2915 }
2916
2917 /*
2918    get the list of server ids that are registered on a node
2919 */
2920 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2921                 TALLOC_CTX *mem_ctx,
2922                 struct timeval timeout, uint32_t destnode, 
2923                 struct ctdb_server_id_list **svid_list)
2924 {
2925         int ret;
2926         TDB_DATA outdata;
2927         int32_t res;
2928
2929         ret = ctdb_control(ctdb, destnode, 0, 
2930                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2931                            mem_ctx, &outdata, &res, &timeout, NULL);
2932         if (ret != 0 || res != 0) {
2933                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2934                 return -1;
2935         }
2936
2937         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2938                     
2939         return 0;
2940 }
2941
2942 /*
2943   initialise the ctdb daemon for client applications
2944
2945   NOTE: In current code the daemon does not fork. This is for testing purposes only
2946   and to simplify the code.
2947 */
2948 struct ctdb_context *ctdb_init(struct event_context *ev)
2949 {
2950         int ret;
2951         struct ctdb_context *ctdb;
2952
2953         ctdb = talloc_zero(ev, struct ctdb_context);
2954         if (ctdb == NULL) {
2955                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2956                 return NULL;
2957         }
2958         ctdb->ev  = ev;
2959         ctdb->idr = idr_init(ctdb);
2960         /* Wrap early to exercise code. */
2961         ctdb->lastid = INT_MAX-200;
2962         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2963
2964         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2965         if (ret != 0) {
2966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2967                 talloc_free(ctdb);
2968                 return NULL;
2969         }
2970
2971         ctdb->statistics.statistics_start_time = timeval_current();
2972
2973         return ctdb;
2974 }
2975
2976
2977 /*
2978   set some ctdb flags
2979 */
2980 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2981 {
2982         ctdb->flags |= flags;
2983 }
2984
2985 /*
2986   setup the local socket name
2987 */
2988 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2989 {
2990         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2991         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2992
2993         return 0;
2994 }
2995
2996 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2997 {
2998         return ctdb->daemon.name;
2999 }
3000
3001 /*
3002   return the pnn of this node
3003 */
3004 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3005 {
3006         return ctdb->pnn;
3007 }
3008
3009
3010 /*
3011   get the uptime of a remote node
3012  */
3013 struct ctdb_client_control_state *
3014 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3015 {
3016         return ctdb_control_send(ctdb, destnode, 0, 
3017                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3018                            mem_ctx, &timeout, NULL);
3019 }
3020
3021 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3022 {
3023         int ret;
3024         int32_t res;
3025         TDB_DATA outdata;
3026
3027         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3028         if (ret != 0 || res != 0) {
3029                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3030                 return -1;
3031         }
3032
3033         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3034
3035         return 0;
3036 }
3037
3038 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3039 {
3040         struct ctdb_client_control_state *state;
3041
3042         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3043         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3044 }
3045
3046 /*
3047   send a control to execute the "recovered" event script on a node
3048  */
3049 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3050 {
3051         int ret;
3052         int32_t status;
3053
3054         ret = ctdb_control(ctdb, destnode, 0, 
3055                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3056                            NULL, NULL, &status, &timeout, NULL);
3057         if (ret != 0 || status != 0) {
3058                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3059                 return -1;
3060         }
3061
3062         return 0;
3063 }
3064
3065 /* 
3066   callback for the async helpers used when sending the same control
3067   to multiple nodes in parallell.
3068 */
3069 static void async_callback(struct ctdb_client_control_state *state)
3070 {
3071         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3072         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3073         int ret;
3074         TDB_DATA outdata;
3075         int32_t res;
3076         uint32_t destnode = state->c->hdr.destnode;
3077
3078         /* one more node has responded with recmode data */
3079         data->count--;
3080
3081         /* if we failed to push the db, then return an error and let
3082            the main loop try again.
3083         */
3084         if (state->state != CTDB_CONTROL_DONE) {
3085                 if ( !data->dont_log_errors) {
3086                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3087                 }
3088                 data->fail_count++;
3089                 if (data->fail_callback) {
3090                         data->fail_callback(ctdb, destnode, res, outdata,
3091                                         data->callback_data);
3092                 }
3093                 return;
3094         }
3095         
3096         state->async.fn = NULL;
3097
3098         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3099         if ((ret != 0) || (res != 0)) {
3100                 if ( !data->dont_log_errors) {
3101                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3102                 }
3103                 data->fail_count++;
3104                 if (data->fail_callback) {
3105                         data->fail_callback(ctdb, destnode, res, outdata,
3106                                         data->callback_data);
3107                 }
3108         }
3109         if ((ret == 0) && (data->callback != NULL)) {
3110                 data->callback(ctdb, destnode, res, outdata,
3111                                         data->callback_data);
3112         }
3113 }
3114
3115
3116 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3117 {
3118         /* set up the callback functions */
3119         state->async.fn = async_callback;
3120         state->async.private_data = data;
3121         
3122         /* one more control to wait for to complete */
3123         data->count++;
3124 }
3125
3126
3127 /* wait for up to the maximum number of seconds allowed
3128    or until all nodes we expect a response from has replied
3129 */
3130 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3131 {
3132         while (data->count > 0) {
3133                 event_loop_once(ctdb->ev);
3134         }
3135         if (data->fail_count != 0) {
3136                 if (!data->dont_log_errors) {
3137                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3138                                  data->fail_count));
3139                 }
3140                 return -1;
3141         }
3142         return 0;
3143 }
3144
3145
3146 /* 
3147    perform a simple control on the listed nodes
3148    The control cannot return data
3149  */
3150 int ctdb_client_async_control(struct ctdb_context *ctdb,
3151                                 enum ctdb_controls opcode,
3152                                 uint32_t *nodes,
3153                                 uint64_t srvid,
3154                                 struct timeval timeout,
3155                                 bool dont_log_errors,
3156                                 TDB_DATA data,
3157                                 client_async_callback client_callback,
3158                                 client_async_callback fail_callback,
3159                                 void *callback_data)
3160 {
3161         struct client_async_data *async_data;
3162         struct ctdb_client_control_state *state;
3163         int j, num_nodes;
3164
3165         async_data = talloc_zero(ctdb, struct client_async_data);
3166         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3167         async_data->dont_log_errors = dont_log_errors;
3168         async_data->callback = client_callback;
3169         async_data->fail_callback = fail_callback;
3170         async_data->callback_data = callback_data;
3171         async_data->opcode        = opcode;
3172
3173         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3174
3175         /* loop over all nodes and send an async control to each of them */
3176         for (j=0; j<num_nodes; j++) {
3177                 uint32_t pnn = nodes[j];
3178
3179                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3180                                           0, data, async_data, &timeout, NULL);
3181                 if (state == NULL) {
3182                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3183                         talloc_free(async_data);
3184                         return -1;
3185                 }
3186                 
3187                 ctdb_client_async_add(async_data, state);
3188         }
3189
3190         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3191                 talloc_free(async_data);
3192                 return -1;
3193         }
3194
3195         talloc_free(async_data);
3196         return 0;
3197 }
3198
3199 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3200                                 struct ctdb_vnn_map *vnn_map,
3201                                 TALLOC_CTX *mem_ctx,
3202                                 bool include_self)
3203 {
3204         int i, j, num_nodes;
3205         uint32_t *nodes;
3206
3207         for (i=num_nodes=0;i<vnn_map->size;i++) {
3208                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3209                         continue;
3210                 }
3211                 num_nodes++;
3212         } 
3213
3214         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3215         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3216
3217         for (i=j=0;i<vnn_map->size;i++) {
3218                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3219                         continue;
3220                 }
3221                 nodes[j++] = vnn_map->map[i];
3222         } 
3223
3224         return nodes;
3225 }
3226
3227 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3228                                 struct ctdb_node_map *node_map,
3229                                 TALLOC_CTX *mem_ctx,
3230                                 bool include_self)
3231 {
3232         int i, j, num_nodes;
3233         uint32_t *nodes;
3234
3235         for (i=num_nodes=0;i<node_map->num;i++) {
3236                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3237                         continue;
3238                 }
3239                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3240                         continue;
3241                 }
3242                 num_nodes++;
3243         } 
3244
3245         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3246         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3247
3248         for (i=j=0;i<node_map->num;i++) {
3249                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3250                         continue;
3251                 }
3252                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3253                         continue;
3254                 }
3255                 nodes[j++] = node_map->nodes[i].pnn;
3256         } 
3257
3258         return nodes;
3259 }
3260
3261 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3262                                 struct ctdb_node_map *node_map,
3263                                 TALLOC_CTX *mem_ctx,
3264                                 uint32_t pnn)
3265 {
3266         int i, j, num_nodes;
3267         uint32_t *nodes;
3268
3269         for (i=num_nodes=0;i<node_map->num;i++) {
3270                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3271                         continue;
3272                 }
3273                 if (node_map->nodes[i].pnn == pnn) {
3274                         continue;
3275                 }
3276                 num_nodes++;
3277         } 
3278
3279         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3280         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3281
3282         for (i=j=0;i<node_map->num;i++) {
3283                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3284                         continue;
3285                 }
3286                 if (node_map->nodes[i].pnn == pnn) {
3287                         continue;
3288                 }
3289                 nodes[j++] = node_map->nodes[i].pnn;
3290         } 
3291
3292         return nodes;
3293 }
3294
3295 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3296                                 struct ctdb_node_map *node_map,
3297                                 TALLOC_CTX *mem_ctx,
3298                                 bool include_self)
3299 {
3300         int i, j, num_nodes;
3301         uint32_t *nodes;
3302
3303         for (i=num_nodes=0;i<node_map->num;i++) {
3304                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3305                         continue;
3306                 }
3307                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3308                         continue;
3309                 }
3310                 num_nodes++;
3311         } 
3312
3313         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3314         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3315
3316         for (i=j=0;i<node_map->num;i++) {
3317                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3318                         continue;
3319                 }
3320                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3321                         continue;
3322                 }
3323                 nodes[j++] = node_map->nodes[i].pnn;
3324         } 
3325
3326         return nodes;
3327 }
3328
3329 /* 
3330   this is used to test if a pnn lock exists and if it exists will return
3331   the number of connections that pnn has reported or -1 if that recovery
3332   daemon is not running.
3333 */
3334 int
3335 ctdb_read_pnn_lock(int fd, int32_t pnn)
3336 {
3337         struct flock lock;
3338         char c;
3339
3340         lock.l_type = F_WRLCK;
3341         lock.l_whence = SEEK_SET;
3342         lock.l_start = pnn;
3343         lock.l_len = 1;
3344         lock.l_pid = 0;
3345
3346         if (fcntl(fd, F_GETLK, &lock) != 0) {
3347                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3348                 return -1;
3349         }
3350
3351         if (lock.l_type == F_UNLCK) {
3352                 return -1;
3353         }
3354
3355         if (pread(fd, &c, 1, pnn) == -1) {
3356                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3357                 return -1;
3358         }
3359
3360         return c;
3361 }
3362
3363 /*
3364   get capabilities of a remote node
3365  */
3366 struct ctdb_client_control_state *
3367 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3368 {
3369         return ctdb_control_send(ctdb, destnode, 0, 
3370                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3371                            mem_ctx, &timeout, NULL);
3372 }
3373
3374 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3375 {
3376         int ret;
3377         int32_t res;
3378         TDB_DATA outdata;
3379
3380         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3381         if ( (ret != 0) || (res != 0) ) {
3382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3383                 return -1;
3384         }
3385
3386         if (capabilities) {
3387                 *capabilities = *((uint32_t *)outdata.dptr);
3388         }
3389
3390         return 0;
3391 }
3392
3393 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3394 {
3395         struct ctdb_client_control_state *state;
3396         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3397         int ret;
3398
3399         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3400         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3401         talloc_free(tmp_ctx);
3402         return ret;
3403 }
3404
3405 /**
3406  * check whether a transaction is active on a given db on a given node
3407  */
3408 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3409                                      uint32_t destnode,
3410                                      uint32_t db_id)
3411 {
3412         int32_t status;
3413         int ret;
3414         TDB_DATA indata;
3415
3416         indata.dptr = (uint8_t *)&db_id;
3417         indata.dsize = sizeof(db_id);
3418
3419         ret = ctdb_control(ctdb, destnode, 0,
3420                            CTDB_CONTROL_TRANS2_ACTIVE,
3421                            0, indata, NULL, NULL, &status,
3422                            NULL, NULL);
3423
3424         if (ret != 0) {
3425                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3426                 return -1;
3427         }
3428
3429         return status;
3430 }
3431
3432
3433 struct ctdb_transaction_handle {
3434         struct ctdb_db_context *ctdb_db;
3435         bool in_replay;
3436         /*
3437          * we store the reads and writes done under a transaction:
3438          * - one list stores both reads and writes (m_all),
3439          * - the other just writes (m_write)
3440          */
3441         struct ctdb_marshall_buffer *m_all;
3442         struct ctdb_marshall_buffer *m_write;
3443 };
3444
3445 /* start a transaction on a database */
3446 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3447 {
3448         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3449         return 0;
3450 }
3451
3452 /* start a transaction on a database */
3453 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3454 {
3455         struct ctdb_record_handle *rh;
3456         TDB_DATA key;
3457         TDB_DATA data;
3458         struct ctdb_ltdb_header header;
3459         TALLOC_CTX *tmp_ctx;
3460         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3461         int ret;
3462         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3463         pid_t pid;
3464         int32_t status;
3465
3466         key.dptr = discard_const(keyname);
3467         key.dsize = strlen(keyname);
3468
3469         if (!ctdb_db->persistent) {
3470                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3471                 return -1;
3472         }
3473
3474 again:
3475         tmp_ctx = talloc_new(h);
3476
3477         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3478         if (rh == NULL) {
3479                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3480                 talloc_free(tmp_ctx);
3481                 return -1;
3482         }
3483
3484         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3485                                               CTDB_CURRENT_NODE,
3486                                               ctdb_db->db_id);
3487         if (status == 1) {
3488                 unsigned long int usec = (1000 + random()) % 100000;
3489                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3490                                     "on db_id[0x%08x]. waiting for %lu "
3491                                     "microseconds\n",
3492                                     ctdb_db->db_id, usec));
3493                 talloc_free(tmp_ctx);
3494                 usleep(usec);
3495                 goto again;
3496         }
3497
3498         /*
3499          * store the pid in the database:
3500          * it is not enough that the node is dmaster...
3501          */
3502         pid = getpid();
3503         data.dptr = (unsigned char *)&pid;
3504         data.dsize = sizeof(pid_t);
3505         rh->header.rsn++;
3506         rh->header.dmaster = ctdb_db->ctdb->pnn;
3507         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3508         if (ret != 0) {
3509                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3510                                   "transaction record\n"));
3511                 talloc_free(tmp_ctx);
3512                 return -1;
3513         }
3514
3515         talloc_free(rh);
3516
3517         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3518         if (ret != 0) {
3519                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3525         if (ret != 0) {
3526                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3527                                  "lock record inside transaction\n"));
3528                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3529                 talloc_free(tmp_ctx);
3530                 goto again;
3531         }
3532
3533         if (header.dmaster != ctdb_db->ctdb->pnn) {
3534                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3535                                    "transaction lock record\n"));
3536                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3537                 talloc_free(tmp_ctx);
3538                 goto again;
3539         }
3540
3541         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3542                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3543                                     "the transaction lock record\n"));
3544                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3545                 talloc_free(tmp_ctx);
3546                 goto again;
3547         }
3548
3549         talloc_free(tmp_ctx);
3550
3551         return 0;
3552 }
3553
3554
3555 /* start a transaction on a database */
3556 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3557                                                        TALLOC_CTX *mem_ctx)
3558 {
3559         struct ctdb_transaction_handle *h;
3560         int ret;
3561
3562         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3563         if (h == NULL) {
3564                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3565                 return NULL;
3566         }
3567
3568         h->ctdb_db = ctdb_db;
3569
3570         ret = ctdb_transaction_fetch_start(h);
3571         if (ret != 0) {
3572                 talloc_free(h);
3573                 return NULL;
3574         }
3575
3576         talloc_set_destructor(h, ctdb_transaction_destructor);
3577
3578         return h;
3579 }
3580
3581
3582
3583 /*
3584   fetch a record inside a transaction
3585  */
3586 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3587                            TALLOC_CTX *mem_ctx, 
3588                            TDB_DATA key, TDB_DATA *data)
3589 {
3590         struct ctdb_ltdb_header header;
3591         int ret;
3592
3593         ZERO_STRUCT(header);
3594
3595         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3596         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3597                 /* record doesn't exist yet */
3598                 *data = tdb_null;
3599                 ret = 0;
3600         }
3601         
3602         if (ret != 0) {
3603                 return ret;
3604         }
3605
3606         if (!h->in_replay) {
3607                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3608                 if (h->m_all == NULL) {
3609                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3610                         return -1;
3611                 }
3612         }
3613
3614         return 0;
3615 }
3616
3617 /*
3618   stores a record inside a transaction
3619  */
3620 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3621                            TDB_DATA key, TDB_DATA data)
3622 {
3623         TALLOC_CTX *tmp_ctx = talloc_new(h);
3624         struct ctdb_ltdb_header header;
3625         TDB_DATA olddata;
3626         int ret;
3627
3628         ZERO_STRUCT(header);
3629
3630         /* we need the header so we can update the RSN */
3631         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3632         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3633                 /* the record doesn't exist - create one with us as dmaster.
3634                    This is only safe because we are in a transaction and this
3635                    is a persistent database */
3636                 ZERO_STRUCT(header);
3637         } else if (ret != 0) {
3638                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3639                 talloc_free(tmp_ctx);
3640                 return ret;
3641         }
3642
3643         if (data.dsize == olddata.dsize &&
3644             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3645                 /* save writing the same data */
3646                 talloc_free(tmp_ctx);
3647                 return 0;
3648         }
3649
3650         header.dmaster = h->ctdb_db->ctdb->pnn;
3651         header.rsn++;
3652
3653         if (!h->in_replay) {
3654                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3655                 if (h->m_all == NULL) {
3656                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3657                         talloc_free(tmp_ctx);
3658                         return -1;
3659                 }
3660         }               
3661
3662         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3663         if (h->m_write == NULL) {
3664                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3665                 talloc_free(tmp_ctx);
3666                 return -1;
3667         }
3668         
3669         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3670
3671         talloc_free(tmp_ctx);
3672         
3673         return ret;
3674 }
3675
3676 /*
3677   replay a transaction
3678  */
3679 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3680 {
3681         int ret, i;
3682         struct ctdb_rec_data *rec = NULL;
3683
3684         h->in_replay = true;
3685         talloc_free(h->m_write);
3686         h->m_write = NULL;
3687
3688         ret = ctdb_transaction_fetch_start(h);
3689         if (ret != 0) {
3690                 return ret;
3691         }
3692
3693         for (i=0;i<h->m_all->count;i++) {
3694                 TDB_DATA key, data;
3695
3696                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3697                 if (rec == NULL) {
3698                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3699                         goto failed;
3700                 }
3701
3702                 if (rec->reqid == 0) {
3703                         /* its a store */
3704                         if (ctdb_transaction_store(h, key, data) != 0) {
3705                                 goto failed;
3706                         }
3707                 } else {
3708                         TDB_DATA data2;
3709                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3710
3711                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3712                                 talloc_free(tmp_ctx);
3713                                 goto failed;
3714                         }
3715                         if (data2.dsize != data.dsize ||
3716                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3717                                 /* the record has changed on us - we have to give up */
3718                                 talloc_free(tmp_ctx);
3719                                 goto failed;
3720                         }
3721                         talloc_free(tmp_ctx);
3722                 }
3723         }
3724         
3725         return 0;
3726
3727 failed:
3728         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3729         return -1;
3730 }
3731
3732
3733 /*
3734   commit a transaction
3735  */
3736 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3737 {
3738         int ret, retries=0;
3739         int32_t status;
3740         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3741         struct timeval timeout;
3742         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3743
3744         talloc_set_destructor(h, NULL);
3745
3746         /* our commit strategy is quite complex.
3747
3748            - we first try to commit the changes to all other nodes
3749
3750            - if that works, then we commit locally and we are done
3751
3752            - if a commit on another node fails, then we need to cancel
3753              the transaction, then restart the transaction (thus
3754              opening a window of time for a pending recovery to
3755              complete), then replay the transaction, checking all the
3756              reads and writes (checking that reads give the same data,
3757              and writes succeed). Then we retry the transaction to the
3758              other nodes
3759         */
3760
3761 again:
3762         if (h->m_write == NULL) {
3763                 /* no changes were made */
3764                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3765                 talloc_free(h);
3766                 return 0;
3767         }
3768
3769         /* tell ctdbd to commit to the other nodes */
3770         timeout = timeval_current_ofs(1, 0);
3771         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3772                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3773                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3774                            &timeout, NULL);
3775         if (ret != 0 || status != 0) {
3776                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3777                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3778                                      ", retrying after 1 second...\n",
3779                                      (retries==0)?"":"retry "));
3780                 sleep(1);
3781
3782                 if (ret != 0) {
3783                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3784                 } else {
3785                         /* work out what error code we will give if we 
3786                            have to fail the operation */
3787                         switch ((enum ctdb_trans2_commit_error)status) {
3788                         case CTDB_TRANS2_COMMIT_SUCCESS:
3789                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3790                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3791                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3792                                 break;
3793                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3794                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3795                                 break;
3796                         }
3797                 }
3798
3799                 if (++retries == 100) {
3800                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3801                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3802                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3803                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3804                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3805                         talloc_free(h);
3806                         return -1;
3807                 }               
3808
3809                 if (ctdb_replay_transaction(h) != 0) {
3810                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3811                                           "transaction on db 0x%08x, "
3812                                           "failure control =%u\n",
3813                                           h->ctdb_db->db_id,
3814                                           (unsigned)failure_control));
3815                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3816                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3817                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3818                         talloc_free(h);
3819                         return -1;
3820                 }
3821                 goto again;
3822         } else {
3823                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3824         }
3825
3826         /* do the real commit locally */
3827         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3828         if (ret != 0) {
3829                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3830                                   "on db id 0x%08x locally, "
3831                                   "failure_control=%u\n",
3832                                   h->ctdb_db->db_id,
3833                                   (unsigned)failure_control));
3834                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3835                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3836                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3837                 talloc_free(h);
3838                 return ret;
3839         }
3840
3841         /* tell ctdbd that we are finished with our local commit */
3842         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3843                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3844                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3845         talloc_free(h);
3846         return 0;
3847 }
3848
3849 /*
3850   recovery daemon ping to main daemon
3851  */
3852 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3853 {
3854         int ret;
3855         int32_t res;
3856
3857         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3858                            ctdb, NULL, &res, NULL, NULL);
3859         if (ret != 0 || res != 0) {
3860                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3861                 return -1;
3862         }
3863
3864         return 0;
3865 }
3866
3867 /* when forking the main daemon and the child process needs to connect back
3868  * to the daemon as a client process, this function can be used to change
3869  * the ctdb context from daemon into client mode
3870  */
3871 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3872 {
3873         int ret;
3874         va_list ap;
3875
3876         /* Add extra information so we can identify this in the logs */
3877         va_start(ap, fmt);
3878         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3879         va_end(ap);
3880
3881         /* shutdown the transport */
3882         if (ctdb->methods) {
3883                 ctdb->methods->shutdown(ctdb);
3884         }
3885
3886         /* get a new event context */
3887         talloc_free(ctdb->ev);
3888         ctdb->ev = event_context_init(ctdb);
3889         tevent_loop_allow_nesting(ctdb->ev);
3890
3891         close(ctdb->daemon.sd);
3892         ctdb->daemon.sd = -1;
3893
3894         /* the client does not need to be realtime */
3895         if (ctdb->do_setsched) {
3896                 ctdb_restore_scheduler(ctdb);
3897         }
3898
3899         /* initialise ctdb */
3900         ret = ctdb_socket_connect(ctdb);
3901         if (ret != 0) {
3902                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3903                 return -1;
3904         }
3905
3906          return 0;
3907 }
3908
3909 /*
3910   get the status of running the monitor eventscripts: NULL means never run.
3911  */
3912 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3913                 struct timeval timeout, uint32_t destnode, 
3914                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3915                 struct ctdb_scripts_wire **script_status)
3916 {
3917         int ret;
3918         TDB_DATA outdata, indata;
3919         int32_t res;
3920         uint32_t uinttype = type;
3921
3922         indata.dptr = (uint8_t *)&uinttype;
3923         indata.dsize = sizeof(uinttype);
3924
3925         ret = ctdb_control(ctdb, destnode, 0, 
3926                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3927                            mem_ctx, &outdata, &res, &timeout, NULL);
3928         if (ret != 0 || res != 0) {
3929                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3930                 return -1;
3931         }
3932
3933         if (outdata.dsize == 0) {
3934                 *script_status = NULL;
3935         } else {
3936                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3937                 talloc_free(outdata.dptr);
3938         }
3939                     
3940         return 0;
3941 }
3942
3943 /*
3944   tell the main daemon how long it took to lock the reclock file
3945  */
3946 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3947 {
3948         int ret;
3949         int32_t res;
3950         TDB_DATA data;
3951
3952         data.dptr = (uint8_t *)&latency;
3953         data.dsize = sizeof(latency);
3954
3955         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3956                            ctdb, NULL, &res, NULL, NULL);
3957         if (ret != 0 || res != 0) {
3958                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3959                 return -1;
3960         }
3961
3962         return 0;
3963 }
3964
3965 /*
3966   get the name of the reclock file
3967  */
3968 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3969                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3970                          const char **name)
3971 {
3972         int ret;
3973         int32_t res;
3974         TDB_DATA data;
3975
3976         ret = ctdb_control(ctdb, destnode, 0, 
3977                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3978                            mem_ctx, &data, &res, &timeout, NULL);
3979         if (ret != 0 || res != 0) {
3980                 return -1;
3981         }
3982
3983         if (data.dsize == 0) {
3984                 *name = NULL;
3985         } else {
3986                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3987         }
3988         talloc_free(data.dptr);
3989
3990         return 0;
3991 }
3992
3993 /*
3994   set the reclock filename for a node
3995  */
3996 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3997 {
3998         int ret;
3999         TDB_DATA data;
4000         int32_t res;
4001
4002         if (reclock == NULL) {
4003                 data.dsize = 0;
4004                 data.dptr  = NULL;
4005         } else {
4006                 data.dsize = strlen(reclock) + 1;
4007                 data.dptr  = discard_const(reclock);
4008         }
4009
4010         ret = ctdb_control(ctdb, destnode, 0, 
4011                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4012                            NULL, NULL, &res, &timeout, NULL);
4013         if (ret != 0 || res != 0) {
4014                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4015                 return -1;
4016         }
4017
4018         return 0;
4019 }
4020
4021 /*
4022   stop a node
4023  */
4024 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4025 {
4026         int ret;
4027         int32_t res;
4028
4029         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4030                            ctdb, NULL, &res, &timeout, NULL);
4031         if (ret != 0 || res != 0) {
4032                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4033                 return -1;
4034         }
4035
4036         return 0;
4037 }
4038
4039 /*
4040   continue a node
4041  */
4042 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4043 {
4044         int ret;
4045
4046         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4047                            ctdb, NULL, NULL, &timeout, NULL);
4048         if (ret != 0) {
4049                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4050                 return -1;
4051         }
4052
4053         return 0;
4054 }
4055
4056 /*
4057   set the natgw state for a node
4058  */
4059 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4060 {
4061         int ret;
4062         TDB_DATA data;
4063         int32_t res;
4064
4065         data.dsize = sizeof(natgwstate);
4066         data.dptr  = (uint8_t *)&natgwstate;
4067
4068         ret = ctdb_control(ctdb, destnode, 0, 
4069                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4070                            NULL, NULL, &res, &timeout, NULL);
4071         if (ret != 0 || res != 0) {
4072                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4073                 return -1;
4074         }
4075
4076         return 0;
4077 }
4078
4079 /*
4080   set the lmaster role for a node
4081  */
4082 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4083 {
4084         int ret;
4085         TDB_DATA data;
4086         int32_t res;
4087
4088         data.dsize = sizeof(lmasterrole);
4089         data.dptr  = (uint8_t *)&lmasterrole;
4090
4091         ret = ctdb_control(ctdb, destnode, 0, 
4092                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4093                            NULL, NULL, &res, &timeout, NULL);
4094         if (ret != 0 || res != 0) {
4095                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4096                 return -1;
4097         }
4098
4099         return 0;
4100 }
4101
4102 /*
4103   set the recmaster role for a node
4104  */
4105 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4106 {
4107         int ret;
4108         TDB_DATA data;
4109         int32_t res;
4110
4111         data.dsize = sizeof(recmasterrole);
4112         data.dptr  = (uint8_t *)&recmasterrole;
4113
4114         ret = ctdb_control(ctdb, destnode, 0, 
4115                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4116                            NULL, NULL, &res, &timeout, NULL);
4117         if (ret != 0 || res != 0) {
4118                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4119                 return -1;
4120         }
4121
4122         return 0;
4123 }
4124
4125 /* enable an eventscript
4126  */
4127 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4128 {
4129         int ret;
4130         TDB_DATA data;
4131         int32_t res;
4132
4133         data.dsize = strlen(script) + 1;
4134         data.dptr  = discard_const(script);
4135
4136         ret = ctdb_control(ctdb, destnode, 0, 
4137                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4138                            NULL, NULL, &res, &timeout, NULL);
4139         if (ret != 0 || res != 0) {
4140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4141                 return -1;
4142         }
4143
4144         return 0;
4145 }
4146
4147 /* disable an eventscript
4148  */
4149 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4150 {
4151         int ret;
4152         TDB_DATA data;
4153         int32_t res;
4154
4155         data.dsize = strlen(script) + 1;
4156         data.dptr  = discard_const(script);
4157
4158         ret = ctdb_control(ctdb, destnode, 0, 
4159                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4160                            NULL, NULL, &res, &timeout, NULL);
4161         if (ret != 0 || res != 0) {
4162                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4163                 return -1;
4164         }
4165
4166         return 0;
4167 }
4168
4169
4170 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4171 {
4172         int ret;
4173         TDB_DATA data;
4174         int32_t res;
4175
4176         data.dsize = sizeof(*bantime);
4177         data.dptr  = (uint8_t *)bantime;
4178
4179         ret = ctdb_control(ctdb, destnode, 0, 
4180                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4181                            NULL, NULL, &res, &timeout, NULL);
4182         if (ret != 0 || res != 0) {
4183                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4184                 return -1;
4185         }
4186
4187         return 0;
4188 }
4189
4190
4191 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4192 {
4193         int ret;
4194         TDB_DATA outdata;
4195         int32_t res;
4196         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4197
4198         ret = ctdb_control(ctdb, destnode, 0, 
4199                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4200                            tmp_ctx, &outdata, &res, &timeout, NULL);
4201         if (ret != 0 || res != 0) {
4202                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4203                 talloc_free(tmp_ctx);
4204                 return -1;
4205         }
4206
4207         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4208         talloc_free(tmp_ctx);
4209
4210         return 0;
4211 }
4212
4213
4214 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4215 {
4216         int ret;
4217         int32_t res;
4218         TDB_DATA data;
4219         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4220
4221         data.dptr = (uint8_t*)db_prio;
4222         data.dsize = sizeof(*db_prio);
4223
4224         ret = ctdb_control(ctdb, destnode, 0, 
4225                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4226                            tmp_ctx, NULL, &res, &timeout, NULL);
4227         if (ret != 0 || res != 0) {
4228                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4229                 talloc_free(tmp_ctx);
4230                 return -1;
4231         }
4232
4233         talloc_free(tmp_ctx);
4234
4235         return 0;
4236 }
4237
4238 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4239 {
4240         int ret;
4241         int32_t res;
4242         TDB_DATA data;
4243         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4244
4245         data.dptr = (uint8_t*)&db_id;
4246         data.dsize = sizeof(db_id);
4247
4248         ret = ctdb_control(ctdb, destnode, 0, 
4249                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4250                            tmp_ctx, NULL, &res, &timeout, NULL);
4251         if (ret != 0 || res < 0) {
4252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4253                 talloc_free(tmp_ctx);
4254                 return -1;
4255         }
4256
4257         if (priority) {
4258                 *priority = res;
4259         }
4260
4261         talloc_free(tmp_ctx);
4262
4263         return 0;
4264 }
4265
4266 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4267 {
4268         int ret;
4269         TDB_DATA outdata;
4270         int32_t res;
4271
4272         ret = ctdb_control(ctdb, destnode, 0, 
4273                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4274                            mem_ctx, &outdata, &res, &timeout, NULL);
4275         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4277                 return -1;
4278         }
4279
4280         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4281         talloc_free(outdata.dptr);
4282                     
4283         return 0;
4284 }
4285
4286 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4287 {
4288         if (h == NULL) {
4289                 return NULL;
4290         }
4291
4292         return &h->header;
4293 }
4294
4295
4296 struct ctdb_client_control_state *
4297 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4298 {
4299         struct ctdb_client_control_state *handle;
4300         struct ctdb_marshall_buffer *m;
4301         struct ctdb_rec_data *rec;
4302         TDB_DATA outdata;
4303
4304         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4305         if (m == NULL) {
4306                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4307                 return NULL;
4308         }
4309
4310         m->db_id = ctdb_db->db_id;
4311
4312         rec = ctdb_marshall_record(m, 0, key, header, data);
4313         if (rec == NULL) {
4314                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4315                 talloc_free(m);
4316                 return NULL;
4317         }
4318         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4319         if (m == NULL) {
4320                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4321                 talloc_free(m);
4322                 return NULL;
4323         }
4324         m->count++;
4325         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4326
4327
4328         outdata.dptr = (uint8_t *)m;
4329         outdata.dsize = talloc_get_size(m);
4330
4331         handle = ctdb_control_send(ctdb, destnode, 0, 
4332                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4333                            mem_ctx, &timeout, NULL);
4334         talloc_free(m);
4335         return handle;
4336 }
4337
4338 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4339 {
4340         int ret;
4341         int32_t res;
4342
4343         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4344         if ( (ret != 0) || (res != 0) ){
4345                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4346                 return -1;
4347         }
4348
4349         return 0;
4350 }
4351
4352 int
4353 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4354 {
4355         struct ctdb_client_control_state *state;
4356
4357         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4358         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4359 }
4360