We need the deprecated talloc_append_string() for now
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /* for talloc_append_string() */
22 #define TALLOC_DEPRECATED 1
23
24 #include "includes.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tevent/tevent.h"
29 #include "system/network.h"
30 #include "system/filesys.h"
31 #include "system/locale.h"
32 #include <stdlib.h>
33 #include "../include/ctdb_private.h"
34 #include "lib/util/dlinklist.h"
35
36 pid_t ctdbd_pid;
37
38 /*
39   allocate a packet for use in client<->daemon communication
40  */
41 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
42                                             TALLOC_CTX *mem_ctx, 
43                                             enum ctdb_operation operation, 
44                                             size_t length, size_t slength,
45                                             const char *type)
46 {
47         int size;
48         struct ctdb_req_header *hdr;
49
50         length = MAX(length, slength);
51         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
52
53         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
54         if (hdr == NULL) {
55                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
56                          operation, (unsigned)length));
57                 return NULL;
58         }
59         talloc_set_name_const(hdr, type);
60         memset(hdr, 0, slength);
61         hdr->length       = length;
62         hdr->operation    = operation;
63         hdr->ctdb_magic   = CTDB_MAGIC;
64         hdr->ctdb_version = CTDB_VERSION;
65         hdr->srcnode      = ctdb->pnn;
66         if (ctdb->vnn_map) {
67                 hdr->generation = ctdb->vnn_map->generation;
68         }
69
70         return hdr;
71 }
72
73 /*
74   local version of ctdb_call
75 */
76 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
77                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
78                     TDB_DATA *data, uint32_t caller)
79 {
80         struct ctdb_call_info *c;
81         struct ctdb_registered_call *fn;
82         struct ctdb_context *ctdb = ctdb_db->ctdb;
83         
84         c = talloc(ctdb, struct ctdb_call_info);
85         CTDB_NO_MEMORY(ctdb, c);
86
87         c->key = call->key;
88         c->call_data = &call->call_data;
89         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
90         c->record_data.dsize = data->dsize;
91         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
92         c->new_data = NULL;
93         c->reply_data = NULL;
94         c->status = 0;
95
96         for (fn=ctdb_db->calls;fn;fn=fn->next) {
97                 if (fn->id == call->call_id) break;
98         }
99         if (fn == NULL) {
100                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         if (fn->fn(c) != 0) {
106                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
107                 talloc_free(c);
108                 return -1;
109         }
110
111         if (header->laccessor != caller) {
112                 header->lacount = 0;
113         }
114         header->laccessor = caller;
115         header->lacount++;
116
117         /* we need to force the record to be written out if this was a remote access,
118            so that the lacount is updated */
119         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
120                 c->new_data = &c->record_data;
121         }
122
123         if (c->new_data) {
124                 /* XXX check that we always have the lock here? */
125                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
126                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
127                         talloc_free(c);
128                         return -1;
129                 }
130         }
131
132         if (c->reply_data) {
133                 call->reply_data = *c->reply_data;
134
135                 talloc_steal(call, call->reply_data.dptr);
136                 talloc_set_name_const(call->reply_data.dptr, __location__);
137         } else {
138                 call->reply_data.dptr = NULL;
139                 call->reply_data.dsize = 0;
140         }
141         call->status = c->status;
142
143         talloc_free(c);
144
145         return 0;
146 }
147
148
149 /*
150   queue a packet for sending from client to daemon
151 */
152 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
153 {
154         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
155 }
156
157
158 /*
159   called when a CTDB_REPLY_CALL packet comes in in the client
160
161   This packet comes in response to a CTDB_REQ_CALL request packet. It
162   contains any reply data from the call
163 */
164 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
165 {
166         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
167         struct ctdb_client_call_state *state;
168
169         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
170         if (state == NULL) {
171                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
172                 return;
173         }
174
175         if (hdr->reqid != state->reqid) {
176                 /* we found a record  but it was the wrong one */
177                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
178                 return;
179         }
180
181         state->call->reply_data.dptr = c->data;
182         state->call->reply_data.dsize = c->datalen;
183         state->call->status = c->status;
184
185         talloc_steal(state, c);
186
187         state->state = CTDB_CALL_DONE;
188
189         if (state->async.fn) {
190                 state->async.fn(state);
191         }
192 }
193
194 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
195
196 /*
197   this is called in the client, when data comes in from the daemon
198  */
199 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
200 {
201         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
202         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
203         TALLOC_CTX *tmp_ctx;
204
205         /* place the packet as a child of a tmp_ctx. We then use
206            talloc_free() below to free it. If any of the calls want
207            to keep it, then they will steal it somewhere else, and the
208            talloc_free() will be a no-op */
209         tmp_ctx = talloc_new(ctdb);
210         talloc_steal(tmp_ctx, hdr);
211
212         if (cnt == 0) {
213                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
214                 exit(0);
215         }
216
217         if (cnt < sizeof(*hdr)) {
218                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
219                 goto done;
220         }
221         if (cnt != hdr->length) {
222                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
223                                (unsigned)hdr->length, (unsigned)cnt);
224                 goto done;
225         }
226
227         if (hdr->ctdb_magic != CTDB_MAGIC) {
228                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
229                 goto done;
230         }
231
232         if (hdr->ctdb_version != CTDB_VERSION) {
233                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
234                 goto done;
235         }
236
237         switch (hdr->operation) {
238         case CTDB_REPLY_CALL:
239                 ctdb_client_reply_call(ctdb, hdr);
240                 break;
241
242         case CTDB_REQ_MESSAGE:
243                 ctdb_request_message(ctdb, hdr);
244                 break;
245
246         case CTDB_REPLY_CONTROL:
247                 ctdb_client_reply_control(ctdb, hdr);
248                 break;
249
250         default:
251                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
252         }
253
254 done:
255         talloc_free(tmp_ctx);
256 }
257
258 /*
259   connect to a unix domain socket
260 */
261 int ctdb_socket_connect(struct ctdb_context *ctdb)
262 {
263         struct sockaddr_un addr;
264
265         memset(&addr, 0, sizeof(addr));
266         addr.sun_family = AF_UNIX;
267         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
268
269         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
270         if (ctdb->daemon.sd == -1) {
271                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
272                 return -1;
273         }
274
275         set_nonblocking(ctdb->daemon.sd);
276         set_close_on_exec(ctdb->daemon.sd);
277         
278         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
279                 close(ctdb->daemon.sd);
280                 ctdb->daemon.sd = -1;
281                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
282                 return -1;
283         }
284
285         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
286                                               CTDB_DS_ALIGNMENT, 
287                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
288         return 0;
289 }
290
291
292 struct ctdb_record_handle {
293         struct ctdb_db_context *ctdb_db;
294         TDB_DATA key;
295         TDB_DATA *data;
296         struct ctdb_ltdb_header header;
297 };
298
299
300 /*
301   make a recv call to the local ctdb daemon - called from client context
302
303   This is called when the program wants to wait for a ctdb_call to complete and get the 
304   results. This call will block unless the call has already completed.
305 */
306 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
307 {
308         if (state == NULL) {
309                 return -1;
310         }
311
312         while (state->state < CTDB_CALL_DONE) {
313                 event_loop_once(state->ctdb_db->ctdb->ev);
314         }
315         if (state->state != CTDB_CALL_DONE) {
316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
317                 talloc_free(state);
318                 return -1;
319         }
320
321         if (state->call->reply_data.dsize) {
322                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
323                                                       state->call->reply_data.dptr,
324                                                       state->call->reply_data.dsize);
325                 call->reply_data.dsize = state->call->reply_data.dsize;
326         } else {
327                 call->reply_data.dptr = NULL;
328                 call->reply_data.dsize = 0;
329         }
330         call->status = state->call->status;
331         talloc_free(state);
332
333         return 0;
334 }
335
336
337
338
339 /*
340   destroy a ctdb_call in client
341 */
342 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
343 {
344         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
345         return 0;
346 }
347
348 /*
349   construct an event driven local ctdb_call
350
351   this is used so that locally processed ctdb_call requests are processed
352   in an event driven manner
353 */
354 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
355                                                                   struct ctdb_call *call,
356                                                                   struct ctdb_ltdb_header *header,
357                                                                   TDB_DATA *data)
358 {
359         struct ctdb_client_call_state *state;
360         struct ctdb_context *ctdb = ctdb_db->ctdb;
361         int ret;
362
363         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
364         CTDB_NO_MEMORY_NULL(ctdb, state);
365         state->call = talloc_zero(state, struct ctdb_call);
366         CTDB_NO_MEMORY_NULL(ctdb, state->call);
367
368         talloc_steal(state, data->dptr);
369
370         state->state   = CTDB_CALL_DONE;
371         *(state->call) = *call;
372         state->ctdb_db = ctdb_db;
373
374         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
375
376         return state;
377 }
378
379 /*
380   make a ctdb call to the local daemon - async send. Called from client context.
381
382   This constructs a ctdb_call request and queues it for processing. 
383   This call never blocks.
384 */
385 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
386                                               struct ctdb_call *call)
387 {
388         struct ctdb_client_call_state *state;
389         struct ctdb_context *ctdb = ctdb_db->ctdb;
390         struct ctdb_ltdb_header header;
391         TDB_DATA data;
392         int ret;
393         size_t len;
394         struct ctdb_req_call *c;
395
396         /* if the domain socket is not yet open, open it */
397         if (ctdb->daemon.sd==-1) {
398                 ctdb_socket_connect(ctdb);
399         }
400
401         ret = ctdb_ltdb_lock(ctdb_db, call->key);
402         if (ret != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
404                 return NULL;
405         }
406
407         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
408
409         if (ret == 0 && header.dmaster == ctdb->pnn) {
410                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
411                 talloc_free(data.dptr);
412                 ctdb_ltdb_unlock(ctdb_db, call->key);
413                 return state;
414         }
415
416         ctdb_ltdb_unlock(ctdb_db, call->key);
417         talloc_free(data.dptr);
418
419         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
420         if (state == NULL) {
421                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
422                 return NULL;
423         }
424         state->call = talloc_zero(state, struct ctdb_call);
425         if (state->call == NULL) {
426                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
427                 return NULL;
428         }
429
430         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
431         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
432         if (c == NULL) {
433                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
434                 return NULL;
435         }
436
437         state->reqid     = ctdb_reqid_new(ctdb, state);
438         state->ctdb_db = ctdb_db;
439         talloc_set_destructor(state, ctdb_client_call_destructor);
440
441         c->hdr.reqid     = state->reqid;
442         c->flags         = call->flags;
443         c->db_id         = ctdb_db->db_id;
444         c->callid        = call->call_id;
445         c->hopcount      = 0;
446         c->keylen        = call->key.dsize;
447         c->calldatalen   = call->call_data.dsize;
448         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
449         memcpy(&c->data[call->key.dsize], 
450                call->call_data.dptr, call->call_data.dsize);
451         *(state->call)              = *call;
452         state->call->call_data.dptr = &c->data[call->key.dsize];
453         state->call->key.dptr       = &c->data[0];
454
455         state->state  = CTDB_CALL_WAIT;
456
457
458         ctdb_client_queue_pkt(ctdb, &c->hdr);
459
460         return state;
461 }
462
463
464 /*
465   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
466 */
467 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
468 {
469         struct ctdb_client_call_state *state;
470
471         state = ctdb_call_send(ctdb_db, call);
472         return ctdb_call_recv(state, call);
473 }
474
475
476 /*
477   tell the daemon what messaging srvid we will use, and register the message
478   handler function in the client
479 */
480 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
481                              ctdb_msg_fn_t handler,
482                              void *private_data)
483                                     
484 {
485         int res;
486         int32_t status;
487         
488         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
489                            tdb_null, NULL, NULL, &status, NULL, NULL);
490         if (res != 0 || status != 0) {
491                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
492                 return -1;
493         }
494
495         /* also need to register the handler with our own ctdb structure */
496         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
497 }
498
499 /*
500   tell the daemon we no longer want a srvid
501 */
502 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
503 {
504         int res;
505         int32_t status;
506         
507         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
508                            tdb_null, NULL, NULL, &status, NULL, NULL);
509         if (res != 0 || status != 0) {
510                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
511                 return -1;
512         }
513
514         /* also need to register the handler with our own ctdb structure */
515         ctdb_deregister_message_handler(ctdb, srvid, private_data);
516         return 0;
517 }
518
519
520 /*
521   send a message - from client context
522  */
523 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
524                       uint64_t srvid, TDB_DATA data)
525 {
526         struct ctdb_req_message *r;
527         int len, res;
528
529         len = offsetof(struct ctdb_req_message, data) + data.dsize;
530         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
531                                len, struct ctdb_req_message);
532         CTDB_NO_MEMORY(ctdb, r);
533
534         r->hdr.destnode  = pnn;
535         r->srvid         = srvid;
536         r->datalen       = data.dsize;
537         memcpy(&r->data[0], data.dptr, data.dsize);
538         
539         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
540         if (res != 0) {
541                 return res;
542         }
543
544         talloc_free(r);
545         return 0;
546 }
547
548
549 /*
550   cancel a ctdb_fetch_lock operation, releasing the lock
551  */
552 static int fetch_lock_destructor(struct ctdb_record_handle *h)
553 {
554         ctdb_ltdb_unlock(h->ctdb_db, h->key);
555         return 0;
556 }
557
558 /*
559   force the migration of a record to this node
560  */
561 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
562 {
563         struct ctdb_call call;
564         ZERO_STRUCT(call);
565         call.call_id = CTDB_NULL_FUNC;
566         call.key = key;
567         call.flags = CTDB_IMMEDIATE_MIGRATION;
568         return ctdb_call(ctdb_db, &call);
569 }
570
571 /*
572   get a lock on a record, and return the records data. Blocks until it gets the lock
573  */
574 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
575                                            TDB_DATA key, TDB_DATA *data)
576 {
577         int ret;
578         struct ctdb_record_handle *h;
579
580         /*
581           procedure is as follows:
582
583           1) get the chain lock. 
584           2) check if we are dmaster
585           3) if we are the dmaster then return handle 
586           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
587              reply from ctdbd
588           5) when we get the reply, goto (1)
589          */
590
591         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
592         if (h == NULL) {
593                 return NULL;
594         }
595
596         h->ctdb_db = ctdb_db;
597         h->key     = key;
598         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
599         if (h->key.dptr == NULL) {
600                 talloc_free(h);
601                 return NULL;
602         }
603         h->data    = data;
604
605         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
606                  (const char *)key.dptr));
607
608 again:
609         /* step 1 - get the chain lock */
610         ret = ctdb_ltdb_lock(ctdb_db, key);
611         if (ret != 0) {
612                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
613                 talloc_free(h);
614                 return NULL;
615         }
616
617         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
618
619         talloc_set_destructor(h, fetch_lock_destructor);
620
621         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
622
623         /* when torturing, ensure we test the remote path */
624         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
625             random() % 5 == 0) {
626                 h->header.dmaster = (uint32_t)-1;
627         }
628
629
630         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
631
632         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
633                 ctdb_ltdb_unlock(ctdb_db, key);
634                 ret = ctdb_client_force_migration(ctdb_db, key);
635                 if (ret != 0) {
636                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
637                         talloc_free(h);
638                         return NULL;
639                 }
640                 goto again;
641         }
642
643         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
644         return h;
645 }
646
647 /*
648   store some data to the record that was locked with ctdb_fetch_lock()
649 */
650 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
651 {
652         if (h->ctdb_db->persistent) {
653                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
654                 return -1;
655         }
656
657         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
658 }
659
660 /*
661   non-locking fetch of a record
662  */
663 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
664                TDB_DATA key, TDB_DATA *data)
665 {
666         struct ctdb_call call;
667         int ret;
668
669         call.call_id = CTDB_FETCH_FUNC;
670         call.call_data.dptr = NULL;
671         call.call_data.dsize = 0;
672
673         ret = ctdb_call(ctdb_db, &call);
674
675         if (ret == 0) {
676                 *data = call.reply_data;
677                 talloc_steal(mem_ctx, data->dptr);
678         }
679
680         return ret;
681 }
682
683
684
685 /*
686    called when a control completes or timesout to invoke the callback
687    function the user provided
688 */
689 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
690         struct timeval t, void *private_data)
691 {
692         struct ctdb_client_control_state *state;
693         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
694         int ret;
695
696         state = talloc_get_type(private_data, struct ctdb_client_control_state);
697         talloc_steal(tmp_ctx, state);
698
699         ret = ctdb_control_recv(state->ctdb, state, state,
700                         NULL, 
701                         NULL, 
702                         NULL);
703
704         talloc_free(tmp_ctx);
705 }
706
707 /*
708   called when a CTDB_REPLY_CONTROL packet comes in in the client
709
710   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
711   contains any reply data from the control
712 */
713 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
714                                       struct ctdb_req_header *hdr)
715 {
716         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
717         struct ctdb_client_control_state *state;
718
719         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
720         if (state == NULL) {
721                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
722                 return;
723         }
724
725         if (hdr->reqid != state->reqid) {
726                 /* we found a record  but it was the wrong one */
727                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
728                 return;
729         }
730
731         state->outdata.dptr = c->data;
732         state->outdata.dsize = c->datalen;
733         state->status = c->status;
734         if (c->errorlen) {
735                 state->errormsg = talloc_strndup(state, 
736                                                  (char *)&c->data[c->datalen], 
737                                                  c->errorlen);
738         }
739
740         /* state->outdata now uses resources from c so we dont want c
741            to just dissappear from under us while state is still alive
742         */
743         talloc_steal(state, c);
744
745         state->state = CTDB_CONTROL_DONE;
746
747         /* if we had a callback registered for this control, pull the response
748            and call the callback.
749         */
750         if (state->async.fn) {
751                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
752         }
753 }
754
755
756 /*
757   destroy a ctdb_control in client
758 */
759 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
760 {
761         ctdb_reqid_remove(state->ctdb, state->reqid);
762         return 0;
763 }
764
765
766 /* time out handler for ctdb_control */
767 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
768         struct timeval t, void *private_data)
769 {
770         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
771
772         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
773                          "dstnode:%u\n", state->reqid, state->c->opcode,
774                          state->c->hdr.destnode));
775
776         state->state = CTDB_CONTROL_TIMEOUT;
777
778         /* if we had a callback registered for this control, pull the response
779            and call the callback.
780         */
781         if (state->async.fn) {
782                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
783         }
784 }
785
786 /* async version of send control request */
787 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
788                 uint32_t destnode, uint64_t srvid, 
789                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
790                 TALLOC_CTX *mem_ctx,
791                 struct timeval *timeout,
792                 char **errormsg)
793 {
794         struct ctdb_client_control_state *state;
795         size_t len;
796         struct ctdb_req_control *c;
797         int ret;
798
799         if (errormsg) {
800                 *errormsg = NULL;
801         }
802
803         /* if the domain socket is not yet open, open it */
804         if (ctdb->daemon.sd==-1) {
805                 ctdb_socket_connect(ctdb);
806         }
807
808         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
809         CTDB_NO_MEMORY_NULL(ctdb, state);
810
811         state->ctdb       = ctdb;
812         state->reqid      = ctdb_reqid_new(ctdb, state);
813         state->state      = CTDB_CONTROL_WAIT;
814         state->errormsg   = NULL;
815
816         talloc_set_destructor(state, ctdb_control_destructor);
817
818         len = offsetof(struct ctdb_req_control, data) + data.dsize;
819         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
820                                len, struct ctdb_req_control);
821         state->c            = c;        
822         CTDB_NO_MEMORY_NULL(ctdb, c);
823         c->hdr.reqid        = state->reqid;
824         c->hdr.destnode     = destnode;
825         c->opcode           = opcode;
826         c->client_id        = 0;
827         c->flags            = flags;
828         c->srvid            = srvid;
829         c->datalen          = data.dsize;
830         if (data.dsize) {
831                 memcpy(&c->data[0], data.dptr, data.dsize);
832         }
833
834         /* timeout */
835         if (timeout && !timeval_is_zero(timeout)) {
836                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
837         }
838
839         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
840         if (ret != 0) {
841                 talloc_free(state);
842                 return NULL;
843         }
844
845         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
846                 talloc_free(state);
847                 return NULL;
848         }
849
850         return state;
851 }
852
853
854 /* async version of receive control reply */
855 int ctdb_control_recv(struct ctdb_context *ctdb, 
856                 struct ctdb_client_control_state *state, 
857                 TALLOC_CTX *mem_ctx,
858                 TDB_DATA *outdata, int32_t *status, char **errormsg)
859 {
860         TALLOC_CTX *tmp_ctx;
861
862         if (status != NULL) {
863                 *status = -1;
864         }
865         if (errormsg != NULL) {
866                 *errormsg = NULL;
867         }
868
869         if (state == NULL) {
870                 return -1;
871         }
872
873         /* prevent double free of state */
874         tmp_ctx = talloc_new(ctdb);
875         talloc_steal(tmp_ctx, state);
876
877         /* loop one event at a time until we either timeout or the control
878            completes.
879         */
880         while (state->state == CTDB_CONTROL_WAIT) {
881                 event_loop_once(ctdb->ev);
882         }
883
884         if (state->state != CTDB_CONTROL_DONE) {
885                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
886                 if (state->async.fn) {
887                         state->async.fn(state);
888                 }
889                 talloc_free(tmp_ctx);
890                 return -1;
891         }
892
893         if (state->errormsg) {
894                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
895                 if (errormsg) {
896                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
897                 }
898                 if (state->async.fn) {
899                         state->async.fn(state);
900                 }
901                 talloc_free(tmp_ctx);
902                 return -1;
903         }
904
905         if (outdata) {
906                 *outdata = state->outdata;
907                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
908         }
909
910         if (status) {
911                 *status = state->status;
912         }
913
914         if (state->async.fn) {
915                 state->async.fn(state);
916         }
917
918         talloc_free(tmp_ctx);
919         return 0;
920 }
921
922
923
924 /*
925   send a ctdb control message
926   timeout specifies how long we should wait for a reply.
927   if timeout is NULL we wait indefinitely
928  */
929 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
930                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
931                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
932                  struct timeval *timeout,
933                  char **errormsg)
934 {
935         struct ctdb_client_control_state *state;
936
937         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
938                         flags, data, mem_ctx,
939                         timeout, errormsg);
940         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
941                         errormsg);
942 }
943
944
945
946
947 /*
948   a process exists call. Returns 0 if process exists, -1 otherwise
949  */
950 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
951 {
952         int ret;
953         TDB_DATA data;
954         int32_t status;
955
956         data.dptr = (uint8_t*)&pid;
957         data.dsize = sizeof(pid);
958
959         ret = ctdb_control(ctdb, destnode, 0, 
960                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
961                            NULL, NULL, &status, NULL, NULL);
962         if (ret != 0) {
963                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
964                 return -1;
965         }
966
967         return status;
968 }
969
970 /*
971   get remote statistics
972  */
973 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
974 {
975         int ret;
976         TDB_DATA data;
977         int32_t res;
978
979         ret = ctdb_control(ctdb, destnode, 0, 
980                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
981                            ctdb, &data, &res, NULL, NULL);
982         if (ret != 0 || res != 0) {
983                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
984                 return -1;
985         }
986
987         if (data.dsize != sizeof(struct ctdb_statistics)) {
988                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
989                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
990                       return -1;
991         }
992
993         *status = *(struct ctdb_statistics *)data.dptr;
994         talloc_free(data.dptr);
995                         
996         return 0;
997 }
998
999 /*
1000   shutdown a remote ctdb node
1001  */
1002 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1003 {
1004         struct ctdb_client_control_state *state;
1005
1006         state = ctdb_control_send(ctdb, destnode, 0, 
1007                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1008                            NULL, &timeout, NULL);
1009         if (state == NULL) {
1010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1011                 return -1;
1012         }
1013
1014         return 0;
1015 }
1016
1017 /*
1018   get vnn map from a remote node
1019  */
1020 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1021 {
1022         int ret;
1023         TDB_DATA outdata;
1024         int32_t res;
1025         struct ctdb_vnn_map_wire *map;
1026
1027         ret = ctdb_control(ctdb, destnode, 0, 
1028                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1029                            mem_ctx, &outdata, &res, &timeout, NULL);
1030         if (ret != 0 || res != 0) {
1031                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1032                 return -1;
1033         }
1034         
1035         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1036         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1037             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1038                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1039                 return -1;
1040         }
1041
1042         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1043         CTDB_NO_MEMORY(ctdb, *vnnmap);
1044         (*vnnmap)->generation = map->generation;
1045         (*vnnmap)->size       = map->size;
1046         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1047
1048         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1049         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1050         talloc_free(outdata.dptr);
1051                     
1052         return 0;
1053 }
1054
1055
1056 /*
1057   get the recovery mode of a remote node
1058  */
1059 struct ctdb_client_control_state *
1060 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1061 {
1062         return ctdb_control_send(ctdb, destnode, 0, 
1063                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1064                            mem_ctx, &timeout, NULL);
1065 }
1066
1067 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1068 {
1069         int ret;
1070         int32_t res;
1071
1072         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1073         if (ret != 0) {
1074                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1075                 return -1;
1076         }
1077
1078         if (recmode) {
1079                 *recmode = (uint32_t)res;
1080         }
1081
1082         return 0;
1083 }
1084
1085 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1086 {
1087         struct ctdb_client_control_state *state;
1088
1089         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1090         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1091 }
1092
1093
1094
1095
1096 /*
1097   set the recovery mode of a remote node
1098  */
1099 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1100 {
1101         int ret;
1102         TDB_DATA data;
1103         int32_t res;
1104
1105         data.dsize = sizeof(uint32_t);
1106         data.dptr = (unsigned char *)&recmode;
1107
1108         ret = ctdb_control(ctdb, destnode, 0, 
1109                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1110                            NULL, NULL, &res, &timeout, NULL);
1111         if (ret != 0 || res != 0) {
1112                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1113                 return -1;
1114         }
1115
1116         return 0;
1117 }
1118
1119
1120
1121 /*
1122   get the recovery master of a remote node
1123  */
1124 struct ctdb_client_control_state *
1125 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1126                         struct timeval timeout, uint32_t destnode)
1127 {
1128         return ctdb_control_send(ctdb, destnode, 0, 
1129                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1130                            mem_ctx, &timeout, NULL);
1131 }
1132
1133 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1134 {
1135         int ret;
1136         int32_t res;
1137
1138         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1141                 return -1;
1142         }
1143
1144         if (recmaster) {
1145                 *recmaster = (uint32_t)res;
1146         }
1147
1148         return 0;
1149 }
1150
1151 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1152 {
1153         struct ctdb_client_control_state *state;
1154
1155         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1156         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1157 }
1158
1159
1160 /*
1161   set the recovery master of a remote node
1162  */
1163 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1164 {
1165         int ret;
1166         TDB_DATA data;
1167         int32_t res;
1168
1169         ZERO_STRUCT(data);
1170         data.dsize = sizeof(uint32_t);
1171         data.dptr = (unsigned char *)&recmaster;
1172
1173         ret = ctdb_control(ctdb, destnode, 0, 
1174                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1175                            NULL, NULL, &res, &timeout, NULL);
1176         if (ret != 0 || res != 0) {
1177                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1178                 return -1;
1179         }
1180
1181         return 0;
1182 }
1183
1184
1185 /*
1186   get a list of databases off a remote node
1187  */
1188 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1189                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1190 {
1191         int ret;
1192         TDB_DATA outdata;
1193         int32_t res;
1194
1195         ret = ctdb_control(ctdb, destnode, 0, 
1196                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1197                            mem_ctx, &outdata, &res, &timeout, NULL);
1198         if (ret != 0 || res != 0) {
1199                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1200                 return -1;
1201         }
1202
1203         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1204         talloc_free(outdata.dptr);
1205                     
1206         return 0;
1207 }
1208
1209 /*
1210   get a list of nodes (vnn and flags ) from a remote node
1211  */
1212 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1213                 struct timeval timeout, uint32_t destnode, 
1214                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1215 {
1216         int ret;
1217         TDB_DATA outdata;
1218         int32_t res;
1219
1220         ret = ctdb_control(ctdb, destnode, 0, 
1221                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1222                            mem_ctx, &outdata, &res, &timeout, NULL);
1223         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1225                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1226         }
1227         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1228                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1229                 return -1;
1230         }
1231
1232         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1233         talloc_free(outdata.dptr);
1234                     
1235         return 0;
1236 }
1237
1238 /*
1239   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1240  */
1241 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1242                 struct timeval timeout, uint32_t destnode, 
1243                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1244 {
1245         int ret, i, len;
1246         TDB_DATA outdata;
1247         struct ctdb_node_mapv4 *nodemapv4;
1248         int32_t res;
1249
1250         ret = ctdb_control(ctdb, destnode, 0, 
1251                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1252                            mem_ctx, &outdata, &res, &timeout, NULL);
1253         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1255                 return -1;
1256         }
1257
1258         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1259
1260         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1261         (*nodemap) = talloc_zero_size(mem_ctx, len);
1262         CTDB_NO_MEMORY(ctdb, (*nodemap));
1263
1264         (*nodemap)->num = nodemapv4->num;
1265         for (i=0; i<nodemapv4->num; i++) {
1266                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1267                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1268                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1269                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1270         }
1271                 
1272         talloc_free(outdata.dptr);
1273                     
1274         return 0;
1275 }
1276
1277 /*
1278   drop the transport, reload the nodes file and restart the transport
1279  */
1280 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1281                     struct timeval timeout, uint32_t destnode)
1282 {
1283         int ret;
1284         int32_t res;
1285
1286         ret = ctdb_control(ctdb, destnode, 0, 
1287                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1288                            NULL, NULL, &res, &timeout, NULL);
1289         if (ret != 0 || res != 0) {
1290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1291                 return -1;
1292         }
1293
1294         return 0;
1295 }
1296
1297
1298 /*
1299   set vnn map on a node
1300  */
1301 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1302                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1303 {
1304         int ret;
1305         TDB_DATA data;
1306         int32_t res;
1307         struct ctdb_vnn_map_wire *map;
1308         size_t len;
1309
1310         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1311         map = talloc_size(mem_ctx, len);
1312         CTDB_NO_MEMORY(ctdb, map);
1313
1314         map->generation = vnnmap->generation;
1315         map->size = vnnmap->size;
1316         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1317         
1318         data.dsize = len;
1319         data.dptr  = (uint8_t *)map;
1320
1321         ret = ctdb_control(ctdb, destnode, 0, 
1322                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1323                            NULL, NULL, &res, &timeout, NULL);
1324         if (ret != 0 || res != 0) {
1325                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1326                 return -1;
1327         }
1328
1329         talloc_free(map);
1330
1331         return 0;
1332 }
1333
1334
1335 /*
1336   async send for pull database
1337  */
1338 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1339         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1340         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1341 {
1342         TDB_DATA indata;
1343         struct ctdb_control_pulldb *pull;
1344         struct ctdb_client_control_state *state;
1345
1346         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1347         CTDB_NO_MEMORY_NULL(ctdb, pull);
1348
1349         pull->db_id   = dbid;
1350         pull->lmaster = lmaster;
1351
1352         indata.dsize = sizeof(struct ctdb_control_pulldb);
1353         indata.dptr  = (unsigned char *)pull;
1354
1355         state = ctdb_control_send(ctdb, destnode, 0, 
1356                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1357                                   mem_ctx, &timeout, NULL);
1358         talloc_free(pull);
1359
1360         return state;
1361 }
1362
1363 /*
1364   async recv for pull database
1365  */
1366 int ctdb_ctrl_pulldb_recv(
1367         struct ctdb_context *ctdb, 
1368         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1369         TDB_DATA *outdata)
1370 {
1371         int ret;
1372         int32_t res;
1373
1374         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1375         if ( (ret != 0) || (res != 0) ){
1376                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1377                 return -1;
1378         }
1379
1380         return 0;
1381 }
1382
1383 /*
1384   pull all keys and records for a specific database on a node
1385  */
1386 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1387                 uint32_t dbid, uint32_t lmaster, 
1388                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1389                 TDB_DATA *outdata)
1390 {
1391         struct ctdb_client_control_state *state;
1392
1393         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1394                                       timeout);
1395         
1396         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1397 }
1398
1399
1400 /*
1401   change dmaster for all keys in the database to the new value
1402  */
1403 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1404                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1405 {
1406         int ret;
1407         TDB_DATA indata;
1408         int32_t res;
1409
1410         indata.dsize = 2*sizeof(uint32_t);
1411         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1412
1413         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1414         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1415
1416         ret = ctdb_control(ctdb, destnode, 0, 
1417                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1418                            NULL, NULL, &res, &timeout, NULL);
1419         if (ret != 0 || res != 0) {
1420                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1421                 return -1;
1422         }
1423
1424         return 0;
1425 }
1426
1427 /*
1428   ping a node, return number of clients connected
1429  */
1430 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1431 {
1432         int ret;
1433         int32_t res;
1434
1435         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1436                            tdb_null, NULL, NULL, &res, NULL, NULL);
1437         if (ret != 0) {
1438                 return -1;
1439         }
1440         return res;
1441 }
1442
1443 /*
1444   find the real path to a ltdb 
1445  */
1446 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1447                    const char **path)
1448 {
1449         int ret;
1450         int32_t res;
1451         TDB_DATA data;
1452
1453         data.dptr = (uint8_t *)&dbid;
1454         data.dsize = sizeof(dbid);
1455
1456         ret = ctdb_control(ctdb, destnode, 0, 
1457                            CTDB_CONTROL_GETDBPATH, 0, data, 
1458                            mem_ctx, &data, &res, &timeout, NULL);
1459         if (ret != 0 || res != 0) {
1460                 return -1;
1461         }
1462
1463         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1464         if ((*path) == NULL) {
1465                 return -1;
1466         }
1467
1468         talloc_free(data.dptr);
1469
1470         return 0;
1471 }
1472
1473 /*
1474   find the name of a db 
1475  */
1476 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1477                    const char **name)
1478 {
1479         int ret;
1480         int32_t res;
1481         TDB_DATA data;
1482
1483         data.dptr = (uint8_t *)&dbid;
1484         data.dsize = sizeof(dbid);
1485
1486         ret = ctdb_control(ctdb, destnode, 0, 
1487                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1488                            mem_ctx, &data, &res, &timeout, NULL);
1489         if (ret != 0 || res != 0) {
1490                 return -1;
1491         }
1492
1493         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1494         if ((*name) == NULL) {
1495                 return -1;
1496         }
1497
1498         talloc_free(data.dptr);
1499
1500         return 0;
1501 }
1502
1503 /*
1504   get the health status of a db
1505  */
1506 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1507                           struct timeval timeout,
1508                           uint32_t destnode,
1509                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1510                           const char **reason)
1511 {
1512         int ret;
1513         int32_t res;
1514         TDB_DATA data;
1515
1516         data.dptr = (uint8_t *)&dbid;
1517         data.dsize = sizeof(dbid);
1518
1519         ret = ctdb_control(ctdb, destnode, 0,
1520                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1521                            mem_ctx, &data, &res, &timeout, NULL);
1522         if (ret != 0 || res != 0) {
1523                 return -1;
1524         }
1525
1526         if (data.dsize == 0) {
1527                 (*reason) = NULL;
1528                 return 0;
1529         }
1530
1531         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1532         if ((*reason) == NULL) {
1533                 return -1;
1534         }
1535
1536         talloc_free(data.dptr);
1537
1538         return 0;
1539 }
1540
1541 /*
1542   create a database
1543  */
1544 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1545                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1546 {
1547         int ret;
1548         int32_t res;
1549         TDB_DATA data;
1550
1551         data.dptr = discard_const(name);
1552         data.dsize = strlen(name)+1;
1553
1554         ret = ctdb_control(ctdb, destnode, 0, 
1555                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1556                            0, data, 
1557                            mem_ctx, &data, &res, &timeout, NULL);
1558
1559         if (ret != 0 || res != 0) {
1560                 return -1;
1561         }
1562
1563         return 0;
1564 }
1565
1566 /*
1567   get debug level on a node
1568  */
1569 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1570 {
1571         int ret;
1572         int32_t res;
1573         TDB_DATA data;
1574
1575         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1576                            ctdb, &data, &res, NULL, NULL);
1577         if (ret != 0 || res != 0) {
1578                 return -1;
1579         }
1580         if (data.dsize != sizeof(int32_t)) {
1581                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1582                          (unsigned)data.dsize));
1583                 return -1;
1584         }
1585         *level = *(int32_t *)data.dptr;
1586         talloc_free(data.dptr);
1587         return 0;
1588 }
1589
1590 /*
1591   set debug level on a node
1592  */
1593 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1594 {
1595         int ret;
1596         int32_t res;
1597         TDB_DATA data;
1598
1599         data.dptr = (uint8_t *)&level;
1600         data.dsize = sizeof(level);
1601
1602         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1603                            NULL, NULL, &res, NULL, NULL);
1604         if (ret != 0 || res != 0) {
1605                 return -1;
1606         }
1607         return 0;
1608 }
1609
1610
1611 /*
1612   get a list of connected nodes
1613  */
1614 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1615                                 struct timeval timeout,
1616                                 TALLOC_CTX *mem_ctx,
1617                                 uint32_t *num_nodes)
1618 {
1619         struct ctdb_node_map *map=NULL;
1620         int ret, i;
1621         uint32_t *nodes;
1622
1623         *num_nodes = 0;
1624
1625         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1626         if (ret != 0) {
1627                 return NULL;
1628         }
1629
1630         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1631         if (nodes == NULL) {
1632                 return NULL;
1633         }
1634
1635         for (i=0;i<map->num;i++) {
1636                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1637                         nodes[*num_nodes] = map->nodes[i].pnn;
1638                         (*num_nodes)++;
1639                 }
1640         }
1641
1642         return nodes;
1643 }
1644
1645
1646 /*
1647   reset remote status
1648  */
1649 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1650 {
1651         int ret;
1652         int32_t res;
1653
1654         ret = ctdb_control(ctdb, destnode, 0, 
1655                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1656                            NULL, NULL, &res, NULL, NULL);
1657         if (ret != 0 || res != 0) {
1658                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1659                 return -1;
1660         }
1661         return 0;
1662 }
1663
1664 /*
1665   this is the dummy null procedure that all databases support
1666 */
1667 static int ctdb_null_func(struct ctdb_call_info *call)
1668 {
1669         return 0;
1670 }
1671
1672 /*
1673   this is a plain fetch procedure that all databases support
1674 */
1675 static int ctdb_fetch_func(struct ctdb_call_info *call)
1676 {
1677         call->reply_data = &call->record_data;
1678         return 0;
1679 }
1680
1681 /*
1682   attach to a specific database - client call
1683 */
1684 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1685 {
1686         struct ctdb_db_context *ctdb_db;
1687         TDB_DATA data;
1688         int ret;
1689         int32_t res;
1690
1691         ctdb_db = ctdb_db_handle(ctdb, name);
1692         if (ctdb_db) {
1693                 return ctdb_db;
1694         }
1695
1696         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1697         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1698
1699         ctdb_db->ctdb = ctdb;
1700         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1701         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1702
1703         data.dptr = discard_const(name);
1704         data.dsize = strlen(name)+1;
1705
1706         /* tell ctdb daemon to attach */
1707         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1708                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1709                            0, data, ctdb_db, &data, &res, NULL, NULL);
1710         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1711                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1712                 talloc_free(ctdb_db);
1713                 return NULL;
1714         }
1715         
1716         ctdb_db->db_id = *(uint32_t *)data.dptr;
1717         talloc_free(data.dptr);
1718
1719         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1720         if (ret != 0) {
1721                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1722                 talloc_free(ctdb_db);
1723                 return NULL;
1724         }
1725
1726         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1727         if (ctdb->valgrinding) {
1728                 tdb_flags |= TDB_NOMMAP;
1729         }
1730         tdb_flags |= TDB_DISALLOW_NESTING;
1731
1732         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1733         if (ctdb_db->ltdb == NULL) {
1734                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1735                 talloc_free(ctdb_db);
1736                 return NULL;
1737         }
1738
1739         ctdb_db->persistent = persistent;
1740
1741         DLIST_ADD(ctdb->db_list, ctdb_db);
1742
1743         /* add well known functions */
1744         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1745         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1746
1747         return ctdb_db;
1748 }
1749
1750
1751 /*
1752   setup a call for a database
1753  */
1754 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1755 {
1756         struct ctdb_registered_call *call;
1757
1758 #if 0
1759         TDB_DATA data;
1760         int32_t status;
1761         struct ctdb_control_set_call c;
1762         int ret;
1763
1764         /* this is no longer valid with the separate daemon architecture */
1765         c.db_id = ctdb_db->db_id;
1766         c.fn    = fn;
1767         c.id    = id;
1768
1769         data.dptr = (uint8_t *)&c;
1770         data.dsize = sizeof(c);
1771
1772         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1773                            data, NULL, NULL, &status, NULL, NULL);
1774         if (ret != 0 || status != 0) {
1775                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1776                 return -1;
1777         }
1778 #endif
1779
1780         /* also register locally */
1781         call = talloc(ctdb_db, struct ctdb_registered_call);
1782         call->fn = fn;
1783         call->id = id;
1784
1785         DLIST_ADD(ctdb_db->calls, call);        
1786         return 0;
1787 }
1788
1789
1790 struct traverse_state {
1791         bool done;
1792         uint32_t count;
1793         ctdb_traverse_func fn;
1794         void *private_data;
1795 };
1796
1797 /*
1798   called on each key during a ctdb_traverse
1799  */
1800 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1801 {
1802         struct traverse_state *state = (struct traverse_state *)p;
1803         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1804         TDB_DATA key;
1805
1806         if (data.dsize < sizeof(uint32_t) ||
1807             d->length != data.dsize) {
1808                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1809                 state->done = True;
1810                 return;
1811         }
1812
1813         key.dsize = d->keylen;
1814         key.dptr  = &d->data[0];
1815         data.dsize = d->datalen;
1816         data.dptr = &d->data[d->keylen];
1817
1818         if (key.dsize == 0 && data.dsize == 0) {
1819                 /* end of traverse */
1820                 state->done = True;
1821                 return;
1822         }
1823
1824         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1825                 /* empty records are deleted records in ctdb */
1826                 return;
1827         }
1828
1829         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1830                 state->done = True;
1831         }
1832
1833         state->count++;
1834 }
1835
1836
1837 /*
1838   start a cluster wide traverse, calling the supplied fn on each record
1839   return the number of records traversed, or -1 on error
1840  */
1841 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1842 {
1843         TDB_DATA data;
1844         struct ctdb_traverse_start t;
1845         int32_t status;
1846         int ret;
1847         uint64_t srvid = (getpid() | 0xFLL<<60);
1848         struct traverse_state state;
1849
1850         state.done = False;
1851         state.count = 0;
1852         state.private_data = private_data;
1853         state.fn = fn;
1854
1855         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1858                 return -1;
1859         }
1860
1861         t.db_id = ctdb_db->db_id;
1862         t.srvid = srvid;
1863         t.reqid = 0;
1864
1865         data.dptr = (uint8_t *)&t;
1866         data.dsize = sizeof(t);
1867
1868         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1869                            data, NULL, NULL, &status, NULL, NULL);
1870         if (ret != 0 || status != 0) {
1871                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1872                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1873                 return -1;
1874         }
1875
1876         while (!state.done) {
1877                 event_loop_once(ctdb_db->ctdb->ev);
1878         }
1879
1880         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1881         if (ret != 0) {
1882                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1883                 return -1;
1884         }
1885
1886         return state.count;
1887 }
1888
1889 #define ISASCII(x) ((x>31)&&(x<128))
1890 /*
1891   called on each key during a catdb
1892  */
1893 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1894 {
1895         int i;
1896         FILE *f = (FILE *)p;
1897         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1898
1899         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1900         for (i=0;i<key.dsize;i++) {
1901                 if (ISASCII(key.dptr[i])) {
1902                         fprintf(f, "%c", key.dptr[i]);
1903                 } else {
1904                         fprintf(f, "\\%02X", key.dptr[i]);
1905                 }
1906         }
1907         fprintf(f, "\"\n");
1908
1909         fprintf(f, "dmaster: %u\n", h->dmaster);
1910         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1911
1912         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1913         for (i=sizeof(*h);i<data.dsize;i++) {
1914                 if (ISASCII(data.dptr[i])) {
1915                         fprintf(f, "%c", data.dptr[i]);
1916                 } else {
1917                         fprintf(f, "\\%02X", data.dptr[i]);
1918                 }
1919         }
1920         fprintf(f, "\"\n");
1921
1922         fprintf(f, "\n");
1923
1924         return 0;
1925 }
1926
1927 /*
1928   convenience function to list all keys to stdout
1929  */
1930 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1931 {
1932         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1933 }
1934
1935 /*
1936   get the pid of a ctdb daemon
1937  */
1938 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1939 {
1940         int ret;
1941         int32_t res;
1942
1943         ret = ctdb_control(ctdb, destnode, 0, 
1944                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1945                            NULL, NULL, &res, &timeout, NULL);
1946         if (ret != 0) {
1947                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1948                 return -1;
1949         }
1950
1951         *pid = res;
1952
1953         return 0;
1954 }
1955
1956
1957 /*
1958   async freeze send control
1959  */
1960 struct ctdb_client_control_state *
1961 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1962 {
1963         return ctdb_control_send(ctdb, destnode, priority, 
1964                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1965                            mem_ctx, &timeout, NULL);
1966 }
1967
1968 /* 
1969    async freeze recv control
1970 */
1971 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1972 {
1973         int ret;
1974         int32_t res;
1975
1976         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1977         if ( (ret != 0) || (res != 0) ){
1978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1979                 return -1;
1980         }
1981
1982         return 0;
1983 }
1984
1985 /*
1986   freeze databases of a certain priority
1987  */
1988 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1989 {
1990         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1991         struct ctdb_client_control_state *state;
1992         int ret;
1993
1994         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1995         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1996         talloc_free(tmp_ctx);
1997
1998         return ret;
1999 }
2000
2001 /* Freeze all databases */
2002 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2003 {
2004         int i;
2005
2006         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2007                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2008                         return -1;
2009                 }
2010         }
2011         return 0;
2012 }
2013
2014 /*
2015   thaw databases of a certain priority
2016  */
2017 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2018 {
2019         int ret;
2020         int32_t res;
2021
2022         ret = ctdb_control(ctdb, destnode, priority, 
2023                            CTDB_CONTROL_THAW, 0, tdb_null, 
2024                            NULL, NULL, &res, &timeout, NULL);
2025         if (ret != 0 || res != 0) {
2026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2027                 return -1;
2028         }
2029
2030         return 0;
2031 }
2032
2033 /* thaw all databases */
2034 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2035 {
2036         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2037 }
2038
2039 /*
2040   get pnn of a node, or -1
2041  */
2042 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2043 {
2044         int ret;
2045         int32_t res;
2046
2047         ret = ctdb_control(ctdb, destnode, 0, 
2048                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2049                            NULL, NULL, &res, &timeout, NULL);
2050         if (ret != 0) {
2051                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2052                 return -1;
2053         }
2054
2055         return res;
2056 }
2057
2058 /*
2059   get the monitoring mode of a remote node
2060  */
2061 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2062 {
2063         int ret;
2064         int32_t res;
2065
2066         ret = ctdb_control(ctdb, destnode, 0, 
2067                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2068                            NULL, NULL, &res, &timeout, NULL);
2069         if (ret != 0) {
2070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2071                 return -1;
2072         }
2073
2074         *monmode = res;
2075
2076         return 0;
2077 }
2078
2079
2080 /*
2081  set the monitoring mode of a remote node to active
2082  */
2083 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2084 {
2085         int ret;
2086         
2087
2088         ret = ctdb_control(ctdb, destnode, 0, 
2089                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2090                            NULL, NULL,NULL, &timeout, NULL);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2093                 return -1;
2094         }
2095
2096         
2097
2098         return 0;
2099 }
2100
2101 /*
2102   set the monitoring mode of a remote node to disable
2103  */
2104 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2105 {
2106         int ret;
2107         
2108
2109         ret = ctdb_control(ctdb, destnode, 0, 
2110                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2111                            NULL, NULL, NULL, &timeout, NULL);
2112         if (ret != 0) {
2113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2114                 return -1;
2115         }
2116
2117         
2118
2119         return 0;
2120 }
2121
2122
2123
2124 /* 
2125   sent to a node to make it take over an ip address
2126 */
2127 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2128                           uint32_t destnode, struct ctdb_public_ip *ip)
2129 {
2130         TDB_DATA data;
2131         struct ctdb_public_ipv4 ipv4;
2132         int ret;
2133         int32_t res;
2134
2135         if (ip->addr.sa.sa_family == AF_INET) {
2136                 ipv4.pnn = ip->pnn;
2137                 ipv4.sin = ip->addr.ip;
2138
2139                 data.dsize = sizeof(ipv4);
2140                 data.dptr  = (uint8_t *)&ipv4;
2141
2142                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2143                            NULL, &res, &timeout, NULL);
2144         } else {
2145                 data.dsize = sizeof(*ip);
2146                 data.dptr  = (uint8_t *)ip;
2147
2148                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2149                            NULL, &res, &timeout, NULL);
2150         }
2151
2152         if (ret != 0 || res != 0) {
2153                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2154                 return -1;
2155         }
2156
2157         return 0;       
2158 }
2159
2160
2161 /* 
2162   sent to a node to make it release an ip address
2163 */
2164 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2165                          uint32_t destnode, struct ctdb_public_ip *ip)
2166 {
2167         TDB_DATA data;
2168         struct ctdb_public_ipv4 ipv4;
2169         int ret;
2170         int32_t res;
2171
2172         if (ip->addr.sa.sa_family == AF_INET) {
2173                 ipv4.pnn = ip->pnn;
2174                 ipv4.sin = ip->addr.ip;
2175
2176                 data.dsize = sizeof(ipv4);
2177                 data.dptr  = (uint8_t *)&ipv4;
2178
2179                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2180                                    NULL, &res, &timeout, NULL);
2181         } else {
2182                 data.dsize = sizeof(*ip);
2183                 data.dptr  = (uint8_t *)ip;
2184
2185                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2186                                    NULL, &res, &timeout, NULL);
2187         }
2188
2189         if (ret != 0 || res != 0) {
2190                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2191                 return -1;
2192         }
2193
2194         return 0;       
2195 }
2196
2197
2198 /*
2199   get a tunable
2200  */
2201 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2202                           struct timeval timeout, 
2203                           uint32_t destnode,
2204                           const char *name, uint32_t *value)
2205 {
2206         struct ctdb_control_get_tunable *t;
2207         TDB_DATA data, outdata;
2208         int32_t res;
2209         int ret;
2210
2211         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2212         data.dptr  = talloc_size(ctdb, data.dsize);
2213         CTDB_NO_MEMORY(ctdb, data.dptr);
2214
2215         t = (struct ctdb_control_get_tunable *)data.dptr;
2216         t->length = strlen(name)+1;
2217         memcpy(t->name, name, t->length);
2218
2219         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2220                            &outdata, &res, &timeout, NULL);
2221         talloc_free(data.dptr);
2222         if (ret != 0 || res != 0) {
2223                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2224                 return -1;
2225         }
2226
2227         if (outdata.dsize != sizeof(uint32_t)) {
2228                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2229                 talloc_free(outdata.dptr);
2230                 return -1;
2231         }
2232         
2233         *value = *(uint32_t *)outdata.dptr;
2234         talloc_free(outdata.dptr);
2235
2236         return 0;
2237 }
2238
2239 /*
2240   set a tunable
2241  */
2242 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2243                           struct timeval timeout, 
2244                           uint32_t destnode,
2245                           const char *name, uint32_t value)
2246 {
2247         struct ctdb_control_set_tunable *t;
2248         TDB_DATA data;
2249         int32_t res;
2250         int ret;
2251
2252         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2253         data.dptr  = talloc_size(ctdb, data.dsize);
2254         CTDB_NO_MEMORY(ctdb, data.dptr);
2255
2256         t = (struct ctdb_control_set_tunable *)data.dptr;
2257         t->length = strlen(name)+1;
2258         memcpy(t->name, name, t->length);
2259         t->value = value;
2260
2261         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2262                            NULL, &res, &timeout, NULL);
2263         talloc_free(data.dptr);
2264         if (ret != 0 || res != 0) {
2265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2266                 return -1;
2267         }
2268
2269         return 0;
2270 }
2271
2272 /*
2273   list tunables
2274  */
2275 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2276                             struct timeval timeout, 
2277                             uint32_t destnode,
2278                             TALLOC_CTX *mem_ctx,
2279                             const char ***list, uint32_t *count)
2280 {
2281         TDB_DATA outdata;
2282         int32_t res;
2283         int ret;
2284         struct ctdb_control_list_tunable *t;
2285         char *p, *s, *ptr;
2286
2287         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2288                            mem_ctx, &outdata, &res, &timeout, NULL);
2289         if (ret != 0 || res != 0) {
2290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2291                 return -1;
2292         }
2293
2294         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2295         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2296             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2297                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2298                 talloc_free(outdata.dptr);
2299                 return -1;              
2300         }
2301         
2302         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2303         CTDB_NO_MEMORY(ctdb, p);
2304
2305         talloc_free(outdata.dptr);
2306         
2307         (*list) = NULL;
2308         (*count) = 0;
2309
2310         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2311                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2312                 CTDB_NO_MEMORY(ctdb, *list);
2313                 (*list)[*count] = talloc_strdup(*list, s);
2314                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2315                 (*count)++;
2316         }
2317
2318         talloc_free(p);
2319
2320         return 0;
2321 }
2322
2323
2324 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2325                                    struct timeval timeout, uint32_t destnode,
2326                                    TALLOC_CTX *mem_ctx,
2327                                    uint32_t flags,
2328                                    struct ctdb_all_public_ips **ips)
2329 {
2330         int ret;
2331         TDB_DATA outdata;
2332         int32_t res;
2333
2334         ret = ctdb_control(ctdb, destnode, 0, 
2335                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2336                            mem_ctx, &outdata, &res, &timeout, NULL);
2337         if (ret == 0 && res == -1) {
2338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2339                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2340         }
2341         if (ret != 0 || res != 0) {
2342           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2343                 return -1;
2344         }
2345
2346         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2347         talloc_free(outdata.dptr);
2348                     
2349         return 0;
2350 }
2351
2352 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2353                              struct timeval timeout, uint32_t destnode,
2354                              TALLOC_CTX *mem_ctx,
2355                              struct ctdb_all_public_ips **ips)
2356 {
2357         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2358                                               destnode, mem_ctx,
2359                                               0, ips);
2360 }
2361
2362 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2363                         struct timeval timeout, uint32_t destnode, 
2364                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2365 {
2366         int ret, i, len;
2367         TDB_DATA outdata;
2368         int32_t res;
2369         struct ctdb_all_public_ipsv4 *ipsv4;
2370
2371         ret = ctdb_control(ctdb, destnode, 0, 
2372                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2373                            mem_ctx, &outdata, &res, &timeout, NULL);
2374         if (ret != 0 || res != 0) {
2375                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2376                 return -1;
2377         }
2378
2379         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2380         len = offsetof(struct ctdb_all_public_ips, ips) +
2381                 ipsv4->num*sizeof(struct ctdb_public_ip);
2382         *ips = talloc_zero_size(mem_ctx, len);
2383         CTDB_NO_MEMORY(ctdb, *ips);
2384         (*ips)->num = ipsv4->num;
2385         for (i=0; i<ipsv4->num; i++) {
2386                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2387                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2388         }
2389
2390         talloc_free(outdata.dptr);
2391                     
2392         return 0;
2393 }
2394
2395 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2396                                  struct timeval timeout, uint32_t destnode,
2397                                  TALLOC_CTX *mem_ctx,
2398                                  const ctdb_sock_addr *addr,
2399                                  struct ctdb_control_public_ip_info **_info)
2400 {
2401         int ret;
2402         TDB_DATA indata;
2403         TDB_DATA outdata;
2404         int32_t res;
2405         struct ctdb_control_public_ip_info *info;
2406         uint32_t len;
2407         uint32_t i;
2408
2409         indata.dptr = discard_const_p(uint8_t, addr);
2410         indata.dsize = sizeof(*addr);
2411
2412         ret = ctdb_control(ctdb, destnode, 0,
2413                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2414                            mem_ctx, &outdata, &res, &timeout, NULL);
2415         if (ret != 0 || res != 0) {
2416                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2417                                 "failed ret:%d res:%d\n",
2418                                 ret, res));
2419                 return -1;
2420         }
2421
2422         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2423         if (len > outdata.dsize) {
2424                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2425                                 "returned invalid data with size %u > %u\n",
2426                                 (unsigned int)outdata.dsize,
2427                                 (unsigned int)len));
2428                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2429                 return -1;
2430         }
2431
2432         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2433         len += info->num*sizeof(struct ctdb_control_iface_info);
2434
2435         if (len > outdata.dsize) {
2436                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2437                                 "returned invalid data with size %u > %u\n",
2438                                 (unsigned int)outdata.dsize,
2439                                 (unsigned int)len));
2440                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2441                 return -1;
2442         }
2443
2444         /* make sure we null terminate the returned strings */
2445         for (i=0; i < info->num; i++) {
2446                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2447         }
2448
2449         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2450                                                                 outdata.dptr,
2451                                                                 outdata.dsize);
2452         talloc_free(outdata.dptr);
2453         if (*_info == NULL) {
2454                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2455                                 "talloc_memdup size %u failed\n",
2456                                 (unsigned int)outdata.dsize));
2457                 return -1;
2458         }
2459
2460         return 0;
2461 }
2462
2463 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2464                          struct timeval timeout, uint32_t destnode,
2465                          TALLOC_CTX *mem_ctx,
2466                          struct ctdb_control_get_ifaces **_ifaces)
2467 {
2468         int ret;
2469         TDB_DATA outdata;
2470         int32_t res;
2471         struct ctdb_control_get_ifaces *ifaces;
2472         uint32_t len;
2473         uint32_t i;
2474
2475         ret = ctdb_control(ctdb, destnode, 0,
2476                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2477                            mem_ctx, &outdata, &res, &timeout, NULL);
2478         if (ret != 0 || res != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2480                                 "failed ret:%d res:%d\n",
2481                                 ret, res));
2482                 return -1;
2483         }
2484
2485         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2486         if (len > outdata.dsize) {
2487                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2488                                 "returned invalid data with size %u > %u\n",
2489                                 (unsigned int)outdata.dsize,
2490                                 (unsigned int)len));
2491                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2492                 return -1;
2493         }
2494
2495         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2496         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2497
2498         if (len > outdata.dsize) {
2499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2500                                 "returned invalid data with size %u > %u\n",
2501                                 (unsigned int)outdata.dsize,
2502                                 (unsigned int)len));
2503                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2504                 return -1;
2505         }
2506
2507         /* make sure we null terminate the returned strings */
2508         for (i=0; i < ifaces->num; i++) {
2509                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2510         }
2511
2512         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2513                                                                   outdata.dptr,
2514                                                                   outdata.dsize);
2515         talloc_free(outdata.dptr);
2516         if (*_ifaces == NULL) {
2517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2518                                 "talloc_memdup size %u failed\n",
2519                                 (unsigned int)outdata.dsize));
2520                 return -1;
2521         }
2522
2523         return 0;
2524 }
2525
2526 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2527                              struct timeval timeout, uint32_t destnode,
2528                              TALLOC_CTX *mem_ctx,
2529                              const struct ctdb_control_iface_info *info)
2530 {
2531         int ret;
2532         TDB_DATA indata;
2533         int32_t res;
2534
2535         indata.dptr = discard_const_p(uint8_t, info);
2536         indata.dsize = sizeof(*info);
2537
2538         ret = ctdb_control(ctdb, destnode, 0,
2539                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2540                            mem_ctx, NULL, &res, &timeout, NULL);
2541         if (ret != 0 || res != 0) {
2542                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2543                                 "failed ret:%d res:%d\n",
2544                                 ret, res));
2545                 return -1;
2546         }
2547
2548         return 0;
2549 }
2550
2551 /*
2552   set/clear the permanent disabled bit on a remote node
2553  */
2554 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2555                        uint32_t set, uint32_t clear)
2556 {
2557         int ret;
2558         TDB_DATA data;
2559         struct ctdb_node_map *nodemap=NULL;
2560         struct ctdb_node_flag_change c;
2561         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2562         uint32_t recmaster;
2563         uint32_t *nodes;
2564
2565
2566         /* find the recovery master */
2567         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2568         if (ret != 0) {
2569                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2570                 talloc_free(tmp_ctx);
2571                 return ret;
2572         }
2573
2574
2575         /* read the node flags from the recmaster */
2576         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2577         if (ret != 0) {
2578                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2579                 talloc_free(tmp_ctx);
2580                 return -1;
2581         }
2582         if (destnode >= nodemap->num) {
2583                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2584                 talloc_free(tmp_ctx);
2585                 return -1;
2586         }
2587
2588         c.pnn       = destnode;
2589         c.old_flags = nodemap->nodes[destnode].flags;
2590         c.new_flags = c.old_flags;
2591         c.new_flags |= set;
2592         c.new_flags &= ~clear;
2593
2594         data.dsize = sizeof(c);
2595         data.dptr = (unsigned char *)&c;
2596
2597         /* send the flags update to all connected nodes */
2598         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2599
2600         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2601                                         nodes, 0,
2602                                         timeout, false, data,
2603                                         NULL, NULL,
2604                                         NULL) != 0) {
2605                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2606
2607                 talloc_free(tmp_ctx);
2608                 return -1;
2609         }
2610
2611         talloc_free(tmp_ctx);
2612         return 0;
2613 }
2614
2615
2616 /*
2617   get all tunables
2618  */
2619 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2620                                struct timeval timeout, 
2621                                uint32_t destnode,
2622                                struct ctdb_tunable *tunables)
2623 {
2624         TDB_DATA outdata;
2625         int ret;
2626         int32_t res;
2627
2628         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2629                            &outdata, &res, &timeout, NULL);
2630         if (ret != 0 || res != 0) {
2631                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2632                 return -1;
2633         }
2634
2635         if (outdata.dsize != sizeof(*tunables)) {
2636                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2637                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2638                 return -1;              
2639         }
2640
2641         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2642         talloc_free(outdata.dptr);
2643         return 0;
2644 }
2645
2646 /*
2647   add a public address to a node
2648  */
2649 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2650                       struct timeval timeout, 
2651                       uint32_t destnode,
2652                       struct ctdb_control_ip_iface *pub)
2653 {
2654         TDB_DATA data;
2655         int32_t res;
2656         int ret;
2657
2658         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2659         data.dptr  = (unsigned char *)pub;
2660
2661         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2662                            NULL, &res, &timeout, NULL);
2663         if (ret != 0 || res != 0) {
2664                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2665                 return -1;
2666         }
2667
2668         return 0;
2669 }
2670
2671 /*
2672   delete a public address from a node
2673  */
2674 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2675                       struct timeval timeout, 
2676                       uint32_t destnode,
2677                       struct ctdb_control_ip_iface *pub)
2678 {
2679         TDB_DATA data;
2680         int32_t res;
2681         int ret;
2682
2683         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2684         data.dptr  = (unsigned char *)pub;
2685
2686         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2687                            NULL, &res, &timeout, NULL);
2688         if (ret != 0 || res != 0) {
2689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2690                 return -1;
2691         }
2692
2693         return 0;
2694 }
2695
2696 /*
2697   kill a tcp connection
2698  */
2699 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2700                       struct timeval timeout, 
2701                       uint32_t destnode,
2702                       struct ctdb_control_killtcp *killtcp)
2703 {
2704         TDB_DATA data;
2705         int32_t res;
2706         int ret;
2707
2708         data.dsize = sizeof(struct ctdb_control_killtcp);
2709         data.dptr  = (unsigned char *)killtcp;
2710
2711         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2712                            NULL, &res, &timeout, NULL);
2713         if (ret != 0 || res != 0) {
2714                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2715                 return -1;
2716         }
2717
2718         return 0;
2719 }
2720
2721 /*
2722   send a gratious arp
2723  */
2724 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2725                       struct timeval timeout, 
2726                       uint32_t destnode,
2727                       ctdb_sock_addr *addr,
2728                       const char *ifname)
2729 {
2730         TDB_DATA data;
2731         int32_t res;
2732         int ret, len;
2733         struct ctdb_control_gratious_arp *gratious_arp;
2734         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2735
2736
2737         len = strlen(ifname)+1;
2738         gratious_arp = talloc_size(tmp_ctx, 
2739                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2740         CTDB_NO_MEMORY(ctdb, gratious_arp);
2741
2742         gratious_arp->addr = *addr;
2743         gratious_arp->len = len;
2744         memcpy(&gratious_arp->iface[0], ifname, len);
2745
2746
2747         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2748         data.dptr  = (unsigned char *)gratious_arp;
2749
2750         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2751                            NULL, &res, &timeout, NULL);
2752         if (ret != 0 || res != 0) {
2753                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2754                 talloc_free(tmp_ctx);
2755                 return -1;
2756         }
2757
2758         talloc_free(tmp_ctx);
2759         return 0;
2760 }
2761
2762 /*
2763   get a list of all tcp tickles that a node knows about for a particular vnn
2764  */
2765 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2766                               struct timeval timeout, uint32_t destnode, 
2767                               TALLOC_CTX *mem_ctx, 
2768                               ctdb_sock_addr *addr,
2769                               struct ctdb_control_tcp_tickle_list **list)
2770 {
2771         int ret;
2772         TDB_DATA data, outdata;
2773         int32_t status;
2774
2775         data.dptr = (uint8_t*)addr;
2776         data.dsize = sizeof(ctdb_sock_addr);
2777
2778         ret = ctdb_control(ctdb, destnode, 0, 
2779                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2780                            mem_ctx, &outdata, &status, NULL, NULL);
2781         if (ret != 0 || status != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2783                 return -1;
2784         }
2785
2786         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2787
2788         return status;
2789 }
2790
2791 /*
2792   register a server id
2793  */
2794 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2795                       struct timeval timeout, 
2796                       struct ctdb_server_id *id)
2797 {
2798         TDB_DATA data;
2799         int32_t res;
2800         int ret;
2801
2802         data.dsize = sizeof(struct ctdb_server_id);
2803         data.dptr  = (unsigned char *)id;
2804
2805         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2806                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2807                         0, data, NULL,
2808                         NULL, &res, &timeout, NULL);
2809         if (ret != 0 || res != 0) {
2810                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2811                 return -1;
2812         }
2813
2814         return 0;
2815 }
2816
2817 /*
2818   unregister a server id
2819  */
2820 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2821                       struct timeval timeout, 
2822                       struct ctdb_server_id *id)
2823 {
2824         TDB_DATA data;
2825         int32_t res;
2826         int ret;
2827
2828         data.dsize = sizeof(struct ctdb_server_id);
2829         data.dptr  = (unsigned char *)id;
2830
2831         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2832                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2833                         0, data, NULL,
2834                         NULL, &res, &timeout, NULL);
2835         if (ret != 0 || res != 0) {
2836                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2837                 return -1;
2838         }
2839
2840         return 0;
2841 }
2842
2843
2844 /*
2845   check if a server id exists
2846
2847   if a server id does exist, return *status == 1, otherwise *status == 0
2848  */
2849 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2850                       struct timeval timeout, 
2851                       uint32_t destnode,
2852                       struct ctdb_server_id *id,
2853                       uint32_t *status)
2854 {
2855         TDB_DATA data;
2856         int32_t res;
2857         int ret;
2858
2859         data.dsize = sizeof(struct ctdb_server_id);
2860         data.dptr  = (unsigned char *)id;
2861
2862         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2863                         0, data, NULL,
2864                         NULL, &res, &timeout, NULL);
2865         if (ret != 0) {
2866                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2867                 return -1;
2868         }
2869
2870         if (res) {
2871                 *status = 1;
2872         } else {
2873                 *status = 0;
2874         }
2875
2876         return 0;
2877 }
2878
2879 /*
2880    get the list of server ids that are registered on a node
2881 */
2882 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2883                 TALLOC_CTX *mem_ctx,
2884                 struct timeval timeout, uint32_t destnode, 
2885                 struct ctdb_server_id_list **svid_list)
2886 {
2887         int ret;
2888         TDB_DATA outdata;
2889         int32_t res;
2890
2891         ret = ctdb_control(ctdb, destnode, 0, 
2892                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2893                            mem_ctx, &outdata, &res, &timeout, NULL);
2894         if (ret != 0 || res != 0) {
2895                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2896                 return -1;
2897         }
2898
2899         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2900                     
2901         return 0;
2902 }
2903
2904 /*
2905   initialise the ctdb daemon for client applications
2906
2907   NOTE: In current code the daemon does not fork. This is for testing purposes only
2908   and to simplify the code.
2909 */
2910 struct ctdb_context *ctdb_init(struct event_context *ev)
2911 {
2912         int ret;
2913         struct ctdb_context *ctdb;
2914
2915         ctdb = talloc_zero(ev, struct ctdb_context);
2916         if (ctdb == NULL) {
2917                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2918                 return NULL;
2919         }
2920         ctdb->ev  = ev;
2921         ctdb->idr = idr_init(ctdb);
2922         /* Wrap early to exercise code. */
2923         ctdb->lastid = INT_MAX-200;
2924         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2925
2926         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2927         if (ret != 0) {
2928                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2929                 talloc_free(ctdb);
2930                 return NULL;
2931         }
2932
2933         ctdb->statistics.statistics_start_time = timeval_current();
2934
2935         return ctdb;
2936 }
2937
2938
2939 /*
2940   set some ctdb flags
2941 */
2942 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2943 {
2944         ctdb->flags |= flags;
2945 }
2946
2947 /*
2948   setup the local socket name
2949 */
2950 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2951 {
2952         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2953         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2954
2955         return 0;
2956 }
2957
2958 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2959 {
2960         return ctdb->daemon.name;
2961 }
2962
2963 /*
2964   return the pnn of this node
2965 */
2966 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2967 {
2968         return ctdb->pnn;
2969 }
2970
2971
2972 /*
2973   get the uptime of a remote node
2974  */
2975 struct ctdb_client_control_state *
2976 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2977 {
2978         return ctdb_control_send(ctdb, destnode, 0, 
2979                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2980                            mem_ctx, &timeout, NULL);
2981 }
2982
2983 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2984 {
2985         int ret;
2986         int32_t res;
2987         TDB_DATA outdata;
2988
2989         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2990         if (ret != 0 || res != 0) {
2991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2992                 return -1;
2993         }
2994
2995         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2996
2997         return 0;
2998 }
2999
3000 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3001 {
3002         struct ctdb_client_control_state *state;
3003
3004         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3005         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3006 }
3007
3008 /*
3009   send a control to execute the "recovered" event script on a node
3010  */
3011 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3012 {
3013         int ret;
3014         int32_t status;
3015
3016         ret = ctdb_control(ctdb, destnode, 0, 
3017                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3018                            NULL, NULL, &status, &timeout, NULL);
3019         if (ret != 0 || status != 0) {
3020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3021                 return -1;
3022         }
3023
3024         return 0;
3025 }
3026
3027 /* 
3028   callback for the async helpers used when sending the same control
3029   to multiple nodes in parallell.
3030 */
3031 static void async_callback(struct ctdb_client_control_state *state)
3032 {
3033         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3034         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3035         int ret;
3036         TDB_DATA outdata;
3037         int32_t res;
3038         uint32_t destnode = state->c->hdr.destnode;
3039
3040         /* one more node has responded with recmode data */
3041         data->count--;
3042
3043         /* if we failed to push the db, then return an error and let
3044            the main loop try again.
3045         */
3046         if (state->state != CTDB_CONTROL_DONE) {
3047                 if ( !data->dont_log_errors) {
3048                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3049                 }
3050                 data->fail_count++;
3051                 if (data->fail_callback) {
3052                         data->fail_callback(ctdb, destnode, res, outdata,
3053                                         data->callback_data);
3054                 }
3055                 return;
3056         }
3057         
3058         state->async.fn = NULL;
3059
3060         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3061         if ((ret != 0) || (res != 0)) {
3062                 if ( !data->dont_log_errors) {
3063                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3064                 }
3065                 data->fail_count++;
3066                 if (data->fail_callback) {
3067                         data->fail_callback(ctdb, destnode, res, outdata,
3068                                         data->callback_data);
3069                 }
3070         }
3071         if ((ret == 0) && (data->callback != NULL)) {
3072                 data->callback(ctdb, destnode, res, outdata,
3073                                         data->callback_data);
3074         }
3075 }
3076
3077
3078 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3079 {
3080         /* set up the callback functions */
3081         state->async.fn = async_callback;
3082         state->async.private_data = data;
3083         
3084         /* one more control to wait for to complete */
3085         data->count++;
3086 }
3087
3088
3089 /* wait for up to the maximum number of seconds allowed
3090    or until all nodes we expect a response from has replied
3091 */
3092 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3093 {
3094         while (data->count > 0) {
3095                 event_loop_once(ctdb->ev);
3096         }
3097         if (data->fail_count != 0) {
3098                 if (!data->dont_log_errors) {
3099                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3100                                  data->fail_count));
3101                 }
3102                 return -1;
3103         }
3104         return 0;
3105 }
3106
3107
3108 /* 
3109    perform a simple control on the listed nodes
3110    The control cannot return data
3111  */
3112 int ctdb_client_async_control(struct ctdb_context *ctdb,
3113                                 enum ctdb_controls opcode,
3114                                 uint32_t *nodes,
3115                                 uint64_t srvid,
3116                                 struct timeval timeout,
3117                                 bool dont_log_errors,
3118                                 TDB_DATA data,
3119                                 client_async_callback client_callback,
3120                                 client_async_callback fail_callback,
3121                                 void *callback_data)
3122 {
3123         struct client_async_data *async_data;
3124         struct ctdb_client_control_state *state;
3125         int j, num_nodes;
3126
3127         async_data = talloc_zero(ctdb, struct client_async_data);
3128         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3129         async_data->dont_log_errors = dont_log_errors;
3130         async_data->callback = client_callback;
3131         async_data->fail_callback = fail_callback;
3132         async_data->callback_data = callback_data;
3133         async_data->opcode        = opcode;
3134
3135         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3136
3137         /* loop over all nodes and send an async control to each of them */
3138         for (j=0; j<num_nodes; j++) {
3139                 uint32_t pnn = nodes[j];
3140
3141                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3142                                           0, data, async_data, &timeout, NULL);
3143                 if (state == NULL) {
3144                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3145                         talloc_free(async_data);
3146                         return -1;
3147                 }
3148                 
3149                 ctdb_client_async_add(async_data, state);
3150         }
3151
3152         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3153                 talloc_free(async_data);
3154                 return -1;
3155         }
3156
3157         talloc_free(async_data);
3158         return 0;
3159 }
3160
3161 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3162                                 struct ctdb_vnn_map *vnn_map,
3163                                 TALLOC_CTX *mem_ctx,
3164                                 bool include_self)
3165 {
3166         int i, j, num_nodes;
3167         uint32_t *nodes;
3168
3169         for (i=num_nodes=0;i<vnn_map->size;i++) {
3170                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3171                         continue;
3172                 }
3173                 num_nodes++;
3174         } 
3175
3176         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3177         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3178
3179         for (i=j=0;i<vnn_map->size;i++) {
3180                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3181                         continue;
3182                 }
3183                 nodes[j++] = vnn_map->map[i];
3184         } 
3185
3186         return nodes;
3187 }
3188
3189 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3190                                 struct ctdb_node_map *node_map,
3191                                 TALLOC_CTX *mem_ctx,
3192                                 bool include_self)
3193 {
3194         int i, j, num_nodes;
3195         uint32_t *nodes;
3196
3197         for (i=num_nodes=0;i<node_map->num;i++) {
3198                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3199                         continue;
3200                 }
3201                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3202                         continue;
3203                 }
3204                 num_nodes++;
3205         } 
3206
3207         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3208         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3209
3210         for (i=j=0;i<node_map->num;i++) {
3211                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3212                         continue;
3213                 }
3214                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3215                         continue;
3216                 }
3217                 nodes[j++] = node_map->nodes[i].pnn;
3218         } 
3219
3220         return nodes;
3221 }
3222
3223 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3224                                 struct ctdb_node_map *node_map,
3225                                 TALLOC_CTX *mem_ctx,
3226                                 uint32_t pnn)
3227 {
3228         int i, j, num_nodes;
3229         uint32_t *nodes;
3230
3231         for (i=num_nodes=0;i<node_map->num;i++) {
3232                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3233                         continue;
3234                 }
3235                 if (node_map->nodes[i].pnn == pnn) {
3236                         continue;
3237                 }
3238                 num_nodes++;
3239         } 
3240
3241         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3242         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3243
3244         for (i=j=0;i<node_map->num;i++) {
3245                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3246                         continue;
3247                 }
3248                 if (node_map->nodes[i].pnn == pnn) {
3249                         continue;
3250                 }
3251                 nodes[j++] = node_map->nodes[i].pnn;
3252         } 
3253
3254         return nodes;
3255 }
3256
3257 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3258                                 struct ctdb_node_map *node_map,
3259                                 TALLOC_CTX *mem_ctx,
3260                                 bool include_self)
3261 {
3262         int i, j, num_nodes;
3263         uint32_t *nodes;
3264
3265         for (i=num_nodes=0;i<node_map->num;i++) {
3266                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3267                         continue;
3268                 }
3269                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3270                         continue;
3271                 }
3272                 num_nodes++;
3273         } 
3274
3275         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3276         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3277
3278         for (i=j=0;i<node_map->num;i++) {
3279                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3280                         continue;
3281                 }
3282                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3283                         continue;
3284                 }
3285                 nodes[j++] = node_map->nodes[i].pnn;
3286         } 
3287
3288         return nodes;
3289 }
3290
3291 /* 
3292   this is used to test if a pnn lock exists and if it exists will return
3293   the number of connections that pnn has reported or -1 if that recovery
3294   daemon is not running.
3295 */
3296 int
3297 ctdb_read_pnn_lock(int fd, int32_t pnn)
3298 {
3299         struct flock lock;
3300         char c;
3301
3302         lock.l_type = F_WRLCK;
3303         lock.l_whence = SEEK_SET;
3304         lock.l_start = pnn;
3305         lock.l_len = 1;
3306         lock.l_pid = 0;
3307
3308         if (fcntl(fd, F_GETLK, &lock) != 0) {
3309                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3310                 return -1;
3311         }
3312
3313         if (lock.l_type == F_UNLCK) {
3314                 return -1;
3315         }
3316
3317         if (pread(fd, &c, 1, pnn) == -1) {
3318                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3319                 return -1;
3320         }
3321
3322         return c;
3323 }
3324
3325 /*
3326   get capabilities of a remote node
3327  */
3328 struct ctdb_client_control_state *
3329 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3330 {
3331         return ctdb_control_send(ctdb, destnode, 0, 
3332                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3333                            mem_ctx, &timeout, NULL);
3334 }
3335
3336 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3337 {
3338         int ret;
3339         int32_t res;
3340         TDB_DATA outdata;
3341
3342         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3343         if ( (ret != 0) || (res != 0) ) {
3344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3345                 return -1;
3346         }
3347
3348         if (capabilities) {
3349                 *capabilities = *((uint32_t *)outdata.dptr);
3350         }
3351
3352         return 0;
3353 }
3354
3355 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3356 {
3357         struct ctdb_client_control_state *state;
3358         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3359         int ret;
3360
3361         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3362         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3363         talloc_free(tmp_ctx);
3364         return ret;
3365 }
3366
3367 /**
3368  * check whether a transaction is active on a given db on a given node
3369  */
3370 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3371                                      uint32_t destnode,
3372                                      uint32_t db_id)
3373 {
3374         int32_t status;
3375         int ret;
3376         TDB_DATA indata;
3377
3378         indata.dptr = (uint8_t *)&db_id;
3379         indata.dsize = sizeof(db_id);
3380
3381         ret = ctdb_control(ctdb, destnode, 0,
3382                            CTDB_CONTROL_TRANS2_ACTIVE,
3383                            0, indata, NULL, NULL, &status,
3384                            NULL, NULL);
3385
3386         if (ret != 0) {
3387                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3388                 return -1;
3389         }
3390
3391         return status;
3392 }
3393
3394
3395 struct ctdb_transaction_handle {
3396         struct ctdb_db_context *ctdb_db;
3397         bool in_replay;
3398         /*
3399          * we store the reads and writes done under a transaction:
3400          * - one list stores both reads and writes (m_all),
3401          * - the other just writes (m_write)
3402          */
3403         struct ctdb_marshall_buffer *m_all;
3404         struct ctdb_marshall_buffer *m_write;
3405 };
3406
3407 /* start a transaction on a database */
3408 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3409 {
3410         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3411         return 0;
3412 }
3413
3414 /* start a transaction on a database */
3415 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3416 {
3417         struct ctdb_record_handle *rh;
3418         TDB_DATA key;
3419         TDB_DATA data;
3420         struct ctdb_ltdb_header header;
3421         TALLOC_CTX *tmp_ctx;
3422         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3423         int ret;
3424         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3425         pid_t pid;
3426         int32_t status;
3427
3428         key.dptr = discard_const(keyname);
3429         key.dsize = strlen(keyname);
3430
3431         if (!ctdb_db->persistent) {
3432                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3433                 return -1;
3434         }
3435
3436 again:
3437         tmp_ctx = talloc_new(h);
3438
3439         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3440         if (rh == NULL) {
3441                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3442                 talloc_free(tmp_ctx);
3443                 return -1;
3444         }
3445
3446         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3447                                               CTDB_CURRENT_NODE,
3448                                               ctdb_db->db_id);
3449         if (status == 1) {
3450                 unsigned long int usec = (1000 + random()) % 100000;
3451                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3452                                     "on db_id[0x%08x]. waiting for %lu "
3453                                     "microseconds\n",
3454                                     ctdb_db->db_id, usec));
3455                 talloc_free(tmp_ctx);
3456                 usleep(usec);
3457                 goto again;
3458         }
3459
3460         /*
3461          * store the pid in the database:
3462          * it is not enough that the node is dmaster...
3463          */
3464         pid = getpid();
3465         data.dptr = (unsigned char *)&pid;
3466         data.dsize = sizeof(pid_t);
3467         rh->header.rsn++;
3468         rh->header.dmaster = ctdb_db->ctdb->pnn;
3469         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3470         if (ret != 0) {
3471                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3472                                   "transaction record\n"));
3473                 talloc_free(tmp_ctx);
3474                 return -1;
3475         }
3476
3477         talloc_free(rh);
3478
3479         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3480         if (ret != 0) {
3481                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3482                 talloc_free(tmp_ctx);
3483                 return -1;
3484         }
3485
3486         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3487         if (ret != 0) {
3488                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3489                                  "lock record inside transaction\n"));
3490                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3491                 talloc_free(tmp_ctx);
3492                 goto again;
3493         }
3494
3495         if (header.dmaster != ctdb_db->ctdb->pnn) {
3496                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3497                                    "transaction lock record\n"));
3498                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3499                 talloc_free(tmp_ctx);
3500                 goto again;
3501         }
3502
3503         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3504                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3505                                     "the transaction lock record\n"));
3506                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3507                 talloc_free(tmp_ctx);
3508                 goto again;
3509         }
3510
3511         talloc_free(tmp_ctx);
3512
3513         return 0;
3514 }
3515
3516
3517 /* start a transaction on a database */
3518 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3519                                                        TALLOC_CTX *mem_ctx)
3520 {
3521         struct ctdb_transaction_handle *h;
3522         int ret;
3523
3524         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3525         if (h == NULL) {
3526                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3527                 return NULL;
3528         }
3529
3530         h->ctdb_db = ctdb_db;
3531
3532         ret = ctdb_transaction_fetch_start(h);
3533         if (ret != 0) {
3534                 talloc_free(h);
3535                 return NULL;
3536         }
3537
3538         talloc_set_destructor(h, ctdb_transaction_destructor);
3539
3540         return h;
3541 }
3542
3543
3544
3545 /*
3546   fetch a record inside a transaction
3547  */
3548 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3549                            TALLOC_CTX *mem_ctx, 
3550                            TDB_DATA key, TDB_DATA *data)
3551 {
3552         struct ctdb_ltdb_header header;
3553         int ret;
3554
3555         ZERO_STRUCT(header);
3556
3557         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3558         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3559                 /* record doesn't exist yet */
3560                 *data = tdb_null;
3561                 ret = 0;
3562         }
3563         
3564         if (ret != 0) {
3565                 return ret;
3566         }
3567
3568         if (!h->in_replay) {
3569                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3570                 if (h->m_all == NULL) {
3571                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3572                         return -1;
3573                 }
3574         }
3575
3576         return 0;
3577 }
3578
3579 /*
3580   stores a record inside a transaction
3581  */
3582 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3583                            TDB_DATA key, TDB_DATA data)
3584 {
3585         TALLOC_CTX *tmp_ctx = talloc_new(h);
3586         struct ctdb_ltdb_header header;
3587         TDB_DATA olddata;
3588         int ret;
3589
3590         ZERO_STRUCT(header);
3591
3592         /* we need the header so we can update the RSN */
3593         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3594         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3595                 /* the record doesn't exist - create one with us as dmaster.
3596                    This is only safe because we are in a transaction and this
3597                    is a persistent database */
3598                 ZERO_STRUCT(header);
3599         } else if (ret != 0) {
3600                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3601                 talloc_free(tmp_ctx);
3602                 return ret;
3603         }
3604
3605         if (data.dsize == olddata.dsize &&
3606             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3607                 /* save writing the same data */
3608                 talloc_free(tmp_ctx);
3609                 return 0;
3610         }
3611
3612         header.dmaster = h->ctdb_db->ctdb->pnn;
3613         header.rsn++;
3614
3615         if (!h->in_replay) {
3616                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3617                 if (h->m_all == NULL) {
3618                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3619                         talloc_free(tmp_ctx);
3620                         return -1;
3621                 }
3622         }               
3623
3624         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3625         if (h->m_write == NULL) {
3626                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3627                 talloc_free(tmp_ctx);
3628                 return -1;
3629         }
3630         
3631         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3632
3633         talloc_free(tmp_ctx);
3634         
3635         return ret;
3636 }
3637
3638 /*
3639   replay a transaction
3640  */
3641 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3642 {
3643         int ret, i;
3644         struct ctdb_rec_data *rec = NULL;
3645
3646         h->in_replay = true;
3647         talloc_free(h->m_write);
3648         h->m_write = NULL;
3649
3650         ret = ctdb_transaction_fetch_start(h);
3651         if (ret != 0) {
3652                 return ret;
3653         }
3654
3655         for (i=0;i<h->m_all->count;i++) {
3656                 TDB_DATA key, data;
3657
3658                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3659                 if (rec == NULL) {
3660                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3661                         goto failed;
3662                 }
3663
3664                 if (rec->reqid == 0) {
3665                         /* its a store */
3666                         if (ctdb_transaction_store(h, key, data) != 0) {
3667                                 goto failed;
3668                         }
3669                 } else {
3670                         TDB_DATA data2;
3671                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3672
3673                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3674                                 talloc_free(tmp_ctx);
3675                                 goto failed;
3676                         }
3677                         if (data2.dsize != data.dsize ||
3678                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3679                                 /* the record has changed on us - we have to give up */
3680                                 talloc_free(tmp_ctx);
3681                                 goto failed;
3682                         }
3683                         talloc_free(tmp_ctx);
3684                 }
3685         }
3686         
3687         return 0;
3688
3689 failed:
3690         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3691         return -1;
3692 }
3693
3694
3695 /*
3696   commit a transaction
3697  */
3698 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3699 {
3700         int ret, retries=0;
3701         int32_t status;
3702         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3703         struct timeval timeout;
3704         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3705
3706         talloc_set_destructor(h, NULL);
3707
3708         /* our commit strategy is quite complex.
3709
3710            - we first try to commit the changes to all other nodes
3711
3712            - if that works, then we commit locally and we are done
3713
3714            - if a commit on another node fails, then we need to cancel
3715              the transaction, then restart the transaction (thus
3716              opening a window of time for a pending recovery to
3717              complete), then replay the transaction, checking all the
3718              reads and writes (checking that reads give the same data,
3719              and writes succeed). Then we retry the transaction to the
3720              other nodes
3721         */
3722
3723 again:
3724         if (h->m_write == NULL) {
3725                 /* no changes were made */
3726                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3727                 talloc_free(h);
3728                 return 0;
3729         }
3730
3731         /* tell ctdbd to commit to the other nodes */
3732         timeout = timeval_current_ofs(1, 0);
3733         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3734                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3735                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3736                            &timeout, NULL);
3737         if (ret != 0 || status != 0) {
3738                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3739                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3740                                      ", retrying after 1 second...\n",
3741                                      (retries==0)?"":"retry "));
3742                 sleep(1);
3743
3744                 if (ret != 0) {
3745                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3746                 } else {
3747                         /* work out what error code we will give if we 
3748                            have to fail the operation */
3749                         switch ((enum ctdb_trans2_commit_error)status) {
3750                         case CTDB_TRANS2_COMMIT_SUCCESS:
3751                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3752                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3753                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3754                                 break;
3755                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3756                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3757                                 break;
3758                         }
3759                 }
3760
3761                 if (++retries == 100) {
3762                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3763                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3764                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3765                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3766                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3767                         talloc_free(h);
3768                         return -1;
3769                 }               
3770
3771                 if (ctdb_replay_transaction(h) != 0) {
3772                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3773                                           "transaction on db 0x%08x, "
3774                                           "failure control =%u\n",
3775                                           h->ctdb_db->db_id,
3776                                           (unsigned)failure_control));
3777                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3778                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3779                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3780                         talloc_free(h);
3781                         return -1;
3782                 }
3783                 goto again;
3784         } else {
3785                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3786         }
3787
3788         /* do the real commit locally */
3789         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3790         if (ret != 0) {
3791                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3792                                   "on db id 0x%08x locally, "
3793                                   "failure_control=%u\n",
3794                                   h->ctdb_db->db_id,
3795                                   (unsigned)failure_control));
3796                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3797                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3798                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3799                 talloc_free(h);
3800                 return ret;
3801         }
3802
3803         /* tell ctdbd that we are finished with our local commit */
3804         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3805                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3806                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3807         talloc_free(h);
3808         return 0;
3809 }
3810
3811 /*
3812   recovery daemon ping to main daemon
3813  */
3814 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3815 {
3816         int ret;
3817         int32_t res;
3818
3819         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3820                            ctdb, NULL, &res, NULL, NULL);
3821         if (ret != 0 || res != 0) {
3822                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3823                 return -1;
3824         }
3825
3826         return 0;
3827 }
3828
3829 /* when forking the main daemon and the child process needs to connect back
3830  * to the daemon as a client process, this function can be used to change
3831  * the ctdb context from daemon into client mode
3832  */
3833 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3834 {
3835         int ret;
3836         va_list ap;
3837
3838         /* Add extra information so we can identify this in the logs */
3839         va_start(ap, fmt);
3840         debug_extra = talloc_append_string(NULL, talloc_vasprintf(NULL, fmt, ap), ":");
3841         va_end(ap);
3842
3843         /* shutdown the transport */
3844         if (ctdb->methods) {
3845                 ctdb->methods->shutdown(ctdb);
3846         }
3847
3848         /* get a new event context */
3849         talloc_free(ctdb->ev);
3850         ctdb->ev = event_context_init(ctdb);
3851         tevent_loop_allow_nesting(ctdb->ev);
3852
3853         close(ctdb->daemon.sd);
3854         ctdb->daemon.sd = -1;
3855
3856         /* initialise ctdb */
3857         ret = ctdb_socket_connect(ctdb);
3858         if (ret != 0) {
3859                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3860                 return -1;
3861         }
3862
3863          return 0;
3864 }
3865
3866 /*
3867   get the status of running the monitor eventscripts: NULL means never run.
3868  */
3869 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3870                 struct timeval timeout, uint32_t destnode, 
3871                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3872                 struct ctdb_scripts_wire **script_status)
3873 {
3874         int ret;
3875         TDB_DATA outdata, indata;
3876         int32_t res;
3877         uint32_t uinttype = type;
3878
3879         indata.dptr = (uint8_t *)&uinttype;
3880         indata.dsize = sizeof(uinttype);
3881
3882         ret = ctdb_control(ctdb, destnode, 0, 
3883                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3884                            mem_ctx, &outdata, &res, &timeout, NULL);
3885         if (ret != 0 || res != 0) {
3886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3887                 return -1;
3888         }
3889
3890         if (outdata.dsize == 0) {
3891                 *script_status = NULL;
3892         } else {
3893                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3894                 talloc_free(outdata.dptr);
3895         }
3896                     
3897         return 0;
3898 }
3899
3900 /*
3901   tell the main daemon how long it took to lock the reclock file
3902  */
3903 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3904 {
3905         int ret;
3906         int32_t res;
3907         TDB_DATA data;
3908
3909         data.dptr = (uint8_t *)&latency;
3910         data.dsize = sizeof(latency);
3911
3912         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3913                            ctdb, NULL, &res, NULL, NULL);
3914         if (ret != 0 || res != 0) {
3915                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3916                 return -1;
3917         }
3918
3919         return 0;
3920 }
3921
3922 /*
3923   get the name of the reclock file
3924  */
3925 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3926                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3927                          const char **name)
3928 {
3929         int ret;
3930         int32_t res;
3931         TDB_DATA data;
3932
3933         ret = ctdb_control(ctdb, destnode, 0, 
3934                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3935                            mem_ctx, &data, &res, &timeout, NULL);
3936         if (ret != 0 || res != 0) {
3937                 return -1;
3938         }
3939
3940         if (data.dsize == 0) {
3941                 *name = NULL;
3942         } else {
3943                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3944         }
3945         talloc_free(data.dptr);
3946
3947         return 0;
3948 }
3949
3950 /*
3951   set the reclock filename for a node
3952  */
3953 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3954 {
3955         int ret;
3956         TDB_DATA data;
3957         int32_t res;
3958
3959         if (reclock == NULL) {
3960                 data.dsize = 0;
3961                 data.dptr  = NULL;
3962         } else {
3963                 data.dsize = strlen(reclock) + 1;
3964                 data.dptr  = discard_const(reclock);
3965         }
3966
3967         ret = ctdb_control(ctdb, destnode, 0, 
3968                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3969                            NULL, NULL, &res, &timeout, NULL);
3970         if (ret != 0 || res != 0) {
3971                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3972                 return -1;
3973         }
3974
3975         return 0;
3976 }
3977
3978 /*
3979   stop a node
3980  */
3981 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3982 {
3983         int ret;
3984         int32_t res;
3985
3986         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3987                            ctdb, NULL, &res, &timeout, NULL);
3988         if (ret != 0 || res != 0) {
3989                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3990                 return -1;
3991         }
3992
3993         return 0;
3994 }
3995
3996 /*
3997   continue a node
3998  */
3999 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4000 {
4001         int ret;
4002
4003         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4004                            ctdb, NULL, NULL, &timeout, NULL);
4005         if (ret != 0) {
4006                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4007                 return -1;
4008         }
4009
4010         return 0;
4011 }
4012
4013 /*
4014   set the natgw state for a node
4015  */
4016 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4017 {
4018         int ret;
4019         TDB_DATA data;
4020         int32_t res;
4021
4022         data.dsize = sizeof(natgwstate);
4023         data.dptr  = (uint8_t *)&natgwstate;
4024
4025         ret = ctdb_control(ctdb, destnode, 0, 
4026                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4027                            NULL, NULL, &res, &timeout, NULL);
4028         if (ret != 0 || res != 0) {
4029                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4030                 return -1;
4031         }
4032
4033         return 0;
4034 }
4035
4036 /*
4037   set the lmaster role for a node
4038  */
4039 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4040 {
4041         int ret;
4042         TDB_DATA data;
4043         int32_t res;
4044
4045         data.dsize = sizeof(lmasterrole);
4046         data.dptr  = (uint8_t *)&lmasterrole;
4047
4048         ret = ctdb_control(ctdb, destnode, 0, 
4049                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4050                            NULL, NULL, &res, &timeout, NULL);
4051         if (ret != 0 || res != 0) {
4052                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4053                 return -1;
4054         }
4055
4056         return 0;
4057 }
4058
4059 /*
4060   set the recmaster role for a node
4061  */
4062 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4063 {
4064         int ret;
4065         TDB_DATA data;
4066         int32_t res;
4067
4068         data.dsize = sizeof(recmasterrole);
4069         data.dptr  = (uint8_t *)&recmasterrole;
4070
4071         ret = ctdb_control(ctdb, destnode, 0, 
4072                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4073                            NULL, NULL, &res, &timeout, NULL);
4074         if (ret != 0 || res != 0) {
4075                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4076                 return -1;
4077         }
4078
4079         return 0;
4080 }
4081
4082 /* enable an eventscript
4083  */
4084 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4085 {
4086         int ret;
4087         TDB_DATA data;
4088         int32_t res;
4089
4090         data.dsize = strlen(script) + 1;
4091         data.dptr  = discard_const(script);
4092
4093         ret = ctdb_control(ctdb, destnode, 0, 
4094                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4095                            NULL, NULL, &res, &timeout, NULL);
4096         if (ret != 0 || res != 0) {
4097                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4098                 return -1;
4099         }
4100
4101         return 0;
4102 }
4103
4104 /* disable an eventscript
4105  */
4106 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4107 {
4108         int ret;
4109         TDB_DATA data;
4110         int32_t res;
4111
4112         data.dsize = strlen(script) + 1;
4113         data.dptr  = discard_const(script);
4114
4115         ret = ctdb_control(ctdb, destnode, 0, 
4116                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4117                            NULL, NULL, &res, &timeout, NULL);
4118         if (ret != 0 || res != 0) {
4119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4120                 return -1;
4121         }
4122
4123         return 0;
4124 }
4125
4126
4127 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4128 {
4129         int ret;
4130         TDB_DATA data;
4131         int32_t res;
4132
4133         data.dsize = sizeof(*bantime);
4134         data.dptr  = (uint8_t *)bantime;
4135
4136         ret = ctdb_control(ctdb, destnode, 0, 
4137                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4138                            NULL, NULL, &res, &timeout, NULL);
4139         if (ret != 0 || res != 0) {
4140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4141                 return -1;
4142         }
4143
4144         return 0;
4145 }
4146
4147
4148 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4149 {
4150         int ret;
4151         TDB_DATA outdata;
4152         int32_t res;
4153         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4154
4155         ret = ctdb_control(ctdb, destnode, 0, 
4156                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4157                            tmp_ctx, &outdata, &res, &timeout, NULL);
4158         if (ret != 0 || res != 0) {
4159                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4160                 talloc_free(tmp_ctx);
4161                 return -1;
4162         }
4163
4164         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4165         talloc_free(tmp_ctx);
4166
4167         return 0;
4168 }
4169
4170
4171 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4172 {
4173         int ret;
4174         int32_t res;
4175         TDB_DATA data;
4176         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4177
4178         data.dptr = (uint8_t*)db_prio;
4179         data.dsize = sizeof(*db_prio);
4180
4181         ret = ctdb_control(ctdb, destnode, 0, 
4182                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4183                            tmp_ctx, NULL, &res, &timeout, NULL);
4184         if (ret != 0 || res != 0) {
4185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4186                 talloc_free(tmp_ctx);
4187                 return -1;
4188         }
4189
4190         talloc_free(tmp_ctx);
4191
4192         return 0;
4193 }
4194
4195 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4196 {
4197         int ret;
4198         int32_t res;
4199         TDB_DATA data;
4200         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4201
4202         data.dptr = (uint8_t*)&db_id;
4203         data.dsize = sizeof(db_id);
4204
4205         ret = ctdb_control(ctdb, destnode, 0, 
4206                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4207                            tmp_ctx, NULL, &res, &timeout, NULL);
4208         if (ret != 0 || res < 0) {
4209                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4210                 talloc_free(tmp_ctx);
4211                 return -1;
4212         }
4213
4214         if (priority) {
4215                 *priority = res;
4216         }
4217
4218         talloc_free(tmp_ctx);
4219
4220         return 0;
4221 }