7103ef6d94ce4ff4f98de31362d4d8976e47e882
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         /* we need to force the record to be written out if this was a remote access */
109         if (header->laccessor != caller) {
110                 header->lacount = 0;
111         }
112         header->laccessor = caller;
113         header->lacount++;
114
115         /* we need to force the record to be written out if this was a remote access,
116            so that the lacount is updated */
117         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
118                 c->new_data = &c->record_data;
119         }
120
121         if (c->new_data) {
122                 /* XXX check that we always have the lock here? */
123                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
124                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
125                         talloc_free(c);
126                         return -1;
127                 }
128         }
129
130         if (c->reply_data) {
131                 call->reply_data = *c->reply_data;
132
133                 talloc_steal(call, call->reply_data.dptr);
134                 talloc_set_name_const(call->reply_data.dptr, __location__);
135         } else {
136                 call->reply_data.dptr = NULL;
137                 call->reply_data.dsize = 0;
138         }
139         call->status = c->status;
140
141         talloc_free(c);
142
143         return 0;
144 }
145
146
147 /*
148   queue a packet for sending from client to daemon
149 */
150 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
151 {
152         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
153 }
154
155
156 /*
157   called when a CTDB_REPLY_CALL packet comes in in the client
158
159   This packet comes in response to a CTDB_REQ_CALL request packet. It
160   contains any reply data from the call
161 */
162 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
163 {
164         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
165         struct ctdb_client_call_state *state;
166
167         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
168         if (state == NULL) {
169                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
170                 return;
171         }
172
173         if (hdr->reqid != state->reqid) {
174                 /* we found a record  but it was the wrong one */
175                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
176                 return;
177         }
178
179         state->call->reply_data.dptr = c->data;
180         state->call->reply_data.dsize = c->datalen;
181         state->call->status = c->status;
182
183         talloc_steal(state, c);
184
185         state->state = CTDB_CALL_DONE;
186
187         if (state->async.fn) {
188                 state->async.fn(state);
189         }
190 }
191
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
193
194 /*
195   this is called in the client, when data comes in from the daemon
196  */
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
198 {
199         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
201         TALLOC_CTX *tmp_ctx;
202
203         /* place the packet as a child of a tmp_ctx. We then use
204            talloc_free() below to free it. If any of the calls want
205            to keep it, then they will steal it somewhere else, and the
206            talloc_free() will be a no-op */
207         tmp_ctx = talloc_new(ctdb);
208         talloc_steal(tmp_ctx, hdr);
209
210         if (cnt == 0) {
211                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
212                 exit(0);
213         }
214
215         if (cnt < sizeof(*hdr)) {
216                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
217                 goto done;
218         }
219         if (cnt != hdr->length) {
220                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
221                                (unsigned)hdr->length, (unsigned)cnt);
222                 goto done;
223         }
224
225         if (hdr->ctdb_magic != CTDB_MAGIC) {
226                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
227                 goto done;
228         }
229
230         if (hdr->ctdb_version != CTDB_VERSION) {
231                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
232                 goto done;
233         }
234
235         switch (hdr->operation) {
236         case CTDB_REPLY_CALL:
237                 ctdb_client_reply_call(ctdb, hdr);
238                 break;
239
240         case CTDB_REQ_MESSAGE:
241                 ctdb_request_message(ctdb, hdr);
242                 break;
243
244         case CTDB_REPLY_CONTROL:
245                 ctdb_client_reply_control(ctdb, hdr);
246                 break;
247
248         default:
249                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
250         }
251
252 done:
253         talloc_free(tmp_ctx);
254 }
255
256 /*
257   connect to a unix domain socket
258 */
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
260 {
261         struct sockaddr_un addr;
262
263         memset(&addr, 0, sizeof(addr));
264         addr.sun_family = AF_UNIX;
265         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
266
267         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268         if (ctdb->daemon.sd == -1) {
269                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
270                 return -1;
271         }
272
273         set_nonblocking(ctdb->daemon.sd);
274         set_close_on_exec(ctdb->daemon.sd);
275         
276         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
277                 close(ctdb->daemon.sd);
278                 ctdb->daemon.sd = -1;
279                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
280                 return -1;
281         }
282
283         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
284                                               CTDB_DS_ALIGNMENT, 
285                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
286         return 0;
287 }
288
289
290 struct ctdb_record_handle {
291         struct ctdb_db_context *ctdb_db;
292         TDB_DATA key;
293         TDB_DATA *data;
294         struct ctdb_ltdb_header header;
295 };
296
297
298 /*
299   make a recv call to the local ctdb daemon - called from client context
300
301   This is called when the program wants to wait for a ctdb_call to complete and get the 
302   results. This call will block unless the call has already completed.
303 */
304 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
305 {
306         if (state == NULL) {
307                 return -1;
308         }
309
310         while (state->state < CTDB_CALL_DONE) {
311                 event_loop_once(state->ctdb_db->ctdb->ev);
312         }
313         if (state->state != CTDB_CALL_DONE) {
314                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
315                 talloc_free(state);
316                 return -1;
317         }
318
319         if (state->call->reply_data.dsize) {
320                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
321                                                       state->call->reply_data.dptr,
322                                                       state->call->reply_data.dsize);
323                 call->reply_data.dsize = state->call->reply_data.dsize;
324         } else {
325                 call->reply_data.dptr = NULL;
326                 call->reply_data.dsize = 0;
327         }
328         call->status = state->call->status;
329         talloc_free(state);
330
331         return 0;
332 }
333
334
335
336
337 /*
338   destroy a ctdb_call in client
339 */
340 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
341 {
342         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
343         return 0;
344 }
345
346 /*
347   construct an event driven local ctdb_call
348
349   this is used so that locally processed ctdb_call requests are processed
350   in an event driven manner
351 */
352 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
353                                                                   struct ctdb_call *call,
354                                                                   struct ctdb_ltdb_header *header,
355                                                                   TDB_DATA *data)
356 {
357         struct ctdb_client_call_state *state;
358         struct ctdb_context *ctdb = ctdb_db->ctdb;
359         int ret;
360
361         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
362         CTDB_NO_MEMORY_NULL(ctdb, state);
363         state->call = talloc_zero(state, struct ctdb_call);
364         CTDB_NO_MEMORY_NULL(ctdb, state->call);
365
366         talloc_steal(state, data->dptr);
367
368         state->state   = CTDB_CALL_DONE;
369         *(state->call) = *call;
370         state->ctdb_db = ctdb_db;
371
372         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
373
374         return state;
375 }
376
377 /*
378   make a ctdb call to the local daemon - async send. Called from client context.
379
380   This constructs a ctdb_call request and queues it for processing. 
381   This call never blocks.
382 */
383 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
384                                               struct ctdb_call *call)
385 {
386         struct ctdb_client_call_state *state;
387         struct ctdb_context *ctdb = ctdb_db->ctdb;
388         struct ctdb_ltdb_header header;
389         TDB_DATA data;
390         int ret;
391         size_t len;
392         struct ctdb_req_call *c;
393
394         /* if the domain socket is not yet open, open it */
395         if (ctdb->daemon.sd==-1) {
396                 ctdb_socket_connect(ctdb);
397         }
398
399         ret = ctdb_ltdb_lock(ctdb_db, call->key);
400         if (ret != 0) {
401                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
402                 return NULL;
403         }
404
405         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
406
407         if (ret == 0 && header.dmaster == ctdb->pnn) {
408                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
409                 talloc_free(data.dptr);
410                 ctdb_ltdb_unlock(ctdb_db, call->key);
411                 return state;
412         }
413
414         ctdb_ltdb_unlock(ctdb_db, call->key);
415         talloc_free(data.dptr);
416
417         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
418         if (state == NULL) {
419                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
420                 return NULL;
421         }
422         state->call = talloc_zero(state, struct ctdb_call);
423         if (state->call == NULL) {
424                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
425                 return NULL;
426         }
427
428         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
429         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
430         if (c == NULL) {
431                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
432                 return NULL;
433         }
434
435         state->reqid     = ctdb_reqid_new(ctdb, state);
436         state->ctdb_db = ctdb_db;
437         talloc_set_destructor(state, ctdb_client_call_destructor);
438
439         c->hdr.reqid     = state->reqid;
440         c->flags         = call->flags;
441         c->db_id         = ctdb_db->db_id;
442         c->callid        = call->call_id;
443         c->hopcount      = 0;
444         c->keylen        = call->key.dsize;
445         c->calldatalen   = call->call_data.dsize;
446         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
447         memcpy(&c->data[call->key.dsize], 
448                call->call_data.dptr, call->call_data.dsize);
449         *(state->call)              = *call;
450         state->call->call_data.dptr = &c->data[call->key.dsize];
451         state->call->key.dptr       = &c->data[0];
452
453         state->state  = CTDB_CALL_WAIT;
454
455
456         ctdb_client_queue_pkt(ctdb, &c->hdr);
457
458         return state;
459 }
460
461
462 /*
463   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
464 */
465 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
466 {
467         struct ctdb_client_call_state *state;
468
469         state = ctdb_call_send(ctdb_db, call);
470         return ctdb_call_recv(state, call);
471 }
472
473
474 /*
475   tell the daemon what messaging srvid we will use, and register the message
476   handler function in the client
477 */
478 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
479                              ctdb_msg_fn_t handler,
480                              void *private_data)
481                                     
482 {
483         int res;
484         int32_t status;
485         
486         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
487                            tdb_null, NULL, NULL, &status, NULL, NULL);
488         if (res != 0 || status != 0) {
489                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
490                 return -1;
491         }
492
493         /* also need to register the handler with our own ctdb structure */
494         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
495 }
496
497 /*
498   tell the daemon we no longer want a srvid
499 */
500 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
501 {
502         int res;
503         int32_t status;
504         
505         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
506                            tdb_null, NULL, NULL, &status, NULL, NULL);
507         if (res != 0 || status != 0) {
508                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
509                 return -1;
510         }
511
512         /* also need to register the handler with our own ctdb structure */
513         ctdb_deregister_message_handler(ctdb, srvid, private_data);
514         return 0;
515 }
516
517
518 /*
519   send a message - from client context
520  */
521 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
522                       uint64_t srvid, TDB_DATA data)
523 {
524         struct ctdb_req_message *r;
525         int len, res;
526
527         len = offsetof(struct ctdb_req_message, data) + data.dsize;
528         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
529                                len, struct ctdb_req_message);
530         CTDB_NO_MEMORY(ctdb, r);
531
532         r->hdr.destnode  = pnn;
533         r->srvid         = srvid;
534         r->datalen       = data.dsize;
535         memcpy(&r->data[0], data.dptr, data.dsize);
536         
537         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
538         if (res != 0) {
539                 return res;
540         }
541
542         talloc_free(r);
543         return 0;
544 }
545
546
547 /*
548   cancel a ctdb_fetch_lock operation, releasing the lock
549  */
550 static int fetch_lock_destructor(struct ctdb_record_handle *h)
551 {
552         ctdb_ltdb_unlock(h->ctdb_db, h->key);
553         return 0;
554 }
555
556 /*
557   force the migration of a record to this node
558  */
559 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
560 {
561         struct ctdb_call call;
562         ZERO_STRUCT(call);
563         call.call_id = CTDB_NULL_FUNC;
564         call.key = key;
565         call.flags = CTDB_IMMEDIATE_MIGRATION;
566         return ctdb_call(ctdb_db, &call);
567 }
568
569 /*
570   get a lock on a record, and return the records data. Blocks until it gets the lock
571  */
572 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
573                                            TDB_DATA key, TDB_DATA *data)
574 {
575         int ret;
576         struct ctdb_record_handle *h;
577
578         /*
579           procedure is as follows:
580
581           1) get the chain lock. 
582           2) check if we are dmaster
583           3) if we are the dmaster then return handle 
584           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
585              reply from ctdbd
586           5) when we get the reply, goto (1)
587          */
588
589         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
590         if (h == NULL) {
591                 return NULL;
592         }
593
594         h->ctdb_db = ctdb_db;
595         h->key     = key;
596         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
597         if (h->key.dptr == NULL) {
598                 talloc_free(h);
599                 return NULL;
600         }
601         h->data    = data;
602
603         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
604                  (const char *)key.dptr));
605
606 again:
607         /* step 1 - get the chain lock */
608         ret = ctdb_ltdb_lock(ctdb_db, key);
609         if (ret != 0) {
610                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
611                 talloc_free(h);
612                 return NULL;
613         }
614
615         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
616
617         talloc_set_destructor(h, fetch_lock_destructor);
618
619         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
620
621         /* when torturing, ensure we test the remote path */
622         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
623             random() % 5 == 0) {
624                 h->header.dmaster = (uint32_t)-1;
625         }
626
627
628         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
629
630         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
631                 ctdb_ltdb_unlock(ctdb_db, key);
632                 ret = ctdb_client_force_migration(ctdb_db, key);
633                 if (ret != 0) {
634                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
635                         talloc_free(h);
636                         return NULL;
637                 }
638                 goto again;
639         }
640
641         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
642         return h;
643 }
644
645 /*
646   store some data to the record that was locked with ctdb_fetch_lock()
647 */
648 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
649 {
650         if (h->ctdb_db->persistent) {
651                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
652                 return -1;
653         }
654
655         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
656 }
657
658 /*
659   non-locking fetch of a record
660  */
661 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
662                TDB_DATA key, TDB_DATA *data)
663 {
664         struct ctdb_call call;
665         int ret;
666
667         call.call_id = CTDB_FETCH_FUNC;
668         call.call_data.dptr = NULL;
669         call.call_data.dsize = 0;
670
671         ret = ctdb_call(ctdb_db, &call);
672
673         if (ret == 0) {
674                 *data = call.reply_data;
675                 talloc_steal(mem_ctx, data->dptr);
676         }
677
678         return ret;
679 }
680
681
682
683 /*
684    called when a control completes or timesout to invoke the callback
685    function the user provided
686 */
687 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
688         struct timeval t, void *private_data)
689 {
690         struct ctdb_client_control_state *state;
691         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
692         int ret;
693
694         state = talloc_get_type(private_data, struct ctdb_client_control_state);
695         talloc_steal(tmp_ctx, state);
696
697         ret = ctdb_control_recv(state->ctdb, state, state,
698                         NULL, 
699                         NULL, 
700                         NULL);
701
702         talloc_free(tmp_ctx);
703 }
704
705 /*
706   called when a CTDB_REPLY_CONTROL packet comes in in the client
707
708   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
709   contains any reply data from the control
710 */
711 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
712                                       struct ctdb_req_header *hdr)
713 {
714         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
715         struct ctdb_client_control_state *state;
716
717         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
718         if (state == NULL) {
719                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
720                 return;
721         }
722
723         if (hdr->reqid != state->reqid) {
724                 /* we found a record  but it was the wrong one */
725                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
726                 return;
727         }
728
729         state->outdata.dptr = c->data;
730         state->outdata.dsize = c->datalen;
731         state->status = c->status;
732         if (c->errorlen) {
733                 state->errormsg = talloc_strndup(state, 
734                                                  (char *)&c->data[c->datalen], 
735                                                  c->errorlen);
736         }
737
738         /* state->outdata now uses resources from c so we dont want c
739            to just dissappear from under us while state is still alive
740         */
741         talloc_steal(state, c);
742
743         state->state = CTDB_CONTROL_DONE;
744
745         /* if we had a callback registered for this control, pull the response
746            and call the callback.
747         */
748         if (state->async.fn) {
749                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
750         }
751 }
752
753
754 /*
755   destroy a ctdb_control in client
756 */
757 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
758 {
759         ctdb_reqid_remove(state->ctdb, state->reqid);
760         return 0;
761 }
762
763
764 /* time out handler for ctdb_control */
765 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
766         struct timeval t, void *private_data)
767 {
768         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
769
770         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
771                          "dstnode:%u\n", state->reqid, state->c->opcode,
772                          state->c->hdr.destnode));
773
774         state->state = CTDB_CONTROL_TIMEOUT;
775
776         /* if we had a callback registered for this control, pull the response
777            and call the callback.
778         */
779         if (state->async.fn) {
780                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
781         }
782 }
783
784 /* async version of send control request */
785 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
786                 uint32_t destnode, uint64_t srvid, 
787                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
788                 TALLOC_CTX *mem_ctx,
789                 struct timeval *timeout,
790                 char **errormsg)
791 {
792         struct ctdb_client_control_state *state;
793         size_t len;
794         struct ctdb_req_control *c;
795         int ret;
796
797         if (errormsg) {
798                 *errormsg = NULL;
799         }
800
801         /* if the domain socket is not yet open, open it */
802         if (ctdb->daemon.sd==-1) {
803                 ctdb_socket_connect(ctdb);
804         }
805
806         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
807         CTDB_NO_MEMORY_NULL(ctdb, state);
808
809         state->ctdb       = ctdb;
810         state->reqid      = ctdb_reqid_new(ctdb, state);
811         state->state      = CTDB_CONTROL_WAIT;
812         state->errormsg   = NULL;
813
814         talloc_set_destructor(state, ctdb_control_destructor);
815
816         len = offsetof(struct ctdb_req_control, data) + data.dsize;
817         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
818                                len, struct ctdb_req_control);
819         state->c            = c;        
820         CTDB_NO_MEMORY_NULL(ctdb, c);
821         c->hdr.reqid        = state->reqid;
822         c->hdr.destnode     = destnode;
823         c->opcode           = opcode;
824         c->client_id        = 0;
825         c->flags            = flags;
826         c->srvid            = srvid;
827         c->datalen          = data.dsize;
828         if (data.dsize) {
829                 memcpy(&c->data[0], data.dptr, data.dsize);
830         }
831
832         /* timeout */
833         if (timeout && !timeval_is_zero(timeout)) {
834                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
835         }
836
837         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
838         if (ret != 0) {
839                 talloc_free(state);
840                 return NULL;
841         }
842
843         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
844                 talloc_free(state);
845                 return NULL;
846         }
847
848         return state;
849 }
850
851
852 /* async version of receive control reply */
853 int ctdb_control_recv(struct ctdb_context *ctdb, 
854                 struct ctdb_client_control_state *state, 
855                 TALLOC_CTX *mem_ctx,
856                 TDB_DATA *outdata, int32_t *status, char **errormsg)
857 {
858         TALLOC_CTX *tmp_ctx;
859
860         if (status != NULL) {
861                 *status = -1;
862         }
863         if (errormsg != NULL) {
864                 *errormsg = NULL;
865         }
866
867         if (state == NULL) {
868                 return -1;
869         }
870
871         /* prevent double free of state */
872         tmp_ctx = talloc_new(ctdb);
873         talloc_steal(tmp_ctx, state);
874
875         /* loop one event at a time until we either timeout or the control
876            completes.
877         */
878         while (state->state == CTDB_CONTROL_WAIT) {
879                 event_loop_once(ctdb->ev);
880         }
881
882         if (state->state != CTDB_CONTROL_DONE) {
883                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
884                 if (state->async.fn) {
885                         state->async.fn(state);
886                 }
887                 talloc_free(tmp_ctx);
888                 return -1;
889         }
890
891         if (state->errormsg) {
892                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
893                 if (errormsg) {
894                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
895                 }
896                 if (state->async.fn) {
897                         state->async.fn(state);
898                 }
899                 talloc_free(tmp_ctx);
900                 return -1;
901         }
902
903         if (outdata) {
904                 *outdata = state->outdata;
905                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
906         }
907
908         if (status) {
909                 *status = state->status;
910         }
911
912         if (state->async.fn) {
913                 state->async.fn(state);
914         }
915
916         talloc_free(tmp_ctx);
917         return 0;
918 }
919
920
921
922 /*
923   send a ctdb control message
924   timeout specifies how long we should wait for a reply.
925   if timeout is NULL we wait indefinitely
926  */
927 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
928                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
929                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
930                  struct timeval *timeout,
931                  char **errormsg)
932 {
933         struct ctdb_client_control_state *state;
934
935         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
936                         flags, data, mem_ctx,
937                         timeout, errormsg);
938         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
939                         errormsg);
940 }
941
942
943
944
945 /*
946   a process exists call. Returns 0 if process exists, -1 otherwise
947  */
948 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
949 {
950         int ret;
951         TDB_DATA data;
952         int32_t status;
953
954         data.dptr = (uint8_t*)&pid;
955         data.dsize = sizeof(pid);
956
957         ret = ctdb_control(ctdb, destnode, 0, 
958                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
959                            NULL, NULL, &status, NULL, NULL);
960         if (ret != 0) {
961                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
962                 return -1;
963         }
964
965         return status;
966 }
967
968 /*
969   get remote statistics
970  */
971 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
972 {
973         int ret;
974         TDB_DATA data;
975         int32_t res;
976
977         ret = ctdb_control(ctdb, destnode, 0, 
978                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
979                            ctdb, &data, &res, NULL, NULL);
980         if (ret != 0 || res != 0) {
981                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
982                 return -1;
983         }
984
985         if (data.dsize != sizeof(struct ctdb_statistics)) {
986                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
987                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
988                       return -1;
989         }
990
991         *status = *(struct ctdb_statistics *)data.dptr;
992         talloc_free(data.dptr);
993                         
994         return 0;
995 }
996
997 /*
998   shutdown a remote ctdb node
999  */
1000 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1001 {
1002         struct ctdb_client_control_state *state;
1003
1004         state = ctdb_control_send(ctdb, destnode, 0, 
1005                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1006                            NULL, &timeout, NULL);
1007         if (state == NULL) {
1008                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1009                 return -1;
1010         }
1011
1012         return 0;
1013 }
1014
1015 /*
1016   get vnn map from a remote node
1017  */
1018 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1019 {
1020         int ret;
1021         TDB_DATA outdata;
1022         int32_t res;
1023         struct ctdb_vnn_map_wire *map;
1024
1025         ret = ctdb_control(ctdb, destnode, 0, 
1026                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1027                            mem_ctx, &outdata, &res, &timeout, NULL);
1028         if (ret != 0 || res != 0) {
1029                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1030                 return -1;
1031         }
1032         
1033         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1034         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1035             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1036                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1037                 return -1;
1038         }
1039
1040         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1041         CTDB_NO_MEMORY(ctdb, *vnnmap);
1042         (*vnnmap)->generation = map->generation;
1043         (*vnnmap)->size       = map->size;
1044         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1045
1046         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1047         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1048         talloc_free(outdata.dptr);
1049                     
1050         return 0;
1051 }
1052
1053
1054 /*
1055   get the recovery mode of a remote node
1056  */
1057 struct ctdb_client_control_state *
1058 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1059 {
1060         return ctdb_control_send(ctdb, destnode, 0, 
1061                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1062                            mem_ctx, &timeout, NULL);
1063 }
1064
1065 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1066 {
1067         int ret;
1068         int32_t res;
1069
1070         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1071         if (ret != 0) {
1072                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1073                 return -1;
1074         }
1075
1076         if (recmode) {
1077                 *recmode = (uint32_t)res;
1078         }
1079
1080         return 0;
1081 }
1082
1083 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1084 {
1085         struct ctdb_client_control_state *state;
1086
1087         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1088         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1089 }
1090
1091
1092
1093
1094 /*
1095   set the recovery mode of a remote node
1096  */
1097 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1098 {
1099         int ret;
1100         TDB_DATA data;
1101         int32_t res;
1102
1103         data.dsize = sizeof(uint32_t);
1104         data.dptr = (unsigned char *)&recmode;
1105
1106         ret = ctdb_control(ctdb, destnode, 0, 
1107                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1108                            NULL, NULL, &res, &timeout, NULL);
1109         if (ret != 0 || res != 0) {
1110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1111                 return -1;
1112         }
1113
1114         return 0;
1115 }
1116
1117
1118
1119 /*
1120   get the recovery master of a remote node
1121  */
1122 struct ctdb_client_control_state *
1123 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1124                         struct timeval timeout, uint32_t destnode)
1125 {
1126         return ctdb_control_send(ctdb, destnode, 0, 
1127                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1128                            mem_ctx, &timeout, NULL);
1129 }
1130
1131 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1132 {
1133         int ret;
1134         int32_t res;
1135
1136         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1137         if (ret != 0) {
1138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1139                 return -1;
1140         }
1141
1142         if (recmaster) {
1143                 *recmaster = (uint32_t)res;
1144         }
1145
1146         return 0;
1147 }
1148
1149 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1150 {
1151         struct ctdb_client_control_state *state;
1152
1153         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1154         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1155 }
1156
1157
1158 /*
1159   set the recovery master of a remote node
1160  */
1161 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1162 {
1163         int ret;
1164         TDB_DATA data;
1165         int32_t res;
1166
1167         ZERO_STRUCT(data);
1168         data.dsize = sizeof(uint32_t);
1169         data.dptr = (unsigned char *)&recmaster;
1170
1171         ret = ctdb_control(ctdb, destnode, 0, 
1172                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1173                            NULL, NULL, &res, &timeout, NULL);
1174         if (ret != 0 || res != 0) {
1175                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1176                 return -1;
1177         }
1178
1179         return 0;
1180 }
1181
1182
1183 /*
1184   get a list of databases off a remote node
1185  */
1186 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1187                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1188 {
1189         int ret;
1190         TDB_DATA outdata;
1191         int32_t res;
1192
1193         ret = ctdb_control(ctdb, destnode, 0, 
1194                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1195                            mem_ctx, &outdata, &res, &timeout, NULL);
1196         if (ret != 0 || res != 0) {
1197                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1198                 return -1;
1199         }
1200
1201         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1202         talloc_free(outdata.dptr);
1203                     
1204         return 0;
1205 }
1206
1207 /*
1208   get a list of nodes (vnn and flags ) from a remote node
1209  */
1210 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1211                 struct timeval timeout, uint32_t destnode, 
1212                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1213 {
1214         int ret;
1215         TDB_DATA outdata;
1216         int32_t res;
1217
1218         ret = ctdb_control(ctdb, destnode, 0, 
1219                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1220                            mem_ctx, &outdata, &res, &timeout, NULL);
1221         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1222                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1223                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1224         }
1225         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1226                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1227                 return -1;
1228         }
1229
1230         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1231         talloc_free(outdata.dptr);
1232                     
1233         return 0;
1234 }
1235
1236 /*
1237   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1238  */
1239 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1240                 struct timeval timeout, uint32_t destnode, 
1241                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1242 {
1243         int ret, i, len;
1244         TDB_DATA outdata;
1245         struct ctdb_node_mapv4 *nodemapv4;
1246         int32_t res;
1247
1248         ret = ctdb_control(ctdb, destnode, 0, 
1249                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1250                            mem_ctx, &outdata, &res, &timeout, NULL);
1251         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1253                 return -1;
1254         }
1255
1256         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1257
1258         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1259         (*nodemap) = talloc_zero_size(mem_ctx, len);
1260         CTDB_NO_MEMORY(ctdb, (*nodemap));
1261
1262         (*nodemap)->num = nodemapv4->num;
1263         for (i=0; i<nodemapv4->num; i++) {
1264                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1265                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1266                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1267                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1268         }
1269                 
1270         talloc_free(outdata.dptr);
1271                     
1272         return 0;
1273 }
1274
1275 /*
1276   drop the transport, reload the nodes file and restart the transport
1277  */
1278 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1279                     struct timeval timeout, uint32_t destnode)
1280 {
1281         int ret;
1282         int32_t res;
1283
1284         ret = ctdb_control(ctdb, destnode, 0, 
1285                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1286                            NULL, NULL, &res, &timeout, NULL);
1287         if (ret != 0 || res != 0) {
1288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1289                 return -1;
1290         }
1291
1292         return 0;
1293 }
1294
1295
1296 /*
1297   set vnn map on a node
1298  */
1299 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1300                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1301 {
1302         int ret;
1303         TDB_DATA data;
1304         int32_t res;
1305         struct ctdb_vnn_map_wire *map;
1306         size_t len;
1307
1308         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1309         map = talloc_size(mem_ctx, len);
1310         CTDB_NO_MEMORY(ctdb, map);
1311
1312         map->generation = vnnmap->generation;
1313         map->size = vnnmap->size;
1314         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1315         
1316         data.dsize = len;
1317         data.dptr  = (uint8_t *)map;
1318
1319         ret = ctdb_control(ctdb, destnode, 0, 
1320                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1321                            NULL, NULL, &res, &timeout, NULL);
1322         if (ret != 0 || res != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1324                 return -1;
1325         }
1326
1327         talloc_free(map);
1328
1329         return 0;
1330 }
1331
1332
1333 /*
1334   async send for pull database
1335  */
1336 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1337         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1338         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1339 {
1340         TDB_DATA indata;
1341         struct ctdb_control_pulldb *pull;
1342         struct ctdb_client_control_state *state;
1343
1344         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1345         CTDB_NO_MEMORY_NULL(ctdb, pull);
1346
1347         pull->db_id   = dbid;
1348         pull->lmaster = lmaster;
1349
1350         indata.dsize = sizeof(struct ctdb_control_pulldb);
1351         indata.dptr  = (unsigned char *)pull;
1352
1353         state = ctdb_control_send(ctdb, destnode, 0, 
1354                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1355                                   mem_ctx, &timeout, NULL);
1356         talloc_free(pull);
1357
1358         return state;
1359 }
1360
1361 /*
1362   async recv for pull database
1363  */
1364 int ctdb_ctrl_pulldb_recv(
1365         struct ctdb_context *ctdb, 
1366         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1367         TDB_DATA *outdata)
1368 {
1369         int ret;
1370         int32_t res;
1371
1372         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1373         if ( (ret != 0) || (res != 0) ){
1374                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1375                 return -1;
1376         }
1377
1378         return 0;
1379 }
1380
1381 /*
1382   pull all keys and records for a specific database on a node
1383  */
1384 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1385                 uint32_t dbid, uint32_t lmaster, 
1386                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1387                 TDB_DATA *outdata)
1388 {
1389         struct ctdb_client_control_state *state;
1390
1391         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1392                                       timeout);
1393         
1394         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1395 }
1396
1397
1398 /*
1399   change dmaster for all keys in the database to the new value
1400  */
1401 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1402                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1403 {
1404         int ret;
1405         TDB_DATA indata;
1406         int32_t res;
1407
1408         indata.dsize = 2*sizeof(uint32_t);
1409         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1410
1411         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1412         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1413
1414         ret = ctdb_control(ctdb, destnode, 0, 
1415                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1416                            NULL, NULL, &res, &timeout, NULL);
1417         if (ret != 0 || res != 0) {
1418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1419                 return -1;
1420         }
1421
1422         return 0;
1423 }
1424
1425 /*
1426   ping a node, return number of clients connected
1427  */
1428 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1429 {
1430         int ret;
1431         int32_t res;
1432
1433         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1434                            tdb_null, NULL, NULL, &res, NULL, NULL);
1435         if (ret != 0) {
1436                 return -1;
1437         }
1438         return res;
1439 }
1440
1441 /*
1442   find the real path to a ltdb 
1443  */
1444 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1445                    const char **path)
1446 {
1447         int ret;
1448         int32_t res;
1449         TDB_DATA data;
1450
1451         data.dptr = (uint8_t *)&dbid;
1452         data.dsize = sizeof(dbid);
1453
1454         ret = ctdb_control(ctdb, destnode, 0, 
1455                            CTDB_CONTROL_GETDBPATH, 0, data, 
1456                            mem_ctx, &data, &res, &timeout, NULL);
1457         if (ret != 0 || res != 0) {
1458                 return -1;
1459         }
1460
1461         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1462         if ((*path) == NULL) {
1463                 return -1;
1464         }
1465
1466         talloc_free(data.dptr);
1467
1468         return 0;
1469 }
1470
1471 /*
1472   find the name of a db 
1473  */
1474 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1475                    const char **name)
1476 {
1477         int ret;
1478         int32_t res;
1479         TDB_DATA data;
1480
1481         data.dptr = (uint8_t *)&dbid;
1482         data.dsize = sizeof(dbid);
1483
1484         ret = ctdb_control(ctdb, destnode, 0, 
1485                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1486                            mem_ctx, &data, &res, &timeout, NULL);
1487         if (ret != 0 || res != 0) {
1488                 return -1;
1489         }
1490
1491         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1492         if ((*name) == NULL) {
1493                 return -1;
1494         }
1495
1496         talloc_free(data.dptr);
1497
1498         return 0;
1499 }
1500
1501 /*
1502   get the health status of a db
1503  */
1504 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1505                           struct timeval timeout,
1506                           uint32_t destnode,
1507                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1508                           const char **reason)
1509 {
1510         int ret;
1511         int32_t res;
1512         TDB_DATA data;
1513
1514         data.dptr = (uint8_t *)&dbid;
1515         data.dsize = sizeof(dbid);
1516
1517         ret = ctdb_control(ctdb, destnode, 0,
1518                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1519                            mem_ctx, &data, &res, &timeout, NULL);
1520         if (ret != 0 || res != 0) {
1521                 return -1;
1522         }
1523
1524         if (data.dsize == 0) {
1525                 (*reason) = NULL;
1526                 return 0;
1527         }
1528
1529         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1530         if ((*reason) == NULL) {
1531                 return -1;
1532         }
1533
1534         talloc_free(data.dptr);
1535
1536         return 0;
1537 }
1538
1539 /*
1540   create a database
1541  */
1542 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1543                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1544 {
1545         int ret;
1546         int32_t res;
1547         TDB_DATA data;
1548
1549         data.dptr = discard_const(name);
1550         data.dsize = strlen(name)+1;
1551
1552         ret = ctdb_control(ctdb, destnode, 0, 
1553                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1554                            0, data, 
1555                            mem_ctx, &data, &res, &timeout, NULL);
1556
1557         if (ret != 0 || res != 0) {
1558                 return -1;
1559         }
1560
1561         return 0;
1562 }
1563
1564 /*
1565   get debug level on a node
1566  */
1567 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1568 {
1569         int ret;
1570         int32_t res;
1571         TDB_DATA data;
1572
1573         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1574                            ctdb, &data, &res, NULL, NULL);
1575         if (ret != 0 || res != 0) {
1576                 return -1;
1577         }
1578         if (data.dsize != sizeof(int32_t)) {
1579                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1580                          (unsigned)data.dsize));
1581                 return -1;
1582         }
1583         *level = *(int32_t *)data.dptr;
1584         talloc_free(data.dptr);
1585         return 0;
1586 }
1587
1588 /*
1589   set debug level on a node
1590  */
1591 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1592 {
1593         int ret;
1594         int32_t res;
1595         TDB_DATA data;
1596
1597         data.dptr = (uint8_t *)&level;
1598         data.dsize = sizeof(level);
1599
1600         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1601                            NULL, NULL, &res, NULL, NULL);
1602         if (ret != 0 || res != 0) {
1603                 return -1;
1604         }
1605         return 0;
1606 }
1607
1608
1609 /*
1610   get a list of connected nodes
1611  */
1612 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1613                                 struct timeval timeout,
1614                                 TALLOC_CTX *mem_ctx,
1615                                 uint32_t *num_nodes)
1616 {
1617         struct ctdb_node_map *map=NULL;
1618         int ret, i;
1619         uint32_t *nodes;
1620
1621         *num_nodes = 0;
1622
1623         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1624         if (ret != 0) {
1625                 return NULL;
1626         }
1627
1628         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1629         if (nodes == NULL) {
1630                 return NULL;
1631         }
1632
1633         for (i=0;i<map->num;i++) {
1634                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1635                         nodes[*num_nodes] = map->nodes[i].pnn;
1636                         (*num_nodes)++;
1637                 }
1638         }
1639
1640         return nodes;
1641 }
1642
1643
1644 /*
1645   reset remote status
1646  */
1647 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1648 {
1649         int ret;
1650         int32_t res;
1651
1652         ret = ctdb_control(ctdb, destnode, 0, 
1653                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1654                            NULL, NULL, &res, NULL, NULL);
1655         if (ret != 0 || res != 0) {
1656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1657                 return -1;
1658         }
1659         return 0;
1660 }
1661
1662 /*
1663   this is the dummy null procedure that all databases support
1664 */
1665 static int ctdb_null_func(struct ctdb_call_info *call)
1666 {
1667         return 0;
1668 }
1669
1670 /*
1671   this is a plain fetch procedure that all databases support
1672 */
1673 static int ctdb_fetch_func(struct ctdb_call_info *call)
1674 {
1675         call->reply_data = &call->record_data;
1676         return 0;
1677 }
1678
1679 /*
1680   attach to a specific database - client call
1681 */
1682 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1683 {
1684         struct ctdb_db_context *ctdb_db;
1685         TDB_DATA data;
1686         int ret;
1687         int32_t res;
1688
1689         ctdb_db = ctdb_db_handle(ctdb, name);
1690         if (ctdb_db) {
1691                 return ctdb_db;
1692         }
1693
1694         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1695         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1696
1697         ctdb_db->ctdb = ctdb;
1698         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1699         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1700
1701         data.dptr = discard_const(name);
1702         data.dsize = strlen(name)+1;
1703
1704         /* tell ctdb daemon to attach */
1705         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1706                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1707                            0, data, ctdb_db, &data, &res, NULL, NULL);
1708         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1709                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1710                 talloc_free(ctdb_db);
1711                 return NULL;
1712         }
1713         
1714         ctdb_db->db_id = *(uint32_t *)data.dptr;
1715         talloc_free(data.dptr);
1716
1717         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1718         if (ret != 0) {
1719                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1720                 talloc_free(ctdb_db);
1721                 return NULL;
1722         }
1723
1724         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1725         if (ctdb->valgrinding) {
1726                 tdb_flags |= TDB_NOMMAP;
1727         }
1728         tdb_flags |= TDB_DISALLOW_NESTING;
1729
1730         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1731         if (ctdb_db->ltdb == NULL) {
1732                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1733                 talloc_free(ctdb_db);
1734                 return NULL;
1735         }
1736
1737         ctdb_db->persistent = persistent;
1738
1739         DLIST_ADD(ctdb->db_list, ctdb_db);
1740
1741         /* add well known functions */
1742         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1743         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1744
1745         return ctdb_db;
1746 }
1747
1748
1749 /*
1750   setup a call for a database
1751  */
1752 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1753 {
1754         struct ctdb_registered_call *call;
1755
1756 #if 0
1757         TDB_DATA data;
1758         int32_t status;
1759         struct ctdb_control_set_call c;
1760         int ret;
1761
1762         /* this is no longer valid with the separate daemon architecture */
1763         c.db_id = ctdb_db->db_id;
1764         c.fn    = fn;
1765         c.id    = id;
1766
1767         data.dptr = (uint8_t *)&c;
1768         data.dsize = sizeof(c);
1769
1770         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1771                            data, NULL, NULL, &status, NULL, NULL);
1772         if (ret != 0 || status != 0) {
1773                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1774                 return -1;
1775         }
1776 #endif
1777
1778         /* also register locally */
1779         call = talloc(ctdb_db, struct ctdb_registered_call);
1780         call->fn = fn;
1781         call->id = id;
1782
1783         DLIST_ADD(ctdb_db->calls, call);        
1784         return 0;
1785 }
1786
1787
1788 struct traverse_state {
1789         bool done;
1790         uint32_t count;
1791         ctdb_traverse_func fn;
1792         void *private_data;
1793 };
1794
1795 /*
1796   called on each key during a ctdb_traverse
1797  */
1798 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1799 {
1800         struct traverse_state *state = (struct traverse_state *)p;
1801         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1802         TDB_DATA key;
1803
1804         if (data.dsize < sizeof(uint32_t) ||
1805             d->length != data.dsize) {
1806                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1807                 state->done = True;
1808                 return;
1809         }
1810
1811         key.dsize = d->keylen;
1812         key.dptr  = &d->data[0];
1813         data.dsize = d->datalen;
1814         data.dptr = &d->data[d->keylen];
1815
1816         if (key.dsize == 0 && data.dsize == 0) {
1817                 /* end of traverse */
1818                 state->done = True;
1819                 return;
1820         }
1821
1822         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1823                 /* empty records are deleted records in ctdb */
1824                 return;
1825         }
1826
1827         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1828                 state->done = True;
1829         }
1830
1831         state->count++;
1832 }
1833
1834
1835 /*
1836   start a cluster wide traverse, calling the supplied fn on each record
1837   return the number of records traversed, or -1 on error
1838  */
1839 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1840 {
1841         TDB_DATA data;
1842         struct ctdb_traverse_start t;
1843         int32_t status;
1844         int ret;
1845         uint64_t srvid = (getpid() | 0xFLL<<60);
1846         struct traverse_state state;
1847
1848         state.done = False;
1849         state.count = 0;
1850         state.private_data = private_data;
1851         state.fn = fn;
1852
1853         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1854         if (ret != 0) {
1855                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1856                 return -1;
1857         }
1858
1859         t.db_id = ctdb_db->db_id;
1860         t.srvid = srvid;
1861         t.reqid = 0;
1862
1863         data.dptr = (uint8_t *)&t;
1864         data.dsize = sizeof(t);
1865
1866         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1867                            data, NULL, NULL, &status, NULL, NULL);
1868         if (ret != 0 || status != 0) {
1869                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1870                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1871                 return -1;
1872         }
1873
1874         while (!state.done) {
1875                 event_loop_once(ctdb_db->ctdb->ev);
1876         }
1877
1878         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1879         if (ret != 0) {
1880                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1881                 return -1;
1882         }
1883
1884         return state.count;
1885 }
1886
1887 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1888 /*
1889   called on each key during a catdb
1890  */
1891 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1892 {
1893         int i;
1894         FILE *f = (FILE *)p;
1895         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1896
1897         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1898         for (i=0;i<key.dsize;i++) {
1899                 if (ISASCII(key.dptr[i])) {
1900                         fprintf(f, "%c", key.dptr[i]);
1901                 } else {
1902                         fprintf(f, "\\%02X", key.dptr[i]);
1903                 }
1904         }
1905         fprintf(f, "\"\n");
1906
1907         fprintf(f, "dmaster: %u\n", h->dmaster);
1908         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1909
1910         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1911         for (i=sizeof(*h);i<data.dsize;i++) {
1912                 if (ISASCII(data.dptr[i])) {
1913                         fprintf(f, "%c", data.dptr[i]);
1914                 } else {
1915                         fprintf(f, "\\%02X", data.dptr[i]);
1916                 }
1917         }
1918         fprintf(f, "\"\n");
1919
1920         fprintf(f, "\n");
1921
1922         return 0;
1923 }
1924
1925 /*
1926   convenience function to list all keys to stdout
1927  */
1928 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1929 {
1930         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1931 }
1932
1933 /*
1934   get the pid of a ctdb daemon
1935  */
1936 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1937 {
1938         int ret;
1939         int32_t res;
1940
1941         ret = ctdb_control(ctdb, destnode, 0, 
1942                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1943                            NULL, NULL, &res, &timeout, NULL);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1946                 return -1;
1947         }
1948
1949         *pid = res;
1950
1951         return 0;
1952 }
1953
1954
1955 /*
1956   async freeze send control
1957  */
1958 struct ctdb_client_control_state *
1959 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1960 {
1961         return ctdb_control_send(ctdb, destnode, priority, 
1962                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1963                            mem_ctx, &timeout, NULL);
1964 }
1965
1966 /* 
1967    async freeze recv control
1968 */
1969 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1970 {
1971         int ret;
1972         int32_t res;
1973
1974         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1975         if ( (ret != 0) || (res != 0) ){
1976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1977                 return -1;
1978         }
1979
1980         return 0;
1981 }
1982
1983 /*
1984   freeze databases of a certain priority
1985  */
1986 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1987 {
1988         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1989         struct ctdb_client_control_state *state;
1990         int ret;
1991
1992         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1993         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1994         talloc_free(tmp_ctx);
1995
1996         return ret;
1997 }
1998
1999 /* Freeze all databases */
2000 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2001 {
2002         int i;
2003
2004         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2005                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2006                         return -1;
2007                 }
2008         }
2009         return 0;
2010 }
2011
2012 /*
2013   thaw databases of a certain priority
2014  */
2015 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2016 {
2017         int ret;
2018         int32_t res;
2019
2020         ret = ctdb_control(ctdb, destnode, priority, 
2021                            CTDB_CONTROL_THAW, 0, tdb_null, 
2022                            NULL, NULL, &res, &timeout, NULL);
2023         if (ret != 0 || res != 0) {
2024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2025                 return -1;
2026         }
2027
2028         return 0;
2029 }
2030
2031 /* thaw all databases */
2032 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2033 {
2034         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2035 }
2036
2037 /*
2038   get pnn of a node, or -1
2039  */
2040 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2041 {
2042         int ret;
2043         int32_t res;
2044
2045         ret = ctdb_control(ctdb, destnode, 0, 
2046                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2047                            NULL, NULL, &res, &timeout, NULL);
2048         if (ret != 0) {
2049                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2050                 return -1;
2051         }
2052
2053         return res;
2054 }
2055
2056 /*
2057   get the monitoring mode of a remote node
2058  */
2059 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2060 {
2061         int ret;
2062         int32_t res;
2063
2064         ret = ctdb_control(ctdb, destnode, 0, 
2065                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2066                            NULL, NULL, &res, &timeout, NULL);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2069                 return -1;
2070         }
2071
2072         *monmode = res;
2073
2074         return 0;
2075 }
2076
2077
2078 /*
2079  set the monitoring mode of a remote node to active
2080  */
2081 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2082 {
2083         int ret;
2084         
2085
2086         ret = ctdb_control(ctdb, destnode, 0, 
2087                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2088                            NULL, NULL,NULL, &timeout, NULL);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2091                 return -1;
2092         }
2093
2094         
2095
2096         return 0;
2097 }
2098
2099 /*
2100   set the monitoring mode of a remote node to disable
2101  */
2102 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2103 {
2104         int ret;
2105         
2106
2107         ret = ctdb_control(ctdb, destnode, 0, 
2108                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2109                            NULL, NULL, NULL, &timeout, NULL);
2110         if (ret != 0) {
2111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2112                 return -1;
2113         }
2114
2115         
2116
2117         return 0;
2118 }
2119
2120
2121
2122 /* 
2123   sent to a node to make it take over an ip address
2124 */
2125 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2126                           uint32_t destnode, struct ctdb_public_ip *ip)
2127 {
2128         TDB_DATA data;
2129         struct ctdb_public_ipv4 ipv4;
2130         int ret;
2131         int32_t res;
2132
2133         if (ip->addr.sa.sa_family == AF_INET) {
2134                 ipv4.pnn = ip->pnn;
2135                 ipv4.sin = ip->addr.ip;
2136
2137                 data.dsize = sizeof(ipv4);
2138                 data.dptr  = (uint8_t *)&ipv4;
2139
2140                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2141                            NULL, &res, &timeout, NULL);
2142         } else {
2143                 data.dsize = sizeof(*ip);
2144                 data.dptr  = (uint8_t *)ip;
2145
2146                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2147                            NULL, &res, &timeout, NULL);
2148         }
2149
2150         if (ret != 0 || res != 0) {
2151                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2152                 return -1;
2153         }
2154
2155         return 0;       
2156 }
2157
2158
2159 /* 
2160   sent to a node to make it release an ip address
2161 */
2162 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2163                          uint32_t destnode, struct ctdb_public_ip *ip)
2164 {
2165         TDB_DATA data;
2166         struct ctdb_public_ipv4 ipv4;
2167         int ret;
2168         int32_t res;
2169
2170         if (ip->addr.sa.sa_family == AF_INET) {
2171                 ipv4.pnn = ip->pnn;
2172                 ipv4.sin = ip->addr.ip;
2173
2174                 data.dsize = sizeof(ipv4);
2175                 data.dptr  = (uint8_t *)&ipv4;
2176
2177                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2178                                    NULL, &res, &timeout, NULL);
2179         } else {
2180                 data.dsize = sizeof(*ip);
2181                 data.dptr  = (uint8_t *)ip;
2182
2183                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2184                                    NULL, &res, &timeout, NULL);
2185         }
2186
2187         if (ret != 0 || res != 0) {
2188                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2189                 return -1;
2190         }
2191
2192         return 0;       
2193 }
2194
2195
2196 /*
2197   get a tunable
2198  */
2199 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2200                           struct timeval timeout, 
2201                           uint32_t destnode,
2202                           const char *name, uint32_t *value)
2203 {
2204         struct ctdb_control_get_tunable *t;
2205         TDB_DATA data, outdata;
2206         int32_t res;
2207         int ret;
2208
2209         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2210         data.dptr  = talloc_size(ctdb, data.dsize);
2211         CTDB_NO_MEMORY(ctdb, data.dptr);
2212
2213         t = (struct ctdb_control_get_tunable *)data.dptr;
2214         t->length = strlen(name)+1;
2215         memcpy(t->name, name, t->length);
2216
2217         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2218                            &outdata, &res, &timeout, NULL);
2219         talloc_free(data.dptr);
2220         if (ret != 0 || res != 0) {
2221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2222                 return -1;
2223         }
2224
2225         if (outdata.dsize != sizeof(uint32_t)) {
2226                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2227                 talloc_free(outdata.dptr);
2228                 return -1;
2229         }
2230         
2231         *value = *(uint32_t *)outdata.dptr;
2232         talloc_free(outdata.dptr);
2233
2234         return 0;
2235 }
2236
2237 /*
2238   set a tunable
2239  */
2240 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2241                           struct timeval timeout, 
2242                           uint32_t destnode,
2243                           const char *name, uint32_t value)
2244 {
2245         struct ctdb_control_set_tunable *t;
2246         TDB_DATA data;
2247         int32_t res;
2248         int ret;
2249
2250         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2251         data.dptr  = talloc_size(ctdb, data.dsize);
2252         CTDB_NO_MEMORY(ctdb, data.dptr);
2253
2254         t = (struct ctdb_control_set_tunable *)data.dptr;
2255         t->length = strlen(name)+1;
2256         memcpy(t->name, name, t->length);
2257         t->value = value;
2258
2259         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2260                            NULL, &res, &timeout, NULL);
2261         talloc_free(data.dptr);
2262         if (ret != 0 || res != 0) {
2263                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2264                 return -1;
2265         }
2266
2267         return 0;
2268 }
2269
2270 /*
2271   list tunables
2272  */
2273 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2274                             struct timeval timeout, 
2275                             uint32_t destnode,
2276                             TALLOC_CTX *mem_ctx,
2277                             const char ***list, uint32_t *count)
2278 {
2279         TDB_DATA outdata;
2280         int32_t res;
2281         int ret;
2282         struct ctdb_control_list_tunable *t;
2283         char *p, *s, *ptr;
2284
2285         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2286                            mem_ctx, &outdata, &res, &timeout, NULL);
2287         if (ret != 0 || res != 0) {
2288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2289                 return -1;
2290         }
2291
2292         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2293         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2294             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2295                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2296                 talloc_free(outdata.dptr);
2297                 return -1;              
2298         }
2299         
2300         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2301         CTDB_NO_MEMORY(ctdb, p);
2302
2303         talloc_free(outdata.dptr);
2304         
2305         (*list) = NULL;
2306         (*count) = 0;
2307
2308         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2309                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2310                 CTDB_NO_MEMORY(ctdb, *list);
2311                 (*list)[*count] = talloc_strdup(*list, s);
2312                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2313                 (*count)++;
2314         }
2315
2316         talloc_free(p);
2317
2318         return 0;
2319 }
2320
2321
2322 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2323                                    struct timeval timeout, uint32_t destnode,
2324                                    TALLOC_CTX *mem_ctx,
2325                                    uint32_t flags,
2326                                    struct ctdb_all_public_ips **ips)
2327 {
2328         int ret;
2329         TDB_DATA outdata;
2330         int32_t res;
2331
2332         ret = ctdb_control(ctdb, destnode, 0, 
2333                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2334                            mem_ctx, &outdata, &res, &timeout, NULL);
2335         if (ret == 0 && res == -1) {
2336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2337                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2338         }
2339         if (ret != 0 || res != 0) {
2340           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2341                 return -1;
2342         }
2343
2344         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2345         talloc_free(outdata.dptr);
2346                     
2347         return 0;
2348 }
2349
2350 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2351                              struct timeval timeout, uint32_t destnode,
2352                              TALLOC_CTX *mem_ctx,
2353                              struct ctdb_all_public_ips **ips)
2354 {
2355         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2356                                               destnode, mem_ctx,
2357                                               0, ips);
2358 }
2359
2360 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2361                         struct timeval timeout, uint32_t destnode, 
2362                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2363 {
2364         int ret, i, len;
2365         TDB_DATA outdata;
2366         int32_t res;
2367         struct ctdb_all_public_ipsv4 *ipsv4;
2368
2369         ret = ctdb_control(ctdb, destnode, 0, 
2370                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2371                            mem_ctx, &outdata, &res, &timeout, NULL);
2372         if (ret != 0 || res != 0) {
2373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2374                 return -1;
2375         }
2376
2377         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2378         len = offsetof(struct ctdb_all_public_ips, ips) +
2379                 ipsv4->num*sizeof(struct ctdb_public_ip);
2380         *ips = talloc_zero_size(mem_ctx, len);
2381         CTDB_NO_MEMORY(ctdb, *ips);
2382         (*ips)->num = ipsv4->num;
2383         for (i=0; i<ipsv4->num; i++) {
2384                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2385                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2386         }
2387
2388         talloc_free(outdata.dptr);
2389                     
2390         return 0;
2391 }
2392
2393 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2394                                  struct timeval timeout, uint32_t destnode,
2395                                  TALLOC_CTX *mem_ctx,
2396                                  const ctdb_sock_addr *addr,
2397                                  struct ctdb_control_public_ip_info **_info)
2398 {
2399         int ret;
2400         TDB_DATA indata;
2401         TDB_DATA outdata;
2402         int32_t res;
2403         struct ctdb_control_public_ip_info *info;
2404         uint32_t len;
2405         uint32_t i;
2406
2407         indata.dptr = discard_const_p(uint8_t, addr);
2408         indata.dsize = sizeof(*addr);
2409
2410         ret = ctdb_control(ctdb, destnode, 0,
2411                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2412                            mem_ctx, &outdata, &res, &timeout, NULL);
2413         if (ret != 0 || res != 0) {
2414                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2415                                 "failed ret:%d res:%d\n",
2416                                 ret, res));
2417                 return -1;
2418         }
2419
2420         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2421         if (len > outdata.dsize) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2423                                 "returned invalid data with size %u > %u\n",
2424                                 (unsigned int)outdata.dsize,
2425                                 (unsigned int)len));
2426                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2427                 return -1;
2428         }
2429
2430         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2431         len += info->num*sizeof(struct ctdb_control_iface_info);
2432
2433         if (len > outdata.dsize) {
2434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2435                                 "returned invalid data with size %u > %u\n",
2436                                 (unsigned int)outdata.dsize,
2437                                 (unsigned int)len));
2438                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2439                 return -1;
2440         }
2441
2442         /* make sure we null terminate the returned strings */
2443         for (i=0; i < info->num; i++) {
2444                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2445         }
2446
2447         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2448                                                                 outdata.dptr,
2449                                                                 outdata.dsize);
2450         talloc_free(outdata.dptr);
2451         if (*_info == NULL) {
2452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2453                                 "talloc_memdup size %u failed\n",
2454                                 (unsigned int)outdata.dsize));
2455                 return -1;
2456         }
2457
2458         return 0;
2459 }
2460
2461 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2462                          struct timeval timeout, uint32_t destnode,
2463                          TALLOC_CTX *mem_ctx,
2464                          struct ctdb_control_get_ifaces **_ifaces)
2465 {
2466         int ret;
2467         TDB_DATA outdata;
2468         int32_t res;
2469         struct ctdb_control_get_ifaces *ifaces;
2470         uint32_t len;
2471         uint32_t i;
2472
2473         ret = ctdb_control(ctdb, destnode, 0,
2474                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2475                            mem_ctx, &outdata, &res, &timeout, NULL);
2476         if (ret != 0 || res != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2478                                 "failed ret:%d res:%d\n",
2479                                 ret, res));
2480                 return -1;
2481         }
2482
2483         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2484         if (len > outdata.dsize) {
2485                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2486                                 "returned invalid data with size %u > %u\n",
2487                                 (unsigned int)outdata.dsize,
2488                                 (unsigned int)len));
2489                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2490                 return -1;
2491         }
2492
2493         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2494         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2495
2496         if (len > outdata.dsize) {
2497                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2498                                 "returned invalid data with size %u > %u\n",
2499                                 (unsigned int)outdata.dsize,
2500                                 (unsigned int)len));
2501                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2502                 return -1;
2503         }
2504
2505         /* make sure we null terminate the returned strings */
2506         for (i=0; i < ifaces->num; i++) {
2507                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2508         }
2509
2510         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2511                                                                   outdata.dptr,
2512                                                                   outdata.dsize);
2513         talloc_free(outdata.dptr);
2514         if (*_ifaces == NULL) {
2515                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2516                                 "talloc_memdup size %u failed\n",
2517                                 (unsigned int)outdata.dsize));
2518                 return -1;
2519         }
2520
2521         return 0;
2522 }
2523
2524 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2525                              struct timeval timeout, uint32_t destnode,
2526                              TALLOC_CTX *mem_ctx,
2527                              const struct ctdb_control_iface_info *info)
2528 {
2529         int ret;
2530         TDB_DATA indata;
2531         int32_t res;
2532
2533         indata.dptr = discard_const_p(uint8_t, info);
2534         indata.dsize = sizeof(*info);
2535
2536         ret = ctdb_control(ctdb, destnode, 0,
2537                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2538                            mem_ctx, NULL, &res, &timeout, NULL);
2539         if (ret != 0 || res != 0) {
2540                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2541                                 "failed ret:%d res:%d\n",
2542                                 ret, res));
2543                 return -1;
2544         }
2545
2546         return 0;
2547 }
2548
2549 /*
2550   set/clear the permanent disabled bit on a remote node
2551  */
2552 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2553                        uint32_t set, uint32_t clear)
2554 {
2555         int ret;
2556         TDB_DATA data;
2557         struct ctdb_node_map *nodemap=NULL;
2558         struct ctdb_node_flag_change c;
2559         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2560         uint32_t recmaster;
2561         uint32_t *nodes;
2562
2563
2564         /* find the recovery master */
2565         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2566         if (ret != 0) {
2567                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2568                 talloc_free(tmp_ctx);
2569                 return ret;
2570         }
2571
2572
2573         /* read the node flags from the recmaster */
2574         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2575         if (ret != 0) {
2576                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2577                 talloc_free(tmp_ctx);
2578                 return -1;
2579         }
2580         if (destnode >= nodemap->num) {
2581                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2582                 talloc_free(tmp_ctx);
2583                 return -1;
2584         }
2585
2586         c.pnn       = destnode;
2587         c.old_flags = nodemap->nodes[destnode].flags;
2588         c.new_flags = c.old_flags;
2589         c.new_flags |= set;
2590         c.new_flags &= ~clear;
2591
2592         data.dsize = sizeof(c);
2593         data.dptr = (unsigned char *)&c;
2594
2595         /* send the flags update to all connected nodes */
2596         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2597
2598         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2599                                         nodes, 0,
2600                                         timeout, false, data,
2601                                         NULL, NULL,
2602                                         NULL) != 0) {
2603                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2604
2605                 talloc_free(tmp_ctx);
2606                 return -1;
2607         }
2608
2609         talloc_free(tmp_ctx);
2610         return 0;
2611 }
2612
2613
2614 /*
2615   get all tunables
2616  */
2617 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2618                                struct timeval timeout, 
2619                                uint32_t destnode,
2620                                struct ctdb_tunable *tunables)
2621 {
2622         TDB_DATA outdata;
2623         int ret;
2624         int32_t res;
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         if (ret != 0 || res != 0) {
2629                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2630                 return -1;
2631         }
2632
2633         if (outdata.dsize != sizeof(*tunables)) {
2634                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2635                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2636                 return -1;              
2637         }
2638
2639         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2640         talloc_free(outdata.dptr);
2641         return 0;
2642 }
2643
2644 /*
2645   add a public address to a node
2646  */
2647 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2648                       struct timeval timeout, 
2649                       uint32_t destnode,
2650                       struct ctdb_control_ip_iface *pub)
2651 {
2652         TDB_DATA data;
2653         int32_t res;
2654         int ret;
2655
2656         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2657         data.dptr  = (unsigned char *)pub;
2658
2659         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2660                            NULL, &res, &timeout, NULL);
2661         if (ret != 0 || res != 0) {
2662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2663                 return -1;
2664         }
2665
2666         return 0;
2667 }
2668
2669 /*
2670   delete a public address from a node
2671  */
2672 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2673                       struct timeval timeout, 
2674                       uint32_t destnode,
2675                       struct ctdb_control_ip_iface *pub)
2676 {
2677         TDB_DATA data;
2678         int32_t res;
2679         int ret;
2680
2681         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2682         data.dptr  = (unsigned char *)pub;
2683
2684         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2685                            NULL, &res, &timeout, NULL);
2686         if (ret != 0 || res != 0) {
2687                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2688                 return -1;
2689         }
2690
2691         return 0;
2692 }
2693
2694 /*
2695   kill a tcp connection
2696  */
2697 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2698                       struct timeval timeout, 
2699                       uint32_t destnode,
2700                       struct ctdb_control_killtcp *killtcp)
2701 {
2702         TDB_DATA data;
2703         int32_t res;
2704         int ret;
2705
2706         data.dsize = sizeof(struct ctdb_control_killtcp);
2707         data.dptr  = (unsigned char *)killtcp;
2708
2709         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2710                            NULL, &res, &timeout, NULL);
2711         if (ret != 0 || res != 0) {
2712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2713                 return -1;
2714         }
2715
2716         return 0;
2717 }
2718
2719 /*
2720   send a gratious arp
2721  */
2722 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2723                       struct timeval timeout, 
2724                       uint32_t destnode,
2725                       ctdb_sock_addr *addr,
2726                       const char *ifname)
2727 {
2728         TDB_DATA data;
2729         int32_t res;
2730         int ret, len;
2731         struct ctdb_control_gratious_arp *gratious_arp;
2732         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2733
2734
2735         len = strlen(ifname)+1;
2736         gratious_arp = talloc_size(tmp_ctx, 
2737                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2738         CTDB_NO_MEMORY(ctdb, gratious_arp);
2739
2740         gratious_arp->addr = *addr;
2741         gratious_arp->len = len;
2742         memcpy(&gratious_arp->iface[0], ifname, len);
2743
2744
2745         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2746         data.dptr  = (unsigned char *)gratious_arp;
2747
2748         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2749                            NULL, &res, &timeout, NULL);
2750         if (ret != 0 || res != 0) {
2751                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2752                 talloc_free(tmp_ctx);
2753                 return -1;
2754         }
2755
2756         talloc_free(tmp_ctx);
2757         return 0;
2758 }
2759
2760 /*
2761   get a list of all tcp tickles that a node knows about for a particular vnn
2762  */
2763 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2764                               struct timeval timeout, uint32_t destnode, 
2765                               TALLOC_CTX *mem_ctx, 
2766                               ctdb_sock_addr *addr,
2767                               struct ctdb_control_tcp_tickle_list **list)
2768 {
2769         int ret;
2770         TDB_DATA data, outdata;
2771         int32_t status;
2772
2773         data.dptr = (uint8_t*)addr;
2774         data.dsize = sizeof(ctdb_sock_addr);
2775
2776         ret = ctdb_control(ctdb, destnode, 0, 
2777                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2778                            mem_ctx, &outdata, &status, NULL, NULL);
2779         if (ret != 0 || status != 0) {
2780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2781                 return -1;
2782         }
2783
2784         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2785
2786         return status;
2787 }
2788
2789 /*
2790   register a server id
2791  */
2792 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2793                       struct timeval timeout, 
2794                       struct ctdb_server_id *id)
2795 {
2796         TDB_DATA data;
2797         int32_t res;
2798         int ret;
2799
2800         data.dsize = sizeof(struct ctdb_server_id);
2801         data.dptr  = (unsigned char *)id;
2802
2803         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2804                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2805                         0, data, NULL,
2806                         NULL, &res, &timeout, NULL);
2807         if (ret != 0 || res != 0) {
2808                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2809                 return -1;
2810         }
2811
2812         return 0;
2813 }
2814
2815 /*
2816   unregister a server id
2817  */
2818 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2819                       struct timeval timeout, 
2820                       struct ctdb_server_id *id)
2821 {
2822         TDB_DATA data;
2823         int32_t res;
2824         int ret;
2825
2826         data.dsize = sizeof(struct ctdb_server_id);
2827         data.dptr  = (unsigned char *)id;
2828
2829         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2830                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2831                         0, data, NULL,
2832                         NULL, &res, &timeout, NULL);
2833         if (ret != 0 || res != 0) {
2834                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2835                 return -1;
2836         }
2837
2838         return 0;
2839 }
2840
2841
2842 /*
2843   check if a server id exists
2844
2845   if a server id does exist, return *status == 1, otherwise *status == 0
2846  */
2847 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2848                       struct timeval timeout, 
2849                       uint32_t destnode,
2850                       struct ctdb_server_id *id,
2851                       uint32_t *status)
2852 {
2853         TDB_DATA data;
2854         int32_t res;
2855         int ret;
2856
2857         data.dsize = sizeof(struct ctdb_server_id);
2858         data.dptr  = (unsigned char *)id;
2859
2860         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2861                         0, data, NULL,
2862                         NULL, &res, &timeout, NULL);
2863         if (ret != 0) {
2864                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2865                 return -1;
2866         }
2867
2868         if (res) {
2869                 *status = 1;
2870         } else {
2871                 *status = 0;
2872         }
2873
2874         return 0;
2875 }
2876
2877 /*
2878    get the list of server ids that are registered on a node
2879 */
2880 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2881                 TALLOC_CTX *mem_ctx,
2882                 struct timeval timeout, uint32_t destnode, 
2883                 struct ctdb_server_id_list **svid_list)
2884 {
2885         int ret;
2886         TDB_DATA outdata;
2887         int32_t res;
2888
2889         ret = ctdb_control(ctdb, destnode, 0, 
2890                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2891                            mem_ctx, &outdata, &res, &timeout, NULL);
2892         if (ret != 0 || res != 0) {
2893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2894                 return -1;
2895         }
2896
2897         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2898                     
2899         return 0;
2900 }
2901
2902 /*
2903   initialise the ctdb daemon for client applications
2904
2905   NOTE: In current code the daemon does not fork. This is for testing purposes only
2906   and to simplify the code.
2907 */
2908 struct ctdb_context *ctdb_init(struct event_context *ev)
2909 {
2910         int ret;
2911         struct ctdb_context *ctdb;
2912
2913         ctdb = talloc_zero(ev, struct ctdb_context);
2914         if (ctdb == NULL) {
2915                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2916                 return NULL;
2917         }
2918         ctdb->ev  = ev;
2919         ctdb->idr = idr_init(ctdb);
2920         /* Wrap early to exercise code. */
2921         ctdb->lastid = INT_MAX-200;
2922         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2923
2924         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2925         if (ret != 0) {
2926                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2927                 talloc_free(ctdb);
2928                 return NULL;
2929         }
2930
2931         ctdb->statistics.statistics_start_time = timeval_current();
2932
2933         return ctdb;
2934 }
2935
2936
2937 /*
2938   set some ctdb flags
2939 */
2940 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2941 {
2942         ctdb->flags |= flags;
2943 }
2944
2945 /*
2946   setup the local socket name
2947 */
2948 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2949 {
2950         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2951         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2952
2953         return 0;
2954 }
2955
2956 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2957 {
2958         return ctdb->daemon.name;
2959 }
2960
2961 /*
2962   return the pnn of this node
2963 */
2964 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2965 {
2966         return ctdb->pnn;
2967 }
2968
2969
2970 /*
2971   get the uptime of a remote node
2972  */
2973 struct ctdb_client_control_state *
2974 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2975 {
2976         return ctdb_control_send(ctdb, destnode, 0, 
2977                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2978                            mem_ctx, &timeout, NULL);
2979 }
2980
2981 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2982 {
2983         int ret;
2984         int32_t res;
2985         TDB_DATA outdata;
2986
2987         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2988         if (ret != 0 || res != 0) {
2989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2990                 return -1;
2991         }
2992
2993         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2994
2995         return 0;
2996 }
2997
2998 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2999 {
3000         struct ctdb_client_control_state *state;
3001
3002         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3003         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3004 }
3005
3006 /*
3007   send a control to execute the "recovered" event script on a node
3008  */
3009 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3010 {
3011         int ret;
3012         int32_t status;
3013
3014         ret = ctdb_control(ctdb, destnode, 0, 
3015                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3016                            NULL, NULL, &status, &timeout, NULL);
3017         if (ret != 0 || status != 0) {
3018                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3019                 return -1;
3020         }
3021
3022         return 0;
3023 }
3024
3025 /* 
3026   callback for the async helpers used when sending the same control
3027   to multiple nodes in parallell.
3028 */
3029 static void async_callback(struct ctdb_client_control_state *state)
3030 {
3031         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3032         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3033         int ret;
3034         TDB_DATA outdata;
3035         int32_t res;
3036         uint32_t destnode = state->c->hdr.destnode;
3037
3038         /* one more node has responded with recmode data */
3039         data->count--;
3040
3041         /* if we failed to push the db, then return an error and let
3042            the main loop try again.
3043         */
3044         if (state->state != CTDB_CONTROL_DONE) {
3045                 if ( !data->dont_log_errors) {
3046                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3047                 }
3048                 data->fail_count++;
3049                 if (data->fail_callback) {
3050                         data->fail_callback(ctdb, destnode, res, outdata,
3051                                         data->callback_data);
3052                 }
3053                 return;
3054         }
3055         
3056         state->async.fn = NULL;
3057
3058         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3059         if ((ret != 0) || (res != 0)) {
3060                 if ( !data->dont_log_errors) {
3061                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3062                 }
3063                 data->fail_count++;
3064                 if (data->fail_callback) {
3065                         data->fail_callback(ctdb, destnode, res, outdata,
3066                                         data->callback_data);
3067                 }
3068         }
3069         if ((ret == 0) && (data->callback != NULL)) {
3070                 data->callback(ctdb, destnode, res, outdata,
3071                                         data->callback_data);
3072         }
3073 }
3074
3075
3076 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3077 {
3078         /* set up the callback functions */
3079         state->async.fn = async_callback;
3080         state->async.private_data = data;
3081         
3082         /* one more control to wait for to complete */
3083         data->count++;
3084 }
3085
3086
3087 /* wait for up to the maximum number of seconds allowed
3088    or until all nodes we expect a response from has replied
3089 */
3090 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3091 {
3092         while (data->count > 0) {
3093                 event_loop_once(ctdb->ev);
3094         }
3095         if (data->fail_count != 0) {
3096                 if (!data->dont_log_errors) {
3097                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3098                                  data->fail_count));
3099                 }
3100                 return -1;
3101         }
3102         return 0;
3103 }
3104
3105
3106 /* 
3107    perform a simple control on the listed nodes
3108    The control cannot return data
3109  */
3110 int ctdb_client_async_control(struct ctdb_context *ctdb,
3111                                 enum ctdb_controls opcode,
3112                                 uint32_t *nodes,
3113                                 uint64_t srvid,
3114                                 struct timeval timeout,
3115                                 bool dont_log_errors,
3116                                 TDB_DATA data,
3117                                 client_async_callback client_callback,
3118                                 client_async_callback fail_callback,
3119                                 void *callback_data)
3120 {
3121         struct client_async_data *async_data;
3122         struct ctdb_client_control_state *state;
3123         int j, num_nodes;
3124
3125         async_data = talloc_zero(ctdb, struct client_async_data);
3126         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3127         async_data->dont_log_errors = dont_log_errors;
3128         async_data->callback = client_callback;
3129         async_data->fail_callback = fail_callback;
3130         async_data->callback_data = callback_data;
3131         async_data->opcode        = opcode;
3132
3133         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3134
3135         /* loop over all nodes and send an async control to each of them */
3136         for (j=0; j<num_nodes; j++) {
3137                 uint32_t pnn = nodes[j];
3138
3139                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3140                                           0, data, async_data, &timeout, NULL);
3141                 if (state == NULL) {
3142                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3143                         talloc_free(async_data);
3144                         return -1;
3145                 }
3146                 
3147                 ctdb_client_async_add(async_data, state);
3148         }
3149
3150         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3151                 talloc_free(async_data);
3152                 return -1;
3153         }
3154
3155         talloc_free(async_data);
3156         return 0;
3157 }
3158
3159 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3160                                 struct ctdb_vnn_map *vnn_map,
3161                                 TALLOC_CTX *mem_ctx,
3162                                 bool include_self)
3163 {
3164         int i, j, num_nodes;
3165         uint32_t *nodes;
3166
3167         for (i=num_nodes=0;i<vnn_map->size;i++) {
3168                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3169                         continue;
3170                 }
3171                 num_nodes++;
3172         } 
3173
3174         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3175         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3176
3177         for (i=j=0;i<vnn_map->size;i++) {
3178                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3179                         continue;
3180                 }
3181                 nodes[j++] = vnn_map->map[i];
3182         } 
3183
3184         return nodes;
3185 }
3186
3187 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3188                                 struct ctdb_node_map *node_map,
3189                                 TALLOC_CTX *mem_ctx,
3190                                 bool include_self)
3191 {
3192         int i, j, num_nodes;
3193         uint32_t *nodes;
3194
3195         for (i=num_nodes=0;i<node_map->num;i++) {
3196                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3197                         continue;
3198                 }
3199                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3200                         continue;
3201                 }
3202                 num_nodes++;
3203         } 
3204
3205         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3206         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3207
3208         for (i=j=0;i<node_map->num;i++) {
3209                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3210                         continue;
3211                 }
3212                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3213                         continue;
3214                 }
3215                 nodes[j++] = node_map->nodes[i].pnn;
3216         } 
3217
3218         return nodes;
3219 }
3220
3221 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3222                                 struct ctdb_node_map *node_map,
3223                                 TALLOC_CTX *mem_ctx,
3224                                 uint32_t pnn)
3225 {
3226         int i, j, num_nodes;
3227         uint32_t *nodes;
3228
3229         for (i=num_nodes=0;i<node_map->num;i++) {
3230                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3231                         continue;
3232                 }
3233                 if (node_map->nodes[i].pnn == pnn) {
3234                         continue;
3235                 }
3236                 num_nodes++;
3237         } 
3238
3239         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3240         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3241
3242         for (i=j=0;i<node_map->num;i++) {
3243                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3244                         continue;
3245                 }
3246                 if (node_map->nodes[i].pnn == pnn) {
3247                         continue;
3248                 }
3249                 nodes[j++] = node_map->nodes[i].pnn;
3250         } 
3251
3252         return nodes;
3253 }
3254
3255 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3256                                 struct ctdb_node_map *node_map,
3257                                 TALLOC_CTX *mem_ctx,
3258                                 bool include_self)
3259 {
3260         int i, j, num_nodes;
3261         uint32_t *nodes;
3262
3263         for (i=num_nodes=0;i<node_map->num;i++) {
3264                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3265                         continue;
3266                 }
3267                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3268                         continue;
3269                 }
3270                 num_nodes++;
3271         } 
3272
3273         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3274         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3275
3276         for (i=j=0;i<node_map->num;i++) {
3277                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3278                         continue;
3279                 }
3280                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3281                         continue;
3282                 }
3283                 nodes[j++] = node_map->nodes[i].pnn;
3284         } 
3285
3286         return nodes;
3287 }
3288
3289 /* 
3290   this is used to test if a pnn lock exists and if it exists will return
3291   the number of connections that pnn has reported or -1 if that recovery
3292   daemon is not running.
3293 */
3294 int
3295 ctdb_read_pnn_lock(int fd, int32_t pnn)
3296 {
3297         struct flock lock;
3298         char c;
3299
3300         lock.l_type = F_WRLCK;
3301         lock.l_whence = SEEK_SET;
3302         lock.l_start = pnn;
3303         lock.l_len = 1;
3304         lock.l_pid = 0;
3305
3306         if (fcntl(fd, F_GETLK, &lock) != 0) {
3307                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3308                 return -1;
3309         }
3310
3311         if (lock.l_type == F_UNLCK) {
3312                 return -1;
3313         }
3314
3315         if (pread(fd, &c, 1, pnn) == -1) {
3316                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3317                 return -1;
3318         }
3319
3320         return c;
3321 }
3322
3323 /*
3324   get capabilities of a remote node
3325  */
3326 struct ctdb_client_control_state *
3327 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3328 {
3329         return ctdb_control_send(ctdb, destnode, 0, 
3330                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3331                            mem_ctx, &timeout, NULL);
3332 }
3333
3334 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3335 {
3336         int ret;
3337         int32_t res;
3338         TDB_DATA outdata;
3339
3340         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3341         if ( (ret != 0) || (res != 0) ) {
3342                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3343                 return -1;
3344         }
3345
3346         if (capabilities) {
3347                 *capabilities = *((uint32_t *)outdata.dptr);
3348         }
3349
3350         return 0;
3351 }
3352
3353 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3354 {
3355         struct ctdb_client_control_state *state;
3356         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3357         int ret;
3358
3359         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3360         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3361         talloc_free(tmp_ctx);
3362         return ret;
3363 }
3364
3365 /**
3366  * check whether a transaction is active on a given db on a given node
3367  */
3368 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3369                                      uint32_t destnode,
3370                                      uint32_t db_id)
3371 {
3372         int32_t status;
3373         int ret;
3374         TDB_DATA indata;
3375
3376         indata.dptr = (uint8_t *)&db_id;
3377         indata.dsize = sizeof(db_id);
3378
3379         ret = ctdb_control(ctdb, destnode, 0,
3380                            CTDB_CONTROL_TRANS2_ACTIVE,
3381                            0, indata, NULL, NULL, &status,
3382                            NULL, NULL);
3383
3384         if (ret != 0) {
3385                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3386                 return -1;
3387         }
3388
3389         return status;
3390 }
3391
3392
3393 struct ctdb_transaction_handle {
3394         struct ctdb_db_context *ctdb_db;
3395         bool in_replay;
3396         /*
3397          * we store the reads and writes done under a transaction:
3398          * - one list stores both reads and writes (m_all),
3399          * - the other just writes (m_write)
3400          */
3401         struct ctdb_marshall_buffer *m_all;
3402         struct ctdb_marshall_buffer *m_write;
3403 };
3404
3405 /* start a transaction on a database */
3406 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3407 {
3408         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3409         return 0;
3410 }
3411
3412 /* start a transaction on a database */
3413 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3414 {
3415         struct ctdb_record_handle *rh;
3416         TDB_DATA key;
3417         TDB_DATA data;
3418         struct ctdb_ltdb_header header;
3419         TALLOC_CTX *tmp_ctx;
3420         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3421         int ret;
3422         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3423         pid_t pid;
3424         int32_t status;
3425
3426         key.dptr = discard_const(keyname);
3427         key.dsize = strlen(keyname);
3428
3429         if (!ctdb_db->persistent) {
3430                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3431                 return -1;
3432         }
3433
3434 again:
3435         tmp_ctx = talloc_new(h);
3436
3437         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3438         if (rh == NULL) {
3439                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3440                 talloc_free(tmp_ctx);
3441                 return -1;
3442         }
3443
3444         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3445                                               CTDB_CURRENT_NODE,
3446                                               ctdb_db->db_id);
3447         if (status == 1) {
3448                 unsigned long int usec = (1000 + random()) % 100000;
3449                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3450                                     "on db_id[0x%08x]. waiting for %lu "
3451                                     "microseconds\n",
3452                                     ctdb_db->db_id, usec));
3453                 talloc_free(tmp_ctx);
3454                 usleep(usec);
3455                 goto again;
3456         }
3457
3458         /*
3459          * store the pid in the database:
3460          * it is not enough that the node is dmaster...
3461          */
3462         pid = getpid();
3463         data.dptr = (unsigned char *)&pid;
3464         data.dsize = sizeof(pid_t);
3465         rh->header.rsn++;
3466         rh->header.dmaster = ctdb_db->ctdb->pnn;
3467         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3468         if (ret != 0) {
3469                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3470                                   "transaction record\n"));
3471                 talloc_free(tmp_ctx);
3472                 return -1;
3473         }
3474
3475         talloc_free(rh);
3476
3477         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3478         if (ret != 0) {
3479                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3480                 talloc_free(tmp_ctx);
3481                 return -1;
3482         }
3483
3484         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3485         if (ret != 0) {
3486                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3487                                  "lock record inside transaction\n"));
3488                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3489                 talloc_free(tmp_ctx);
3490                 goto again;
3491         }
3492
3493         if (header.dmaster != ctdb_db->ctdb->pnn) {
3494                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3495                                    "transaction lock record\n"));
3496                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3497                 talloc_free(tmp_ctx);
3498                 goto again;
3499         }
3500
3501         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3502                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3503                                     "the transaction lock record\n"));
3504                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3505                 talloc_free(tmp_ctx);
3506                 goto again;
3507         }
3508
3509         talloc_free(tmp_ctx);
3510
3511         return 0;
3512 }
3513
3514
3515 /* start a transaction on a database */
3516 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3517                                                        TALLOC_CTX *mem_ctx)
3518 {
3519         struct ctdb_transaction_handle *h;
3520         int ret;
3521
3522         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3523         if (h == NULL) {
3524                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3525                 return NULL;
3526         }
3527
3528         h->ctdb_db = ctdb_db;
3529
3530         ret = ctdb_transaction_fetch_start(h);
3531         if (ret != 0) {
3532                 talloc_free(h);
3533                 return NULL;
3534         }
3535
3536         talloc_set_destructor(h, ctdb_transaction_destructor);
3537
3538         return h;
3539 }
3540
3541
3542
3543 /*
3544   fetch a record inside a transaction
3545  */
3546 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3547                            TALLOC_CTX *mem_ctx, 
3548                            TDB_DATA key, TDB_DATA *data)
3549 {
3550         struct ctdb_ltdb_header header;
3551         int ret;
3552
3553         ZERO_STRUCT(header);
3554
3555         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3556         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3557                 /* record doesn't exist yet */
3558                 *data = tdb_null;
3559                 ret = 0;
3560         }
3561         
3562         if (ret != 0) {
3563                 return ret;
3564         }
3565
3566         if (!h->in_replay) {
3567                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3568                 if (h->m_all == NULL) {
3569                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3570                         return -1;
3571                 }
3572         }
3573
3574         return 0;
3575 }
3576
3577 /*
3578   stores a record inside a transaction
3579  */
3580 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3581                            TDB_DATA key, TDB_DATA data)
3582 {
3583         TALLOC_CTX *tmp_ctx = talloc_new(h);
3584         struct ctdb_ltdb_header header;
3585         TDB_DATA olddata;
3586         int ret;
3587
3588         ZERO_STRUCT(header);
3589
3590         /* we need the header so we can update the RSN */
3591         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3592         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3593                 /* the record doesn't exist - create one with us as dmaster.
3594                    This is only safe because we are in a transaction and this
3595                    is a persistent database */
3596                 ZERO_STRUCT(header);
3597         } else if (ret != 0) {
3598                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3599                 talloc_free(tmp_ctx);
3600                 return ret;
3601         }
3602
3603         if (data.dsize == olddata.dsize &&
3604             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3605                 /* save writing the same data */
3606                 talloc_free(tmp_ctx);
3607                 return 0;
3608         }
3609
3610         header.dmaster = h->ctdb_db->ctdb->pnn;
3611         header.rsn++;
3612
3613         if (!h->in_replay) {
3614                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3615                 if (h->m_all == NULL) {
3616                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3617                         talloc_free(tmp_ctx);
3618                         return -1;
3619                 }
3620         }               
3621
3622         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3623         if (h->m_write == NULL) {
3624                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3625                 talloc_free(tmp_ctx);
3626                 return -1;
3627         }
3628         
3629         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3630
3631         talloc_free(tmp_ctx);
3632         
3633         return ret;
3634 }
3635
3636 /*
3637   replay a transaction
3638  */
3639 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3640 {
3641         int ret, i;
3642         struct ctdb_rec_data *rec = NULL;
3643
3644         h->in_replay = true;
3645         talloc_free(h->m_write);
3646         h->m_write = NULL;
3647
3648         ret = ctdb_transaction_fetch_start(h);
3649         if (ret != 0) {
3650                 return ret;
3651         }
3652
3653         for (i=0;i<h->m_all->count;i++) {
3654                 TDB_DATA key, data;
3655
3656                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3657                 if (rec == NULL) {
3658                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3659                         goto failed;
3660                 }
3661
3662                 if (rec->reqid == 0) {
3663                         /* its a store */
3664                         if (ctdb_transaction_store(h, key, data) != 0) {
3665                                 goto failed;
3666                         }
3667                 } else {
3668                         TDB_DATA data2;
3669                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3670
3671                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3672                                 talloc_free(tmp_ctx);
3673                                 goto failed;
3674                         }
3675                         if (data2.dsize != data.dsize ||
3676                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3677                                 /* the record has changed on us - we have to give up */
3678                                 talloc_free(tmp_ctx);
3679                                 goto failed;
3680                         }
3681                         talloc_free(tmp_ctx);
3682                 }
3683         }
3684         
3685         return 0;
3686
3687 failed:
3688         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3689         return -1;
3690 }
3691
3692
3693 /*
3694   commit a transaction
3695  */
3696 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3697 {
3698         int ret, retries=0;
3699         int32_t status;
3700         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3701         struct timeval timeout;
3702         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3703
3704         talloc_set_destructor(h, NULL);
3705
3706         /* our commit strategy is quite complex.
3707
3708            - we first try to commit the changes to all other nodes
3709
3710            - if that works, then we commit locally and we are done
3711
3712            - if a commit on another node fails, then we need to cancel
3713              the transaction, then restart the transaction (thus
3714              opening a window of time for a pending recovery to
3715              complete), then replay the transaction, checking all the
3716              reads and writes (checking that reads give the same data,
3717              and writes succeed). Then we retry the transaction to the
3718              other nodes
3719         */
3720
3721 again:
3722         if (h->m_write == NULL) {
3723                 /* no changes were made */
3724                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3725                 talloc_free(h);
3726                 return 0;
3727         }
3728
3729         /* tell ctdbd to commit to the other nodes */
3730         timeout = timeval_current_ofs(1, 0);
3731         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3732                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3733                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3734                            &timeout, NULL);
3735         if (ret != 0 || status != 0) {
3736                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3737                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3738                                      ", retrying after 1 second...\n",
3739                                      (retries==0)?"":"retry "));
3740                 sleep(1);
3741
3742                 if (ret != 0) {
3743                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3744                 } else {
3745                         /* work out what error code we will give if we 
3746                            have to fail the operation */
3747                         switch ((enum ctdb_trans2_commit_error)status) {
3748                         case CTDB_TRANS2_COMMIT_SUCCESS:
3749                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3750                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3751                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3752                                 break;
3753                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3754                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3755                                 break;
3756                         }
3757                 }
3758
3759                 if (++retries == 100) {
3760                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3761                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3762                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3763                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3764                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3765                         talloc_free(h);
3766                         return -1;
3767                 }               
3768
3769                 if (ctdb_replay_transaction(h) != 0) {
3770                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3771                                           "transaction on db 0x%08x, "
3772                                           "failure control =%u\n",
3773                                           h->ctdb_db->db_id,
3774                                           (unsigned)failure_control));
3775                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3776                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3777                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3778                         talloc_free(h);
3779                         return -1;
3780                 }
3781                 goto again;
3782         } else {
3783                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3784         }
3785
3786         /* do the real commit locally */
3787         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3788         if (ret != 0) {
3789                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3790                                   "on db id 0x%08x locally, "
3791                                   "failure_control=%u\n",
3792                                   h->ctdb_db->db_id,
3793                                   (unsigned)failure_control));
3794                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3795                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3796                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3797                 talloc_free(h);
3798                 return ret;
3799         }
3800
3801         /* tell ctdbd that we are finished with our local commit */
3802         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3803                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3804                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3805         talloc_free(h);
3806         return 0;
3807 }
3808
3809 /*
3810   recovery daemon ping to main daemon
3811  */
3812 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3813 {
3814         int ret;
3815         int32_t res;
3816
3817         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3818                            ctdb, NULL, &res, NULL, NULL);
3819         if (ret != 0 || res != 0) {
3820                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3821                 return -1;
3822         }
3823
3824         return 0;
3825 }
3826
3827 /* when forking the main daemon and the child process needs to connect back
3828  * to the daemon as a client process, this function can be used to change
3829  * the ctdb context from daemon into client mode
3830  */
3831 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3832 {
3833         int ret;
3834         va_list ap;
3835
3836         /* Add extra information so we can identify this in the logs */
3837         va_start(ap, fmt);
3838         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3839         va_end(ap);
3840
3841         /* shutdown the transport */
3842         if (ctdb->methods) {
3843                 ctdb->methods->shutdown(ctdb);
3844         }
3845
3846         /* get a new event context */
3847         talloc_free(ctdb->ev);
3848         ctdb->ev = event_context_init(ctdb);
3849         tevent_loop_allow_nesting(ctdb->ev);
3850
3851         close(ctdb->daemon.sd);
3852         ctdb->daemon.sd = -1;
3853
3854         /* the client does not need to be realtime */
3855         if (ctdb->do_setsched) {
3856                 ctdb_restore_scheduler(ctdb);
3857         }
3858
3859         /* initialise ctdb */
3860         ret = ctdb_socket_connect(ctdb);
3861         if (ret != 0) {
3862                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3863                 return -1;
3864         }
3865
3866          return 0;
3867 }
3868
3869 /*
3870   get the status of running the monitor eventscripts: NULL means never run.
3871  */
3872 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3873                 struct timeval timeout, uint32_t destnode, 
3874                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3875                 struct ctdb_scripts_wire **script_status)
3876 {
3877         int ret;
3878         TDB_DATA outdata, indata;
3879         int32_t res;
3880         uint32_t uinttype = type;
3881
3882         indata.dptr = (uint8_t *)&uinttype;
3883         indata.dsize = sizeof(uinttype);
3884
3885         ret = ctdb_control(ctdb, destnode, 0, 
3886                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3887                            mem_ctx, &outdata, &res, &timeout, NULL);
3888         if (ret != 0 || res != 0) {
3889                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3890                 return -1;
3891         }
3892
3893         if (outdata.dsize == 0) {
3894                 *script_status = NULL;
3895         } else {
3896                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3897                 talloc_free(outdata.dptr);
3898         }
3899                     
3900         return 0;
3901 }
3902
3903 /*
3904   tell the main daemon how long it took to lock the reclock file
3905  */
3906 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3907 {
3908         int ret;
3909         int32_t res;
3910         TDB_DATA data;
3911
3912         data.dptr = (uint8_t *)&latency;
3913         data.dsize = sizeof(latency);
3914
3915         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3916                            ctdb, NULL, &res, NULL, NULL);
3917         if (ret != 0 || res != 0) {
3918                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3919                 return -1;
3920         }
3921
3922         return 0;
3923 }
3924
3925 /*
3926   get the name of the reclock file
3927  */
3928 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3929                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3930                          const char **name)
3931 {
3932         int ret;
3933         int32_t res;
3934         TDB_DATA data;
3935
3936         ret = ctdb_control(ctdb, destnode, 0, 
3937                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3938                            mem_ctx, &data, &res, &timeout, NULL);
3939         if (ret != 0 || res != 0) {
3940                 return -1;
3941         }
3942
3943         if (data.dsize == 0) {
3944                 *name = NULL;
3945         } else {
3946                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3947         }
3948         talloc_free(data.dptr);
3949
3950         return 0;
3951 }
3952
3953 /*
3954   set the reclock filename for a node
3955  */
3956 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3957 {
3958         int ret;
3959         TDB_DATA data;
3960         int32_t res;
3961
3962         if (reclock == NULL) {
3963                 data.dsize = 0;
3964                 data.dptr  = NULL;
3965         } else {
3966                 data.dsize = strlen(reclock) + 1;
3967                 data.dptr  = discard_const(reclock);
3968         }
3969
3970         ret = ctdb_control(ctdb, destnode, 0, 
3971                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3972                            NULL, NULL, &res, &timeout, NULL);
3973         if (ret != 0 || res != 0) {
3974                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3975                 return -1;
3976         }
3977
3978         return 0;
3979 }
3980
3981 /*
3982   stop a node
3983  */
3984 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3985 {
3986         int ret;
3987         int32_t res;
3988
3989         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3990                            ctdb, NULL, &res, &timeout, NULL);
3991         if (ret != 0 || res != 0) {
3992                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3993                 return -1;
3994         }
3995
3996         return 0;
3997 }
3998
3999 /*
4000   continue a node
4001  */
4002 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4003 {
4004         int ret;
4005
4006         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4007                            ctdb, NULL, NULL, &timeout, NULL);
4008         if (ret != 0) {
4009                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4010                 return -1;
4011         }
4012
4013         return 0;
4014 }
4015
4016 /*
4017   set the natgw state for a node
4018  */
4019 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4020 {
4021         int ret;
4022         TDB_DATA data;
4023         int32_t res;
4024
4025         data.dsize = sizeof(natgwstate);
4026         data.dptr  = (uint8_t *)&natgwstate;
4027
4028         ret = ctdb_control(ctdb, destnode, 0, 
4029                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4030                            NULL, NULL, &res, &timeout, NULL);
4031         if (ret != 0 || res != 0) {
4032                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4033                 return -1;
4034         }
4035
4036         return 0;
4037 }
4038
4039 /*
4040   set the lmaster role for a node
4041  */
4042 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4043 {
4044         int ret;
4045         TDB_DATA data;
4046         int32_t res;
4047
4048         data.dsize = sizeof(lmasterrole);
4049         data.dptr  = (uint8_t *)&lmasterrole;
4050
4051         ret = ctdb_control(ctdb, destnode, 0, 
4052                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4053                            NULL, NULL, &res, &timeout, NULL);
4054         if (ret != 0 || res != 0) {
4055                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4056                 return -1;
4057         }
4058
4059         return 0;
4060 }
4061
4062 /*
4063   set the recmaster role for a node
4064  */
4065 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4066 {
4067         int ret;
4068         TDB_DATA data;
4069         int32_t res;
4070
4071         data.dsize = sizeof(recmasterrole);
4072         data.dptr  = (uint8_t *)&recmasterrole;
4073
4074         ret = ctdb_control(ctdb, destnode, 0, 
4075                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4076                            NULL, NULL, &res, &timeout, NULL);
4077         if (ret != 0 || res != 0) {
4078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4079                 return -1;
4080         }
4081
4082         return 0;
4083 }
4084
4085 /* enable an eventscript
4086  */
4087 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4088 {
4089         int ret;
4090         TDB_DATA data;
4091         int32_t res;
4092
4093         data.dsize = strlen(script) + 1;
4094         data.dptr  = discard_const(script);
4095
4096         ret = ctdb_control(ctdb, destnode, 0, 
4097                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4098                            NULL, NULL, &res, &timeout, NULL);
4099         if (ret != 0 || res != 0) {
4100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4101                 return -1;
4102         }
4103
4104         return 0;
4105 }
4106
4107 /* disable an eventscript
4108  */
4109 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4110 {
4111         int ret;
4112         TDB_DATA data;
4113         int32_t res;
4114
4115         data.dsize = strlen(script) + 1;
4116         data.dptr  = discard_const(script);
4117
4118         ret = ctdb_control(ctdb, destnode, 0, 
4119                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4120                            NULL, NULL, &res, &timeout, NULL);
4121         if (ret != 0 || res != 0) {
4122                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4123                 return -1;
4124         }
4125
4126         return 0;
4127 }
4128
4129
4130 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4131 {
4132         int ret;
4133         TDB_DATA data;
4134         int32_t res;
4135
4136         data.dsize = sizeof(*bantime);
4137         data.dptr  = (uint8_t *)bantime;
4138
4139         ret = ctdb_control(ctdb, destnode, 0, 
4140                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4141                            NULL, NULL, &res, &timeout, NULL);
4142         if (ret != 0 || res != 0) {
4143                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4144                 return -1;
4145         }
4146
4147         return 0;
4148 }
4149
4150
4151 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4152 {
4153         int ret;
4154         TDB_DATA outdata;
4155         int32_t res;
4156         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4157
4158         ret = ctdb_control(ctdb, destnode, 0, 
4159                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4160                            tmp_ctx, &outdata, &res, &timeout, NULL);
4161         if (ret != 0 || res != 0) {
4162                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4163                 talloc_free(tmp_ctx);
4164                 return -1;
4165         }
4166
4167         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4168         talloc_free(tmp_ctx);
4169
4170         return 0;
4171 }
4172
4173
4174 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4175 {
4176         int ret;
4177         int32_t res;
4178         TDB_DATA data;
4179         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4180
4181         data.dptr = (uint8_t*)db_prio;
4182         data.dsize = sizeof(*db_prio);
4183
4184         ret = ctdb_control(ctdb, destnode, 0, 
4185                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4186                            tmp_ctx, NULL, &res, &timeout, NULL);
4187         if (ret != 0 || res != 0) {
4188                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4189                 talloc_free(tmp_ctx);
4190                 return -1;
4191         }
4192
4193         talloc_free(tmp_ctx);
4194
4195         return 0;
4196 }
4197
4198 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4199 {
4200         int ret;
4201         int32_t res;
4202         TDB_DATA data;
4203         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4204
4205         data.dptr = (uint8_t*)&db_id;
4206         data.dsize = sizeof(db_id);
4207
4208         ret = ctdb_control(ctdb, destnode, 0, 
4209                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4210                            tmp_ctx, NULL, &res, &timeout, NULL);
4211         if (ret != 0 || res < 0) {
4212                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4213                 talloc_free(tmp_ctx);
4214                 return -1;
4215         }
4216
4217         if (priority) {
4218                 *priority = res;
4219         }
4220
4221         talloc_free(tmp_ctx);
4222
4223         return 0;
4224 }
4225
4226 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4227 {
4228         int ret;
4229         TDB_DATA outdata;
4230         int32_t res;
4231
4232         ret = ctdb_control(ctdb, destnode, 0, 
4233                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4234                            mem_ctx, &outdata, &res, &timeout, NULL);
4235         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4236                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4237                 return -1;
4238         }
4239
4240         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4241         talloc_free(outdata.dptr);
4242                     
4243         return 0;
4244 }
4245
4246 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4247 {
4248         if (h == NULL) {
4249                 return NULL;
4250         }
4251
4252         return &h->header;
4253 }