ctdb_client: fix DEBUG statement in ctdb_ctrl_modflags()
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 /*
34   allocate a packet for use in client<->daemon communication
35  */
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37                                             TALLOC_CTX *mem_ctx, 
38                                             enum ctdb_operation operation, 
39                                             size_t length, size_t slength,
40                                             const char *type)
41 {
42         int size;
43         struct ctdb_req_header *hdr;
44
45         length = MAX(length, slength);
46         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47
48         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49         if (hdr == NULL) {
50                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51                          operation, (unsigned)length));
52                 return NULL;
53         }
54         talloc_set_name_const(hdr, type);
55         memset(hdr, 0, slength);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90
91         for (fn=ctdb_db->calls;fn;fn=fn->next) {
92                 if (fn->id == call->call_id) break;
93         }
94         if (fn == NULL) {
95                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
96                 talloc_free(c);
97                 return -1;
98         }
99
100         if (fn->fn(c) != 0) {
101                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
102                 talloc_free(c);
103                 return -1;
104         }
105
106         if (header->laccessor != caller) {
107                 header->lacount = 0;
108         }
109         header->laccessor = caller;
110         header->lacount++;
111
112         /* we need to force the record to be written out if this was a remote access,
113            so that the lacount is updated */
114         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115                 c->new_data = &c->record_data;
116         }
117
118         if (c->new_data) {
119                 /* XXX check that we always have the lock here? */
120                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122                         talloc_free(c);
123                         return -1;
124                 }
125         }
126
127         if (c->reply_data) {
128                 call->reply_data = *c->reply_data;
129
130                 talloc_steal(call, call->reply_data.dptr);
131                 talloc_set_name_const(call->reply_data.dptr, __location__);
132         } else {
133                 call->reply_data.dptr = NULL;
134                 call->reply_data.dsize = 0;
135         }
136         call->status = c->status;
137
138         talloc_free(c);
139
140         return 0;
141 }
142
143
144 /*
145   queue a packet for sending from client to daemon
146 */
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 {
149         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 }
151
152
153 /*
154   called when a CTDB_REPLY_CALL packet comes in in the client
155
156   This packet comes in response to a CTDB_REQ_CALL request packet. It
157   contains any reply data from the call
158 */
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 {
161         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162         struct ctdb_client_call_state *state;
163
164         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165         if (state == NULL) {
166                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
167                 return;
168         }
169
170         if (hdr->reqid != state->reqid) {
171                 /* we found a record  but it was the wrong one */
172                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
173                 return;
174         }
175
176         state->call->reply_data.dptr = c->data;
177         state->call->reply_data.dsize = c->datalen;
178         state->call->status = c->status;
179
180         talloc_steal(state, c);
181
182         state->state = CTDB_CALL_DONE;
183
184         if (state->async.fn) {
185                 state->async.fn(state);
186         }
187 }
188
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
190
191 /*
192   this is called in the client, when data comes in from the daemon
193  */
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 {
196         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
198         TALLOC_CTX *tmp_ctx;
199
200         /* place the packet as a child of a tmp_ctx. We then use
201            talloc_free() below to free it. If any of the calls want
202            to keep it, then they will steal it somewhere else, and the
203            talloc_free() will be a no-op */
204         tmp_ctx = talloc_new(ctdb);
205         talloc_steal(tmp_ctx, hdr);
206
207         if (cnt == 0) {
208                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
209                 exit(0);
210         }
211
212         if (cnt < sizeof(*hdr)) {
213                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
214                 goto done;
215         }
216         if (cnt != hdr->length) {
217                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
218                                (unsigned)hdr->length, (unsigned)cnt);
219                 goto done;
220         }
221
222         if (hdr->ctdb_magic != CTDB_MAGIC) {
223                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
224                 goto done;
225         }
226
227         if (hdr->ctdb_version != CTDB_VERSION) {
228                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
229                 goto done;
230         }
231
232         switch (hdr->operation) {
233         case CTDB_REPLY_CALL:
234                 ctdb_client_reply_call(ctdb, hdr);
235                 break;
236
237         case CTDB_REQ_MESSAGE:
238                 ctdb_request_message(ctdb, hdr);
239                 break;
240
241         case CTDB_REPLY_CONTROL:
242                 ctdb_client_reply_control(ctdb, hdr);
243                 break;
244
245         default:
246                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
247         }
248
249 done:
250         talloc_free(tmp_ctx);
251 }
252
253 /*
254   connect to a unix domain socket
255 */
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 {
258         struct sockaddr_un addr;
259
260         memset(&addr, 0, sizeof(addr));
261         addr.sun_family = AF_UNIX;
262         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263
264         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265         if (ctdb->daemon.sd == -1) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
267                 return -1;
268         }
269
270         set_nonblocking(ctdb->daemon.sd);
271         set_close_on_exec(ctdb->daemon.sd);
272         
273         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
274                 close(ctdb->daemon.sd);
275                 ctdb->daemon.sd = -1;
276                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
277                 return -1;
278         }
279
280         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
281                                               CTDB_DS_ALIGNMENT, 
282                                               ctdb_client_read_cb, ctdb);
283         return 0;
284 }
285
286
287 struct ctdb_record_handle {
288         struct ctdb_db_context *ctdb_db;
289         TDB_DATA key;
290         TDB_DATA *data;
291         struct ctdb_ltdb_header header;
292 };
293
294
295 /*
296   make a recv call to the local ctdb daemon - called from client context
297
298   This is called when the program wants to wait for a ctdb_call to complete and get the 
299   results. This call will block unless the call has already completed.
300 */
301 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
302 {
303         if (state == NULL) {
304                 return -1;
305         }
306
307         while (state->state < CTDB_CALL_DONE) {
308                 event_loop_once(state->ctdb_db->ctdb->ev);
309         }
310         if (state->state != CTDB_CALL_DONE) {
311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
312                 talloc_free(state);
313                 return -1;
314         }
315
316         if (state->call->reply_data.dsize) {
317                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
318                                                       state->call->reply_data.dptr,
319                                                       state->call->reply_data.dsize);
320                 call->reply_data.dsize = state->call->reply_data.dsize;
321         } else {
322                 call->reply_data.dptr = NULL;
323                 call->reply_data.dsize = 0;
324         }
325         call->status = state->call->status;
326         talloc_free(state);
327
328         return 0;
329 }
330
331
332
333
334 /*
335   destroy a ctdb_call in client
336 */
337 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
338 {
339         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
340         return 0;
341 }
342
343 /*
344   construct an event driven local ctdb_call
345
346   this is used so that locally processed ctdb_call requests are processed
347   in an event driven manner
348 */
349 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
350                                                                   struct ctdb_call *call,
351                                                                   struct ctdb_ltdb_header *header,
352                                                                   TDB_DATA *data)
353 {
354         struct ctdb_client_call_state *state;
355         struct ctdb_context *ctdb = ctdb_db->ctdb;
356         int ret;
357
358         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
359         CTDB_NO_MEMORY_NULL(ctdb, state);
360         state->call = talloc_zero(state, struct ctdb_call);
361         CTDB_NO_MEMORY_NULL(ctdb, state->call);
362
363         talloc_steal(state, data->dptr);
364
365         state->state   = CTDB_CALL_DONE;
366         *(state->call) = *call;
367         state->ctdb_db = ctdb_db;
368
369         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
370
371         return state;
372 }
373
374 /*
375   make a ctdb call to the local daemon - async send. Called from client context.
376
377   This constructs a ctdb_call request and queues it for processing. 
378   This call never blocks.
379 */
380 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
381                                               struct ctdb_call *call)
382 {
383         struct ctdb_client_call_state *state;
384         struct ctdb_context *ctdb = ctdb_db->ctdb;
385         struct ctdb_ltdb_header header;
386         TDB_DATA data;
387         int ret;
388         size_t len;
389         struct ctdb_req_call *c;
390
391         /* if the domain socket is not yet open, open it */
392         if (ctdb->daemon.sd==-1) {
393                 ctdb_socket_connect(ctdb);
394         }
395
396         ret = ctdb_ltdb_lock(ctdb_db, call->key);
397         if (ret != 0) {
398                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
399                 return NULL;
400         }
401
402         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
403
404         if (ret == 0 && header.dmaster == ctdb->pnn) {
405                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
406                 talloc_free(data.dptr);
407                 ctdb_ltdb_unlock(ctdb_db, call->key);
408                 return state;
409         }
410
411         ctdb_ltdb_unlock(ctdb_db, call->key);
412         talloc_free(data.dptr);
413
414         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
415         if (state == NULL) {
416                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
417                 return NULL;
418         }
419         state->call = talloc_zero(state, struct ctdb_call);
420         if (state->call == NULL) {
421                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
422                 return NULL;
423         }
424
425         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
426         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
427         if (c == NULL) {
428                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
429                 return NULL;
430         }
431
432         state->reqid     = ctdb_reqid_new(ctdb, state);
433         state->ctdb_db = ctdb_db;
434         talloc_set_destructor(state, ctdb_client_call_destructor);
435
436         c->hdr.reqid     = state->reqid;
437         c->flags         = call->flags;
438         c->db_id         = ctdb_db->db_id;
439         c->callid        = call->call_id;
440         c->hopcount      = 0;
441         c->keylen        = call->key.dsize;
442         c->calldatalen   = call->call_data.dsize;
443         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
444         memcpy(&c->data[call->key.dsize], 
445                call->call_data.dptr, call->call_data.dsize);
446         *(state->call)              = *call;
447         state->call->call_data.dptr = &c->data[call->key.dsize];
448         state->call->key.dptr       = &c->data[0];
449
450         state->state  = CTDB_CALL_WAIT;
451
452
453         ctdb_client_queue_pkt(ctdb, &c->hdr);
454
455         return state;
456 }
457
458
459 /*
460   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
461 */
462 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
463 {
464         struct ctdb_client_call_state *state;
465
466         state = ctdb_call_send(ctdb_db, call);
467         return ctdb_call_recv(state, call);
468 }
469
470
471 /*
472   tell the daemon what messaging srvid we will use, and register the message
473   handler function in the client
474 */
475 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
476                              ctdb_message_fn_t handler,
477                              void *private_data)
478                                     
479 {
480         int res;
481         int32_t status;
482         
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501         
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514
515 /*
516   send a message - from client context
517  */
518 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
519                       uint64_t srvid, TDB_DATA data)
520 {
521         struct ctdb_req_message *r;
522         int len, res;
523
524         len = offsetof(struct ctdb_req_message, data) + data.dsize;
525         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
526                                len, struct ctdb_req_message);
527         CTDB_NO_MEMORY(ctdb, r);
528
529         r->hdr.destnode  = pnn;
530         r->srvid         = srvid;
531         r->datalen       = data.dsize;
532         memcpy(&r->data[0], data.dptr, data.dsize);
533         
534         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
535         if (res != 0) {
536                 return res;
537         }
538
539         talloc_free(r);
540         return 0;
541 }
542
543
544 /*
545   cancel a ctdb_fetch_lock operation, releasing the lock
546  */
547 static int fetch_lock_destructor(struct ctdb_record_handle *h)
548 {
549         ctdb_ltdb_unlock(h->ctdb_db, h->key);
550         return 0;
551 }
552
553 /*
554   force the migration of a record to this node
555  */
556 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
557 {
558         struct ctdb_call call;
559         ZERO_STRUCT(call);
560         call.call_id = CTDB_NULL_FUNC;
561         call.key = key;
562         call.flags = CTDB_IMMEDIATE_MIGRATION;
563         return ctdb_call(ctdb_db, &call);
564 }
565
566 /*
567   get a lock on a record, and return the records data. Blocks until it gets the lock
568  */
569 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
570                                            TDB_DATA key, TDB_DATA *data)
571 {
572         int ret;
573         struct ctdb_record_handle *h;
574
575         /*
576           procedure is as follows:
577
578           1) get the chain lock. 
579           2) check if we are dmaster
580           3) if we are the dmaster then return handle 
581           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
582              reply from ctdbd
583           5) when we get the reply, goto (1)
584          */
585
586         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
587         if (h == NULL) {
588                 return NULL;
589         }
590
591         h->ctdb_db = ctdb_db;
592         h->key     = key;
593         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
594         if (h->key.dptr == NULL) {
595                 talloc_free(h);
596                 return NULL;
597         }
598         h->data    = data;
599
600         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
601                  (const char *)key.dptr));
602
603 again:
604         /* step 1 - get the chain lock */
605         ret = ctdb_ltdb_lock(ctdb_db, key);
606         if (ret != 0) {
607                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
608                 talloc_free(h);
609                 return NULL;
610         }
611
612         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
613
614         talloc_set_destructor(h, fetch_lock_destructor);
615
616         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
617
618         /* when torturing, ensure we test the remote path */
619         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
620             random() % 5 == 0) {
621                 h->header.dmaster = (uint32_t)-1;
622         }
623
624
625         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
626
627         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
628                 ctdb_ltdb_unlock(ctdb_db, key);
629                 ret = ctdb_client_force_migration(ctdb_db, key);
630                 if (ret != 0) {
631                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
632                         talloc_free(h);
633                         return NULL;
634                 }
635                 goto again;
636         }
637
638         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
639         return h;
640 }
641
642 /*
643   store some data to the record that was locked with ctdb_fetch_lock()
644 */
645 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
646 {
647         if (h->ctdb_db->persistent) {
648                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
649                 return -1;
650         }
651
652         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
653 }
654
655 /*
656   non-locking fetch of a record
657  */
658 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
659                TDB_DATA key, TDB_DATA *data)
660 {
661         struct ctdb_call call;
662         int ret;
663
664         call.call_id = CTDB_FETCH_FUNC;
665         call.call_data.dptr = NULL;
666         call.call_data.dsize = 0;
667
668         ret = ctdb_call(ctdb_db, &call);
669
670         if (ret == 0) {
671                 *data = call.reply_data;
672                 talloc_steal(mem_ctx, data->dptr);
673         }
674
675         return ret;
676 }
677
678
679
680 /*
681    called when a control completes or timesout to invoke the callback
682    function the user provided
683 */
684 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
685         struct timeval t, void *private_data)
686 {
687         struct ctdb_client_control_state *state;
688         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
689         int ret;
690
691         state = talloc_get_type(private_data, struct ctdb_client_control_state);
692         talloc_steal(tmp_ctx, state);
693
694         ret = ctdb_control_recv(state->ctdb, state, state,
695                         NULL, 
696                         NULL, 
697                         NULL);
698
699         talloc_free(tmp_ctx);
700 }
701
702 /*
703   called when a CTDB_REPLY_CONTROL packet comes in in the client
704
705   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
706   contains any reply data from the control
707 */
708 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
709                                       struct ctdb_req_header *hdr)
710 {
711         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
712         struct ctdb_client_control_state *state;
713
714         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
715         if (state == NULL) {
716                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
717                 return;
718         }
719
720         if (hdr->reqid != state->reqid) {
721                 /* we found a record  but it was the wrong one */
722                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
723                 return;
724         }
725
726         state->outdata.dptr = c->data;
727         state->outdata.dsize = c->datalen;
728         state->status = c->status;
729         if (c->errorlen) {
730                 state->errormsg = talloc_strndup(state, 
731                                                  (char *)&c->data[c->datalen], 
732                                                  c->errorlen);
733         }
734
735         /* state->outdata now uses resources from c so we dont want c
736            to just dissappear from under us while state is still alive
737         */
738         talloc_steal(state, c);
739
740         state->state = CTDB_CONTROL_DONE;
741
742         /* if we had a callback registered for this control, pull the response
743            and call the callback.
744         */
745         if (state->async.fn) {
746                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
747         }
748 }
749
750
751 /*
752   destroy a ctdb_control in client
753 */
754 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
755 {
756         ctdb_reqid_remove(state->ctdb, state->reqid);
757         return 0;
758 }
759
760
761 /* time out handler for ctdb_control */
762 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
763         struct timeval t, void *private_data)
764 {
765         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
766
767         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
768
769         state->state = CTDB_CONTROL_TIMEOUT;
770
771         /* if we had a callback registered for this control, pull the response
772            and call the callback.
773         */
774         if (state->async.fn) {
775                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
776         }
777 }
778
779 /* async version of send control request */
780 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
781                 uint32_t destnode, uint64_t srvid, 
782                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
783                 TALLOC_CTX *mem_ctx,
784                 struct timeval *timeout,
785                 char **errormsg)
786 {
787         struct ctdb_client_control_state *state;
788         size_t len;
789         struct ctdb_req_control *c;
790         int ret;
791
792         if (errormsg) {
793                 *errormsg = NULL;
794         }
795
796         /* if the domain socket is not yet open, open it */
797         if (ctdb->daemon.sd==-1) {
798                 ctdb_socket_connect(ctdb);
799         }
800
801         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
802         CTDB_NO_MEMORY_NULL(ctdb, state);
803
804         state->ctdb       = ctdb;
805         state->reqid      = ctdb_reqid_new(ctdb, state);
806         state->state      = CTDB_CONTROL_WAIT;
807         state->errormsg   = NULL;
808
809         talloc_set_destructor(state, ctdb_control_destructor);
810
811         len = offsetof(struct ctdb_req_control, data) + data.dsize;
812         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
813                                len, struct ctdb_req_control);
814         state->c            = c;        
815         CTDB_NO_MEMORY_NULL(ctdb, c);
816         c->hdr.reqid        = state->reqid;
817         c->hdr.destnode     = destnode;
818         c->opcode           = opcode;
819         c->client_id        = 0;
820         c->flags            = flags;
821         c->srvid            = srvid;
822         c->datalen          = data.dsize;
823         if (data.dsize) {
824                 memcpy(&c->data[0], data.dptr, data.dsize);
825         }
826
827         /* timeout */
828         if (timeout && !timeval_is_zero(timeout)) {
829                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
830         }
831
832         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
833         if (ret != 0) {
834                 talloc_free(state);
835                 return NULL;
836         }
837
838         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
839                 talloc_free(state);
840                 return NULL;
841         }
842
843         return state;
844 }
845
846
847 /* async version of receive control reply */
848 int ctdb_control_recv(struct ctdb_context *ctdb, 
849                 struct ctdb_client_control_state *state, 
850                 TALLOC_CTX *mem_ctx,
851                 TDB_DATA *outdata, int32_t *status, char **errormsg)
852 {
853         TALLOC_CTX *tmp_ctx;
854
855         if (status != NULL) {
856                 *status = -1;
857         }
858         if (errormsg != NULL) {
859                 *errormsg = NULL;
860         }
861
862         if (state == NULL) {
863                 return -1;
864         }
865
866         /* prevent double free of state */
867         tmp_ctx = talloc_new(ctdb);
868         talloc_steal(tmp_ctx, state);
869
870         /* loop one event at a time until we either timeout or the control
871            completes.
872         */
873         while (state->state == CTDB_CONTROL_WAIT) {
874                 event_loop_once(ctdb->ev);
875         }
876
877         if (state->state != CTDB_CONTROL_DONE) {
878                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
879                 if (state->async.fn) {
880                         state->async.fn(state);
881                 }
882                 talloc_free(tmp_ctx);
883                 return -1;
884         }
885
886         if (state->errormsg) {
887                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
888                 if (errormsg) {
889                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
890                 }
891                 if (state->async.fn) {
892                         state->async.fn(state);
893                 }
894                 talloc_free(tmp_ctx);
895                 return -1;
896         }
897
898         if (outdata) {
899                 *outdata = state->outdata;
900                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
901         }
902
903         if (status) {
904                 *status = state->status;
905         }
906
907         if (state->async.fn) {
908                 state->async.fn(state);
909         }
910
911         talloc_free(tmp_ctx);
912         return 0;
913 }
914
915
916
917 /*
918   send a ctdb control message
919   timeout specifies how long we should wait for a reply.
920   if timeout is NULL we wait indefinitely
921  */
922 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
923                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
924                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
925                  struct timeval *timeout,
926                  char **errormsg)
927 {
928         struct ctdb_client_control_state *state;
929
930         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
931                         flags, data, mem_ctx,
932                         timeout, errormsg);
933         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
934                         errormsg);
935 }
936
937
938
939
940 /*
941   a process exists call. Returns 0 if process exists, -1 otherwise
942  */
943 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
944 {
945         int ret;
946         TDB_DATA data;
947         int32_t status;
948
949         data.dptr = (uint8_t*)&pid;
950         data.dsize = sizeof(pid);
951
952         ret = ctdb_control(ctdb, destnode, 0, 
953                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
954                            NULL, NULL, &status, NULL, NULL);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
957                 return -1;
958         }
959
960         return status;
961 }
962
963 /*
964   get remote statistics
965  */
966 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
967 {
968         int ret;
969         TDB_DATA data;
970         int32_t res;
971
972         ret = ctdb_control(ctdb, destnode, 0, 
973                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
974                            ctdb, &data, &res, NULL, NULL);
975         if (ret != 0 || res != 0) {
976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
977                 return -1;
978         }
979
980         if (data.dsize != sizeof(struct ctdb_statistics)) {
981                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
982                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
983                       return -1;
984         }
985
986         *status = *(struct ctdb_statistics *)data.dptr;
987         talloc_free(data.dptr);
988                         
989         return 0;
990 }
991
992 /*
993   shutdown a remote ctdb node
994  */
995 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
996 {
997         struct ctdb_client_control_state *state;
998
999         state = ctdb_control_send(ctdb, destnode, 0, 
1000                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1001                            NULL, &timeout, NULL);
1002         if (state == NULL) {
1003                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1004                 return -1;
1005         }
1006
1007         return 0;
1008 }
1009
1010 /*
1011   get vnn map from a remote node
1012  */
1013 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1014 {
1015         int ret;
1016         TDB_DATA outdata;
1017         int32_t res;
1018         struct ctdb_vnn_map_wire *map;
1019
1020         ret = ctdb_control(ctdb, destnode, 0, 
1021                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1022                            mem_ctx, &outdata, &res, &timeout, NULL);
1023         if (ret != 0 || res != 0) {
1024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1025                 return -1;
1026         }
1027         
1028         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1029         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1030             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1031                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1032                 return -1;
1033         }
1034
1035         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1036         CTDB_NO_MEMORY(ctdb, *vnnmap);
1037         (*vnnmap)->generation = map->generation;
1038         (*vnnmap)->size       = map->size;
1039         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1040
1041         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1042         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1043         talloc_free(outdata.dptr);
1044                     
1045         return 0;
1046 }
1047
1048
1049 /*
1050   get the recovery mode of a remote node
1051  */
1052 struct ctdb_client_control_state *
1053 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1054 {
1055         return ctdb_control_send(ctdb, destnode, 0, 
1056                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1057                            mem_ctx, &timeout, NULL);
1058 }
1059
1060 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1061 {
1062         int ret;
1063         int32_t res;
1064
1065         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1066         if (ret != 0) {
1067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1068                 return -1;
1069         }
1070
1071         if (recmode) {
1072                 *recmode = (uint32_t)res;
1073         }
1074
1075         return 0;
1076 }
1077
1078 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1079 {
1080         struct ctdb_client_control_state *state;
1081
1082         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1083         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1084 }
1085
1086
1087
1088
1089 /*
1090   set the recovery mode of a remote node
1091  */
1092 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1093 {
1094         int ret;
1095         TDB_DATA data;
1096         int32_t res;
1097
1098         data.dsize = sizeof(uint32_t);
1099         data.dptr = (unsigned char *)&recmode;
1100
1101         ret = ctdb_control(ctdb, destnode, 0, 
1102                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1103                            NULL, NULL, &res, &timeout, NULL);
1104         if (ret != 0 || res != 0) {
1105                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1106                 return -1;
1107         }
1108
1109         return 0;
1110 }
1111
1112
1113
1114 /*
1115   get the recovery master of a remote node
1116  */
1117 struct ctdb_client_control_state *
1118 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1119                         struct timeval timeout, uint32_t destnode)
1120 {
1121         return ctdb_control_send(ctdb, destnode, 0, 
1122                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1123                            mem_ctx, &timeout, NULL);
1124 }
1125
1126 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1127 {
1128         int ret;
1129         int32_t res;
1130
1131         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1132         if (ret != 0) {
1133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1134                 return -1;
1135         }
1136
1137         if (recmaster) {
1138                 *recmaster = (uint32_t)res;
1139         }
1140
1141         return 0;
1142 }
1143
1144 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1145 {
1146         struct ctdb_client_control_state *state;
1147
1148         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1149         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1150 }
1151
1152
1153 /*
1154   set the recovery master of a remote node
1155  */
1156 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1157 {
1158         int ret;
1159         TDB_DATA data;
1160         int32_t res;
1161
1162         ZERO_STRUCT(data);
1163         data.dsize = sizeof(uint32_t);
1164         data.dptr = (unsigned char *)&recmaster;
1165
1166         ret = ctdb_control(ctdb, destnode, 0, 
1167                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1168                            NULL, NULL, &res, &timeout, NULL);
1169         if (ret != 0 || res != 0) {
1170                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1171                 return -1;
1172         }
1173
1174         return 0;
1175 }
1176
1177
1178 /*
1179   get a list of databases off a remote node
1180  */
1181 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1182                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1183 {
1184         int ret;
1185         TDB_DATA outdata;
1186         int32_t res;
1187
1188         ret = ctdb_control(ctdb, destnode, 0, 
1189                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1190                            mem_ctx, &outdata, &res, &timeout, NULL);
1191         if (ret != 0 || res != 0) {
1192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1193                 return -1;
1194         }
1195
1196         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1197         talloc_free(outdata.dptr);
1198                     
1199         return 0;
1200 }
1201
1202 /*
1203   get a list of nodes (vnn and flags ) from a remote node
1204  */
1205 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1206                 struct timeval timeout, uint32_t destnode, 
1207                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1208 {
1209         int ret;
1210         TDB_DATA outdata;
1211         int32_t res;
1212
1213         ret = ctdb_control(ctdb, destnode, 0, 
1214                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1215                            mem_ctx, &outdata, &res, &timeout, NULL);
1216         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1218                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1219         }
1220         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1222                 return -1;
1223         }
1224
1225         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1226         talloc_free(outdata.dptr);
1227                     
1228         return 0;
1229 }
1230
1231 /*
1232   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1233  */
1234 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1235                 struct timeval timeout, uint32_t destnode, 
1236                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1237 {
1238         int ret, i, len;
1239         TDB_DATA outdata;
1240         struct ctdb_node_mapv4 *nodemapv4;
1241         int32_t res;
1242
1243         ret = ctdb_control(ctdb, destnode, 0, 
1244                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1245                            mem_ctx, &outdata, &res, &timeout, NULL);
1246         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1247                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1248                 return -1;
1249         }
1250
1251         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1252
1253         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1254         (*nodemap) = talloc_zero_size(mem_ctx, len);
1255         CTDB_NO_MEMORY(ctdb, (*nodemap));
1256
1257         (*nodemap)->num = nodemapv4->num;
1258         for (i=0; i<nodemapv4->num; i++) {
1259                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1260                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1261                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1262                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1263         }
1264                 
1265         talloc_free(outdata.dptr);
1266                     
1267         return 0;
1268 }
1269
1270 /*
1271   drop the transport, reload the nodes file and restart the transport
1272  */
1273 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1274                     struct timeval timeout, uint32_t destnode)
1275 {
1276         int ret;
1277         int32_t res;
1278
1279         ret = ctdb_control(ctdb, destnode, 0, 
1280                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1281                            NULL, NULL, &res, &timeout, NULL);
1282         if (ret != 0 || res != 0) {
1283                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1284                 return -1;
1285         }
1286
1287         return 0;
1288 }
1289
1290
1291 /*
1292   set vnn map on a node
1293  */
1294 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1295                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1296 {
1297         int ret;
1298         TDB_DATA data;
1299         int32_t res;
1300         struct ctdb_vnn_map_wire *map;
1301         size_t len;
1302
1303         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1304         map = talloc_size(mem_ctx, len);
1305         CTDB_NO_MEMORY(ctdb, map);
1306
1307         map->generation = vnnmap->generation;
1308         map->size = vnnmap->size;
1309         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1310         
1311         data.dsize = len;
1312         data.dptr  = (uint8_t *)map;
1313
1314         ret = ctdb_control(ctdb, destnode, 0, 
1315                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1316                            NULL, NULL, &res, &timeout, NULL);
1317         if (ret != 0 || res != 0) {
1318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1319                 return -1;
1320         }
1321
1322         talloc_free(map);
1323
1324         return 0;
1325 }
1326
1327
1328 /*
1329   async send for pull database
1330  */
1331 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1332         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1333         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1334 {
1335         TDB_DATA indata;
1336         struct ctdb_control_pulldb *pull;
1337         struct ctdb_client_control_state *state;
1338
1339         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1340         CTDB_NO_MEMORY_NULL(ctdb, pull);
1341
1342         pull->db_id   = dbid;
1343         pull->lmaster = lmaster;
1344
1345         indata.dsize = sizeof(struct ctdb_control_pulldb);
1346         indata.dptr  = (unsigned char *)pull;
1347
1348         state = ctdb_control_send(ctdb, destnode, 0, 
1349                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1350                                   mem_ctx, &timeout, NULL);
1351         talloc_free(pull);
1352
1353         return state;
1354 }
1355
1356 /*
1357   async recv for pull database
1358  */
1359 int ctdb_ctrl_pulldb_recv(
1360         struct ctdb_context *ctdb, 
1361         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1362         TDB_DATA *outdata)
1363 {
1364         int ret;
1365         int32_t res;
1366
1367         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1368         if ( (ret != 0) || (res != 0) ){
1369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1370                 return -1;
1371         }
1372
1373         return 0;
1374 }
1375
1376 /*
1377   pull all keys and records for a specific database on a node
1378  */
1379 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1380                 uint32_t dbid, uint32_t lmaster, 
1381                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1382                 TDB_DATA *outdata)
1383 {
1384         struct ctdb_client_control_state *state;
1385
1386         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1387                                       timeout);
1388         
1389         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1390 }
1391
1392
1393 /*
1394   change dmaster for all keys in the database to the new value
1395  */
1396 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1397                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1398 {
1399         int ret;
1400         TDB_DATA indata;
1401         int32_t res;
1402
1403         indata.dsize = 2*sizeof(uint32_t);
1404         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1405
1406         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1407         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1408
1409         ret = ctdb_control(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1411                            NULL, NULL, &res, &timeout, NULL);
1412         if (ret != 0 || res != 0) {
1413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1414                 return -1;
1415         }
1416
1417         return 0;
1418 }
1419
1420 /*
1421   ping a node, return number of clients connected
1422  */
1423 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1424 {
1425         int ret;
1426         int32_t res;
1427
1428         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1429                            tdb_null, NULL, NULL, &res, NULL, NULL);
1430         if (ret != 0) {
1431                 return -1;
1432         }
1433         return res;
1434 }
1435
1436 /*
1437   find the real path to a ltdb 
1438  */
1439 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1440                    const char **path)
1441 {
1442         int ret;
1443         int32_t res;
1444         TDB_DATA data;
1445
1446         data.dptr = (uint8_t *)&dbid;
1447         data.dsize = sizeof(dbid);
1448
1449         ret = ctdb_control(ctdb, destnode, 0, 
1450                            CTDB_CONTROL_GETDBPATH, 0, data, 
1451                            mem_ctx, &data, &res, &timeout, NULL);
1452         if (ret != 0 || res != 0) {
1453                 return -1;
1454         }
1455
1456         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1457         if ((*path) == NULL) {
1458                 return -1;
1459         }
1460
1461         talloc_free(data.dptr);
1462
1463         return 0;
1464 }
1465
1466 /*
1467   find the name of a db 
1468  */
1469 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1470                    const char **name)
1471 {
1472         int ret;
1473         int32_t res;
1474         TDB_DATA data;
1475
1476         data.dptr = (uint8_t *)&dbid;
1477         data.dsize = sizeof(dbid);
1478
1479         ret = ctdb_control(ctdb, destnode, 0, 
1480                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1481                            mem_ctx, &data, &res, &timeout, NULL);
1482         if (ret != 0 || res != 0) {
1483                 return -1;
1484         }
1485
1486         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1487         if ((*name) == NULL) {
1488                 return -1;
1489         }
1490
1491         talloc_free(data.dptr);
1492
1493         return 0;
1494 }
1495
1496 /*
1497   create a database
1498  */
1499 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1500                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1501 {
1502         int ret;
1503         int32_t res;
1504         TDB_DATA data;
1505
1506         data.dptr = discard_const(name);
1507         data.dsize = strlen(name)+1;
1508
1509         ret = ctdb_control(ctdb, destnode, 0, 
1510                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1511                            0, data, 
1512                            mem_ctx, &data, &res, &timeout, NULL);
1513
1514         if (ret != 0 || res != 0) {
1515                 return -1;
1516         }
1517
1518         return 0;
1519 }
1520
1521 /*
1522   get debug level on a node
1523  */
1524 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1525 {
1526         int ret;
1527         int32_t res;
1528         TDB_DATA data;
1529
1530         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1531                            ctdb, &data, &res, NULL, NULL);
1532         if (ret != 0 || res != 0) {
1533                 return -1;
1534         }
1535         if (data.dsize != sizeof(int32_t)) {
1536                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1537                          (unsigned)data.dsize));
1538                 return -1;
1539         }
1540         *level = *(int32_t *)data.dptr;
1541         talloc_free(data.dptr);
1542         return 0;
1543 }
1544
1545 /*
1546   set debug level on a node
1547  */
1548 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1549 {
1550         int ret;
1551         int32_t res;
1552         TDB_DATA data;
1553
1554         data.dptr = (uint8_t *)&level;
1555         data.dsize = sizeof(level);
1556
1557         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1558                            NULL, NULL, &res, NULL, NULL);
1559         if (ret != 0 || res != 0) {
1560                 return -1;
1561         }
1562         return 0;
1563 }
1564
1565
1566 /*
1567   get a list of connected nodes
1568  */
1569 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1570                                 struct timeval timeout,
1571                                 TALLOC_CTX *mem_ctx,
1572                                 uint32_t *num_nodes)
1573 {
1574         struct ctdb_node_map *map=NULL;
1575         int ret, i;
1576         uint32_t *nodes;
1577
1578         *num_nodes = 0;
1579
1580         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1581         if (ret != 0) {
1582                 return NULL;
1583         }
1584
1585         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1586         if (nodes == NULL) {
1587                 return NULL;
1588         }
1589
1590         for (i=0;i<map->num;i++) {
1591                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1592                         nodes[*num_nodes] = map->nodes[i].pnn;
1593                         (*num_nodes)++;
1594                 }
1595         }
1596
1597         return nodes;
1598 }
1599
1600
1601 /*
1602   reset remote status
1603  */
1604 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1605 {
1606         int ret;
1607         int32_t res;
1608
1609         ret = ctdb_control(ctdb, destnode, 0, 
1610                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1611                            NULL, NULL, &res, NULL, NULL);
1612         if (ret != 0 || res != 0) {
1613                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1614                 return -1;
1615         }
1616         return 0;
1617 }
1618
1619 /*
1620   this is the dummy null procedure that all databases support
1621 */
1622 static int ctdb_null_func(struct ctdb_call_info *call)
1623 {
1624         return 0;
1625 }
1626
1627 /*
1628   this is a plain fetch procedure that all databases support
1629 */
1630 static int ctdb_fetch_func(struct ctdb_call_info *call)
1631 {
1632         call->reply_data = &call->record_data;
1633         return 0;
1634 }
1635
1636 /*
1637   attach to a specific database - client call
1638 */
1639 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1640 {
1641         struct ctdb_db_context *ctdb_db;
1642         TDB_DATA data;
1643         int ret;
1644         int32_t res;
1645
1646         ctdb_db = ctdb_db_handle(ctdb, name);
1647         if (ctdb_db) {
1648                 return ctdb_db;
1649         }
1650
1651         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1652         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1653
1654         ctdb_db->ctdb = ctdb;
1655         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1656         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1657
1658         data.dptr = discard_const(name);
1659         data.dsize = strlen(name)+1;
1660
1661         /* tell ctdb daemon to attach */
1662         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1663                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1664                            0, data, ctdb_db, &data, &res, NULL, NULL);
1665         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1666                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1667                 talloc_free(ctdb_db);
1668                 return NULL;
1669         }
1670         
1671         ctdb_db->db_id = *(uint32_t *)data.dptr;
1672         talloc_free(data.dptr);
1673
1674         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1675         if (ret != 0) {
1676                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1677                 talloc_free(ctdb_db);
1678                 return NULL;
1679         }
1680
1681         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1682         if (!ctdb->do_setsched) {
1683                 tdb_flags |= TDB_NOMMAP;
1684         }
1685
1686         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1687         if (ctdb_db->ltdb == NULL) {
1688                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1689                 talloc_free(ctdb_db);
1690                 return NULL;
1691         }
1692
1693         ctdb_db->persistent = persistent;
1694
1695         DLIST_ADD(ctdb->db_list, ctdb_db);
1696
1697         /* add well known functions */
1698         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1699         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1700
1701         return ctdb_db;
1702 }
1703
1704
1705 /*
1706   setup a call for a database
1707  */
1708 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1709 {
1710         struct ctdb_registered_call *call;
1711
1712 #if 0
1713         TDB_DATA data;
1714         int32_t status;
1715         struct ctdb_control_set_call c;
1716         int ret;
1717
1718         /* this is no longer valid with the separate daemon architecture */
1719         c.db_id = ctdb_db->db_id;
1720         c.fn    = fn;
1721         c.id    = id;
1722
1723         data.dptr = (uint8_t *)&c;
1724         data.dsize = sizeof(c);
1725
1726         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1727                            data, NULL, NULL, &status, NULL, NULL);
1728         if (ret != 0 || status != 0) {
1729                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1730                 return -1;
1731         }
1732 #endif
1733
1734         /* also register locally */
1735         call = talloc(ctdb_db, struct ctdb_registered_call);
1736         call->fn = fn;
1737         call->id = id;
1738
1739         DLIST_ADD(ctdb_db->calls, call);        
1740         return 0;
1741 }
1742
1743
1744 struct traverse_state {
1745         bool done;
1746         uint32_t count;
1747         ctdb_traverse_func fn;
1748         void *private_data;
1749 };
1750
1751 /*
1752   called on each key during a ctdb_traverse
1753  */
1754 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1755 {
1756         struct traverse_state *state = (struct traverse_state *)p;
1757         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1758         TDB_DATA key;
1759
1760         if (data.dsize < sizeof(uint32_t) ||
1761             d->length != data.dsize) {
1762                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1763                 state->done = True;
1764                 return;
1765         }
1766
1767         key.dsize = d->keylen;
1768         key.dptr  = &d->data[0];
1769         data.dsize = d->datalen;
1770         data.dptr = &d->data[d->keylen];
1771
1772         if (key.dsize == 0 && data.dsize == 0) {
1773                 /* end of traverse */
1774                 state->done = True;
1775                 return;
1776         }
1777
1778         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1779                 /* empty records are deleted records in ctdb */
1780                 return;
1781         }
1782
1783         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1784                 state->done = True;
1785         }
1786
1787         state->count++;
1788 }
1789
1790
1791 /*
1792   start a cluster wide traverse, calling the supplied fn on each record
1793   return the number of records traversed, or -1 on error
1794  */
1795 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1796 {
1797         TDB_DATA data;
1798         struct ctdb_traverse_start t;
1799         int32_t status;
1800         int ret;
1801         uint64_t srvid = (getpid() | 0xFLL<<60);
1802         struct traverse_state state;
1803
1804         state.done = False;
1805         state.count = 0;
1806         state.private_data = private_data;
1807         state.fn = fn;
1808
1809         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1812                 return -1;
1813         }
1814
1815         t.db_id = ctdb_db->db_id;
1816         t.srvid = srvid;
1817         t.reqid = 0;
1818
1819         data.dptr = (uint8_t *)&t;
1820         data.dsize = sizeof(t);
1821
1822         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1823                            data, NULL, NULL, &status, NULL, NULL);
1824         if (ret != 0 || status != 0) {
1825                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1826                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1827                 return -1;
1828         }
1829
1830         while (!state.done) {
1831                 event_loop_once(ctdb_db->ctdb->ev);
1832         }
1833
1834         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1837                 return -1;
1838         }
1839
1840         return state.count;
1841 }
1842
1843 #define ISASCII(x) ((x>31)&&(x<128))
1844 /*
1845   called on each key during a catdb
1846  */
1847 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1848 {
1849         int i;
1850         FILE *f = (FILE *)p;
1851         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1852
1853         fprintf(f, "dmaster: %u\n", h->dmaster);
1854         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1855
1856         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1857         for (i=0;i<key.dsize;i++) {
1858                 if (ISASCII(key.dptr[i])) {
1859                         fprintf(f, "%c", key.dptr[i]);
1860                 } else {
1861                         fprintf(f, "\\%02X", key.dptr[i]);
1862                 }
1863         }
1864         fprintf(f, "\"\n");
1865
1866         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1867         for (i=sizeof(*h);i<data.dsize;i++) {
1868                 if (ISASCII(data.dptr[i])) {
1869                         fprintf(f, "%c", data.dptr[i]);
1870                 } else {
1871                         fprintf(f, "\\%02X", data.dptr[i]);
1872                 }
1873         }
1874         fprintf(f, "\"\n");
1875
1876         return 0;
1877 }
1878
1879 /*
1880   convenience function to list all keys to stdout
1881  */
1882 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1883 {
1884         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1885 }
1886
1887 /*
1888   get the pid of a ctdb daemon
1889  */
1890 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1891 {
1892         int ret;
1893         int32_t res;
1894
1895         ret = ctdb_control(ctdb, destnode, 0, 
1896                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1897                            NULL, NULL, &res, &timeout, NULL);
1898         if (ret != 0) {
1899                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1900                 return -1;
1901         }
1902
1903         *pid = res;
1904
1905         return 0;
1906 }
1907
1908
1909 /*
1910   async freeze send control
1911  */
1912 struct ctdb_client_control_state *
1913 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1914 {
1915         return ctdb_control_send(ctdb, destnode, priority, 
1916                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1917                            mem_ctx, &timeout, NULL);
1918 }
1919
1920 /* 
1921    async freeze recv control
1922 */
1923 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1924 {
1925         int ret;
1926         int32_t res;
1927
1928         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1929         if ( (ret != 0) || (res != 0) ){
1930                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1931                 return -1;
1932         }
1933
1934         return 0;
1935 }
1936
1937 /*
1938   freeze databases of a certain priority
1939  */
1940 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1941 {
1942         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1943         struct ctdb_client_control_state *state;
1944         int ret;
1945
1946         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1947         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1948         talloc_free(tmp_ctx);
1949
1950         return ret;
1951 }
1952
1953 /* Freeze all databases */
1954 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1955 {
1956         int i;
1957
1958         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1959                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1960                         return -1;
1961                 }
1962         }
1963         return 0;
1964 }
1965
1966 /*
1967   thaw databases of a certain priority
1968  */
1969 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1970 {
1971         int ret;
1972         int32_t res;
1973
1974         ret = ctdb_control(ctdb, destnode, priority, 
1975                            CTDB_CONTROL_THAW, 0, tdb_null, 
1976                            NULL, NULL, &res, &timeout, NULL);
1977         if (ret != 0 || res != 0) {
1978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1979                 return -1;
1980         }
1981
1982         return 0;
1983 }
1984
1985 /* thaw all databases */
1986 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1987 {
1988         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1989 }
1990
1991 /*
1992   get pnn of a node, or -1
1993  */
1994 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1995 {
1996         int ret;
1997         int32_t res;
1998
1999         ret = ctdb_control(ctdb, destnode, 0, 
2000                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2001                            NULL, NULL, &res, &timeout, NULL);
2002         if (ret != 0) {
2003                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2004                 return -1;
2005         }
2006
2007         return res;
2008 }
2009
2010 /*
2011   get the monitoring mode of a remote node
2012  */
2013 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2014 {
2015         int ret;
2016         int32_t res;
2017
2018         ret = ctdb_control(ctdb, destnode, 0, 
2019                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2020                            NULL, NULL, &res, &timeout, NULL);
2021         if (ret != 0) {
2022                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2023                 return -1;
2024         }
2025
2026         *monmode = res;
2027
2028         return 0;
2029 }
2030
2031
2032 /*
2033  set the monitoring mode of a remote node to active
2034  */
2035 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2036 {
2037         int ret;
2038         
2039
2040         ret = ctdb_control(ctdb, destnode, 0, 
2041                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2042                            NULL, NULL,NULL, &timeout, NULL);
2043         if (ret != 0) {
2044                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2045                 return -1;
2046         }
2047
2048         
2049
2050         return 0;
2051 }
2052
2053 /*
2054   set the monitoring mode of a remote node to disable
2055  */
2056 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2057 {
2058         int ret;
2059         
2060
2061         ret = ctdb_control(ctdb, destnode, 0, 
2062                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2063                            NULL, NULL, NULL, &timeout, NULL);
2064         if (ret != 0) {
2065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2066                 return -1;
2067         }
2068
2069         
2070
2071         return 0;
2072 }
2073
2074
2075
2076 /* 
2077   sent to a node to make it take over an ip address
2078 */
2079 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2080                           uint32_t destnode, struct ctdb_public_ip *ip)
2081 {
2082         TDB_DATA data;
2083         struct ctdb_public_ipv4 ipv4;
2084         int ret;
2085         int32_t res;
2086
2087         if (ip->addr.sa.sa_family == AF_INET) {
2088                 ipv4.pnn = ip->pnn;
2089                 ipv4.sin = ip->addr.ip;
2090
2091                 data.dsize = sizeof(ipv4);
2092                 data.dptr  = (uint8_t *)&ipv4;
2093
2094                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2095                            NULL, &res, &timeout, NULL);
2096         } else {
2097                 data.dsize = sizeof(*ip);
2098                 data.dptr  = (uint8_t *)ip;
2099
2100                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2101                            NULL, &res, &timeout, NULL);
2102         }
2103
2104         if (ret != 0 || res != 0) {
2105                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2106                 return -1;
2107         }
2108
2109         return 0;       
2110 }
2111
2112
2113 /* 
2114   sent to a node to make it release an ip address
2115 */
2116 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2117                          uint32_t destnode, struct ctdb_public_ip *ip)
2118 {
2119         TDB_DATA data;
2120         struct ctdb_public_ipv4 ipv4;
2121         int ret;
2122         int32_t res;
2123
2124         if (ip->addr.sa.sa_family == AF_INET) {
2125                 ipv4.pnn = ip->pnn;
2126                 ipv4.sin = ip->addr.ip;
2127
2128                 data.dsize = sizeof(ipv4);
2129                 data.dptr  = (uint8_t *)&ipv4;
2130
2131                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2132                                    NULL, &res, &timeout, NULL);
2133         } else {
2134                 data.dsize = sizeof(*ip);
2135                 data.dptr  = (uint8_t *)ip;
2136
2137                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2138                                    NULL, &res, &timeout, NULL);
2139         }
2140
2141         if (ret != 0 || res != 0) {
2142                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2143                 return -1;
2144         }
2145
2146         return 0;       
2147 }
2148
2149
2150 /*
2151   get a tunable
2152  */
2153 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2154                           struct timeval timeout, 
2155                           uint32_t destnode,
2156                           const char *name, uint32_t *value)
2157 {
2158         struct ctdb_control_get_tunable *t;
2159         TDB_DATA data, outdata;
2160         int32_t res;
2161         int ret;
2162
2163         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2164         data.dptr  = talloc_size(ctdb, data.dsize);
2165         CTDB_NO_MEMORY(ctdb, data.dptr);
2166
2167         t = (struct ctdb_control_get_tunable *)data.dptr;
2168         t->length = strlen(name)+1;
2169         memcpy(t->name, name, t->length);
2170
2171         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2172                            &outdata, &res, &timeout, NULL);
2173         talloc_free(data.dptr);
2174         if (ret != 0 || res != 0) {
2175                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2176                 return -1;
2177         }
2178
2179         if (outdata.dsize != sizeof(uint32_t)) {
2180                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2181                 talloc_free(outdata.dptr);
2182                 return -1;
2183         }
2184         
2185         *value = *(uint32_t *)outdata.dptr;
2186         talloc_free(outdata.dptr);
2187
2188         return 0;
2189 }
2190
2191 /*
2192   set a tunable
2193  */
2194 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2195                           struct timeval timeout, 
2196                           uint32_t destnode,
2197                           const char *name, uint32_t value)
2198 {
2199         struct ctdb_control_set_tunable *t;
2200         TDB_DATA data;
2201         int32_t res;
2202         int ret;
2203
2204         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2205         data.dptr  = talloc_size(ctdb, data.dsize);
2206         CTDB_NO_MEMORY(ctdb, data.dptr);
2207
2208         t = (struct ctdb_control_set_tunable *)data.dptr;
2209         t->length = strlen(name)+1;
2210         memcpy(t->name, name, t->length);
2211         t->value = value;
2212
2213         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2214                            NULL, &res, &timeout, NULL);
2215         talloc_free(data.dptr);
2216         if (ret != 0 || res != 0) {
2217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2218                 return -1;
2219         }
2220
2221         return 0;
2222 }
2223
2224 /*
2225   list tunables
2226  */
2227 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2228                             struct timeval timeout, 
2229                             uint32_t destnode,
2230                             TALLOC_CTX *mem_ctx,
2231                             const char ***list, uint32_t *count)
2232 {
2233         TDB_DATA outdata;
2234         int32_t res;
2235         int ret;
2236         struct ctdb_control_list_tunable *t;
2237         char *p, *s, *ptr;
2238
2239         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2240                            mem_ctx, &outdata, &res, &timeout, NULL);
2241         if (ret != 0 || res != 0) {
2242                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2243                 return -1;
2244         }
2245
2246         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2247         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2248             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2249                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2250                 talloc_free(outdata.dptr);
2251                 return -1;              
2252         }
2253         
2254         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2255         CTDB_NO_MEMORY(ctdb, p);
2256
2257         talloc_free(outdata.dptr);
2258         
2259         (*list) = NULL;
2260         (*count) = 0;
2261
2262         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2263                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2264                 CTDB_NO_MEMORY(ctdb, *list);
2265                 (*list)[*count] = talloc_strdup(*list, s);
2266                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2267                 (*count)++;
2268         }
2269
2270         talloc_free(p);
2271
2272         return 0;
2273 }
2274
2275
2276 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2277                         struct timeval timeout, uint32_t destnode, 
2278                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2279 {
2280         int ret;
2281         TDB_DATA outdata;
2282         int32_t res;
2283
2284         ret = ctdb_control(ctdb, destnode, 0, 
2285                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2286                            mem_ctx, &outdata, &res, &timeout, NULL);
2287         if (ret == 0 && res == -1) {
2288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2289                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2290         }
2291         if (ret != 0 || res != 0) {
2292           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2293                 return -1;
2294         }
2295
2296         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2297         talloc_free(outdata.dptr);
2298                     
2299         return 0;
2300 }
2301
2302 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2303                         struct timeval timeout, uint32_t destnode, 
2304                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2305 {
2306         int ret, i, len;
2307         TDB_DATA outdata;
2308         int32_t res;
2309         struct ctdb_all_public_ipsv4 *ipsv4;
2310
2311         ret = ctdb_control(ctdb, destnode, 0, 
2312                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2313                            mem_ctx, &outdata, &res, &timeout, NULL);
2314         if (ret != 0 || res != 0) {
2315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2316                 return -1;
2317         }
2318
2319         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2320         len = offsetof(struct ctdb_all_public_ips, ips) +
2321                 ipsv4->num*sizeof(struct ctdb_public_ip);
2322         *ips = talloc_zero_size(mem_ctx, len);
2323         CTDB_NO_MEMORY(ctdb, *ips);
2324         (*ips)->num = ipsv4->num;
2325         for (i=0; i<ipsv4->num; i++) {
2326                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2327                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2328         }
2329
2330         talloc_free(outdata.dptr);
2331                     
2332         return 0;
2333 }
2334
2335 /*
2336   set/clear the permanent disabled bit on a remote node
2337  */
2338 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2339                        uint32_t set, uint32_t clear)
2340 {
2341         int ret;
2342         TDB_DATA data;
2343         struct ctdb_node_map *nodemap=NULL;
2344         struct ctdb_node_flag_change c;
2345         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2346         uint32_t recmaster;
2347         uint32_t *nodes;
2348
2349
2350         /* find the recovery master */
2351         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2352         if (ret != 0) {
2353                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2354                 talloc_free(tmp_ctx);
2355                 return ret;
2356         }
2357
2358
2359         /* read the node flags from the recmaster */
2360         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2361         if (ret != 0) {
2362                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2363                 talloc_free(tmp_ctx);
2364                 return -1;
2365         }
2366         if (destnode >= nodemap->num) {
2367                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2368                 talloc_free(tmp_ctx);
2369                 return -1;
2370         }
2371
2372         c.pnn       = destnode;
2373         c.old_flags = nodemap->nodes[destnode].flags;
2374         c.new_flags = c.old_flags;
2375         c.new_flags |= set;
2376         c.new_flags &= ~clear;
2377
2378         data.dsize = sizeof(c);
2379         data.dptr = (unsigned char *)&c;
2380
2381         /* send the flags update to all connected nodes */
2382         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2383
2384         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2385                                         nodes, 0,
2386                                         timeout, false, data,
2387                                         NULL, NULL,
2388                                         NULL) != 0) {
2389                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2390
2391                 talloc_free(tmp_ctx);
2392                 return -1;
2393         }
2394
2395         talloc_free(tmp_ctx);
2396         return 0;
2397 }
2398
2399
2400 /*
2401   get all tunables
2402  */
2403 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2404                                struct timeval timeout, 
2405                                uint32_t destnode,
2406                                struct ctdb_tunable *tunables)
2407 {
2408         TDB_DATA outdata;
2409         int ret;
2410         int32_t res;
2411
2412         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2413                            &outdata, &res, &timeout, NULL);
2414         if (ret != 0 || res != 0) {
2415                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2416                 return -1;
2417         }
2418
2419         if (outdata.dsize != sizeof(*tunables)) {
2420                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2421                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2422                 return -1;              
2423         }
2424
2425         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2426         talloc_free(outdata.dptr);
2427         return 0;
2428 }
2429
2430 /*
2431   add a public address to a node
2432  */
2433 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2434                       struct timeval timeout, 
2435                       uint32_t destnode,
2436                       struct ctdb_control_ip_iface *pub)
2437 {
2438         TDB_DATA data;
2439         int32_t res;
2440         int ret;
2441
2442         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2443         data.dptr  = (unsigned char *)pub;
2444
2445         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2446                            NULL, &res, &timeout, NULL);
2447         if (ret != 0 || res != 0) {
2448                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2449                 return -1;
2450         }
2451
2452         return 0;
2453 }
2454
2455 /*
2456   delete a public address from a node
2457  */
2458 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2459                       struct timeval timeout, 
2460                       uint32_t destnode,
2461                       struct ctdb_control_ip_iface *pub)
2462 {
2463         TDB_DATA data;
2464         int32_t res;
2465         int ret;
2466
2467         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2468         data.dptr  = (unsigned char *)pub;
2469
2470         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2471                            NULL, &res, &timeout, NULL);
2472         if (ret != 0 || res != 0) {
2473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2474                 return -1;
2475         }
2476
2477         return 0;
2478 }
2479
2480 /*
2481   kill a tcp connection
2482  */
2483 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2484                       struct timeval timeout, 
2485                       uint32_t destnode,
2486                       struct ctdb_control_killtcp *killtcp)
2487 {
2488         TDB_DATA data;
2489         int32_t res;
2490         int ret;
2491
2492         data.dsize = sizeof(struct ctdb_control_killtcp);
2493         data.dptr  = (unsigned char *)killtcp;
2494
2495         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2496                            NULL, &res, &timeout, NULL);
2497         if (ret != 0 || res != 0) {
2498                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2499                 return -1;
2500         }
2501
2502         return 0;
2503 }
2504
2505 /*
2506   send a gratious arp
2507  */
2508 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2509                       struct timeval timeout, 
2510                       uint32_t destnode,
2511                       ctdb_sock_addr *addr,
2512                       const char *ifname)
2513 {
2514         TDB_DATA data;
2515         int32_t res;
2516         int ret, len;
2517         struct ctdb_control_gratious_arp *gratious_arp;
2518         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2519
2520
2521         len = strlen(ifname)+1;
2522         gratious_arp = talloc_size(tmp_ctx, 
2523                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2524         CTDB_NO_MEMORY(ctdb, gratious_arp);
2525
2526         gratious_arp->addr = *addr;
2527         gratious_arp->len = len;
2528         memcpy(&gratious_arp->iface[0], ifname, len);
2529
2530
2531         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2532         data.dptr  = (unsigned char *)gratious_arp;
2533
2534         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2535                            NULL, &res, &timeout, NULL);
2536         if (ret != 0 || res != 0) {
2537                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2538                 talloc_free(tmp_ctx);
2539                 return -1;
2540         }
2541
2542         talloc_free(tmp_ctx);
2543         return 0;
2544 }
2545
2546 /*
2547   get a list of all tcp tickles that a node knows about for a particular vnn
2548  */
2549 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2550                               struct timeval timeout, uint32_t destnode, 
2551                               TALLOC_CTX *mem_ctx, 
2552                               ctdb_sock_addr *addr,
2553                               struct ctdb_control_tcp_tickle_list **list)
2554 {
2555         int ret;
2556         TDB_DATA data, outdata;
2557         int32_t status;
2558
2559         data.dptr = (uint8_t*)addr;
2560         data.dsize = sizeof(ctdb_sock_addr);
2561
2562         ret = ctdb_control(ctdb, destnode, 0, 
2563                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2564                            mem_ctx, &outdata, &status, NULL, NULL);
2565         if (ret != 0 || status != 0) {
2566                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2567                 return -1;
2568         }
2569
2570         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2571
2572         return status;
2573 }
2574
2575 /*
2576   register a server id
2577  */
2578 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2579                       struct timeval timeout, 
2580                       struct ctdb_server_id *id)
2581 {
2582         TDB_DATA data;
2583         int32_t res;
2584         int ret;
2585
2586         data.dsize = sizeof(struct ctdb_server_id);
2587         data.dptr  = (unsigned char *)id;
2588
2589         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2590                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2591                         0, data, NULL,
2592                         NULL, &res, &timeout, NULL);
2593         if (ret != 0 || res != 0) {
2594                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2595                 return -1;
2596         }
2597
2598         return 0;
2599 }
2600
2601 /*
2602   unregister a server id
2603  */
2604 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2605                       struct timeval timeout, 
2606                       struct ctdb_server_id *id)
2607 {
2608         TDB_DATA data;
2609         int32_t res;
2610         int ret;
2611
2612         data.dsize = sizeof(struct ctdb_server_id);
2613         data.dptr  = (unsigned char *)id;
2614
2615         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2616                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2617                         0, data, NULL,
2618                         NULL, &res, &timeout, NULL);
2619         if (ret != 0 || res != 0) {
2620                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2621                 return -1;
2622         }
2623
2624         return 0;
2625 }
2626
2627
2628 /*
2629   check if a server id exists
2630
2631   if a server id does exist, return *status == 1, otherwise *status == 0
2632  */
2633 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2634                       struct timeval timeout, 
2635                       uint32_t destnode,
2636                       struct ctdb_server_id *id,
2637                       uint32_t *status)
2638 {
2639         TDB_DATA data;
2640         int32_t res;
2641         int ret;
2642
2643         data.dsize = sizeof(struct ctdb_server_id);
2644         data.dptr  = (unsigned char *)id;
2645
2646         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2647                         0, data, NULL,
2648                         NULL, &res, &timeout, NULL);
2649         if (ret != 0) {
2650                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2651                 return -1;
2652         }
2653
2654         if (res) {
2655                 *status = 1;
2656         } else {
2657                 *status = 0;
2658         }
2659
2660         return 0;
2661 }
2662
2663 /*
2664    get the list of server ids that are registered on a node
2665 */
2666 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2667                 TALLOC_CTX *mem_ctx,
2668                 struct timeval timeout, uint32_t destnode, 
2669                 struct ctdb_server_id_list **svid_list)
2670 {
2671         int ret;
2672         TDB_DATA outdata;
2673         int32_t res;
2674
2675         ret = ctdb_control(ctdb, destnode, 0, 
2676                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2677                            mem_ctx, &outdata, &res, &timeout, NULL);
2678         if (ret != 0 || res != 0) {
2679                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2680                 return -1;
2681         }
2682
2683         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2684                     
2685         return 0;
2686 }
2687
2688 /*
2689   initialise the ctdb daemon for client applications
2690
2691   NOTE: In current code the daemon does not fork. This is for testing purposes only
2692   and to simplify the code.
2693 */
2694 struct ctdb_context *ctdb_init(struct event_context *ev)
2695 {
2696         int ret;
2697         struct ctdb_context *ctdb;
2698
2699         ctdb = talloc_zero(ev, struct ctdb_context);
2700         if (ctdb == NULL) {
2701                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2702                 return NULL;
2703         }
2704         ctdb->ev  = ev;
2705         ctdb->idr = idr_init(ctdb);
2706         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2707
2708         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2709         if (ret != 0) {
2710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2711                 talloc_free(ctdb);
2712                 return NULL;
2713         }
2714
2715         return ctdb;
2716 }
2717
2718
2719 /*
2720   set some ctdb flags
2721 */
2722 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2723 {
2724         ctdb->flags |= flags;
2725 }
2726
2727 /*
2728   setup the local socket name
2729 */
2730 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2731 {
2732         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2733         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2734
2735         return 0;
2736 }
2737
2738 /*
2739   return the pnn of this node
2740 */
2741 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2742 {
2743         return ctdb->pnn;
2744 }
2745
2746
2747 /*
2748   get the uptime of a remote node
2749  */
2750 struct ctdb_client_control_state *
2751 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2752 {
2753         return ctdb_control_send(ctdb, destnode, 0, 
2754                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2755                            mem_ctx, &timeout, NULL);
2756 }
2757
2758 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2759 {
2760         int ret;
2761         int32_t res;
2762         TDB_DATA outdata;
2763
2764         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2765         if (ret != 0 || res != 0) {
2766                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2767                 return -1;
2768         }
2769
2770         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2771
2772         return 0;
2773 }
2774
2775 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2776 {
2777         struct ctdb_client_control_state *state;
2778
2779         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2780         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2781 }
2782
2783 /*
2784   send a control to execute the "recovered" event script on a node
2785  */
2786 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2787 {
2788         int ret;
2789         int32_t status;
2790
2791         ret = ctdb_control(ctdb, destnode, 0, 
2792                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2793                            NULL, NULL, &status, &timeout, NULL);
2794         if (ret != 0 || status != 0) {
2795                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2796                 return -1;
2797         }
2798
2799         return 0;
2800 }
2801
2802 /* 
2803   callback for the async helpers used when sending the same control
2804   to multiple nodes in parallell.
2805 */
2806 static void async_callback(struct ctdb_client_control_state *state)
2807 {
2808         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2809         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2810         int ret;
2811         TDB_DATA outdata;
2812         int32_t res;
2813         uint32_t destnode = state->c->hdr.destnode;
2814
2815         /* one more node has responded with recmode data */
2816         data->count--;
2817
2818         /* if we failed to push the db, then return an error and let
2819            the main loop try again.
2820         */
2821         if (state->state != CTDB_CONTROL_DONE) {
2822                 if ( !data->dont_log_errors) {
2823                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2824                 }
2825                 data->fail_count++;
2826                 if (data->fail_callback) {
2827                         data->fail_callback(ctdb, destnode, res, outdata,
2828                                         data->callback_data);
2829                 }
2830                 return;
2831         }
2832         
2833         state->async.fn = NULL;
2834
2835         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2836         if ((ret != 0) || (res != 0)) {
2837                 if ( !data->dont_log_errors) {
2838                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2839                 }
2840                 data->fail_count++;
2841                 if (data->fail_callback) {
2842                         data->fail_callback(ctdb, destnode, res, outdata,
2843                                         data->callback_data);
2844                 }
2845         }
2846         if ((ret == 0) && (data->callback != NULL)) {
2847                 data->callback(ctdb, destnode, res, outdata,
2848                                         data->callback_data);
2849         }
2850 }
2851
2852
2853 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2854 {
2855         /* set up the callback functions */
2856         state->async.fn = async_callback;
2857         state->async.private_data = data;
2858         
2859         /* one more control to wait for to complete */
2860         data->count++;
2861 }
2862
2863
2864 /* wait for up to the maximum number of seconds allowed
2865    or until all nodes we expect a response from has replied
2866 */
2867 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2868 {
2869         while (data->count > 0) {
2870                 event_loop_once(ctdb->ev);
2871         }
2872         if (data->fail_count != 0) {
2873                 if (!data->dont_log_errors) {
2874                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2875                                  data->fail_count));
2876                 }
2877                 return -1;
2878         }
2879         return 0;
2880 }
2881
2882
2883 /* 
2884    perform a simple control on the listed nodes
2885    The control cannot return data
2886  */
2887 int ctdb_client_async_control(struct ctdb_context *ctdb,
2888                                 enum ctdb_controls opcode,
2889                                 uint32_t *nodes,
2890                                 uint64_t srvid,
2891                                 struct timeval timeout,
2892                                 bool dont_log_errors,
2893                                 TDB_DATA data,
2894                                 client_async_callback client_callback,
2895                                 client_async_callback fail_callback,
2896                                 void *callback_data)
2897 {
2898         struct client_async_data *async_data;
2899         struct ctdb_client_control_state *state;
2900         int j, num_nodes;
2901
2902         async_data = talloc_zero(ctdb, struct client_async_data);
2903         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2904         async_data->dont_log_errors = dont_log_errors;
2905         async_data->callback = client_callback;
2906         async_data->fail_callback = fail_callback;
2907         async_data->callback_data = callback_data;
2908         async_data->opcode        = opcode;
2909
2910         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2911
2912         /* loop over all nodes and send an async control to each of them */
2913         for (j=0; j<num_nodes; j++) {
2914                 uint32_t pnn = nodes[j];
2915
2916                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2917                                           0, data, async_data, &timeout, NULL);
2918                 if (state == NULL) {
2919                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2920                         talloc_free(async_data);
2921                         return -1;
2922                 }
2923                 
2924                 ctdb_client_async_add(async_data, state);
2925         }
2926
2927         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2928                 talloc_free(async_data);
2929                 return -1;
2930         }
2931
2932         talloc_free(async_data);
2933         return 0;
2934 }
2935
2936 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2937                                 struct ctdb_vnn_map *vnn_map,
2938                                 TALLOC_CTX *mem_ctx,
2939                                 bool include_self)
2940 {
2941         int i, j, num_nodes;
2942         uint32_t *nodes;
2943
2944         for (i=num_nodes=0;i<vnn_map->size;i++) {
2945                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2946                         continue;
2947                 }
2948                 num_nodes++;
2949         } 
2950
2951         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2952         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2953
2954         for (i=j=0;i<vnn_map->size;i++) {
2955                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2956                         continue;
2957                 }
2958                 nodes[j++] = vnn_map->map[i];
2959         } 
2960
2961         return nodes;
2962 }
2963
2964 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2965                                 struct ctdb_node_map *node_map,
2966                                 TALLOC_CTX *mem_ctx,
2967                                 bool include_self)
2968 {
2969         int i, j, num_nodes;
2970         uint32_t *nodes;
2971
2972         for (i=num_nodes=0;i<node_map->num;i++) {
2973                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2974                         continue;
2975                 }
2976                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2977                         continue;
2978                 }
2979                 num_nodes++;
2980         } 
2981
2982         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2983         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2984
2985         for (i=j=0;i<node_map->num;i++) {
2986                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2987                         continue;
2988                 }
2989                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2990                         continue;
2991                 }
2992                 nodes[j++] = node_map->nodes[i].pnn;
2993         } 
2994
2995         return nodes;
2996 }
2997
2998 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2999                                 struct ctdb_node_map *node_map,
3000                                 TALLOC_CTX *mem_ctx,
3001                                 uint32_t pnn)
3002 {
3003         int i, j, num_nodes;
3004         uint32_t *nodes;
3005
3006         for (i=num_nodes=0;i<node_map->num;i++) {
3007                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3008                         continue;
3009                 }
3010                 if (node_map->nodes[i].pnn == pnn) {
3011                         continue;
3012                 }
3013                 num_nodes++;
3014         } 
3015
3016         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3017         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3018
3019         for (i=j=0;i<node_map->num;i++) {
3020                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3021                         continue;
3022                 }
3023                 if (node_map->nodes[i].pnn == pnn) {
3024                         continue;
3025                 }
3026                 nodes[j++] = node_map->nodes[i].pnn;
3027         } 
3028
3029         return nodes;
3030 }
3031
3032 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3033                                 struct ctdb_node_map *node_map,
3034                                 TALLOC_CTX *mem_ctx,
3035                                 bool include_self)
3036 {
3037         int i, j, num_nodes;
3038         uint32_t *nodes;
3039
3040         for (i=num_nodes=0;i<node_map->num;i++) {
3041                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3042                         continue;
3043                 }
3044                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3045                         continue;
3046                 }
3047                 num_nodes++;
3048         } 
3049
3050         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3051         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3052
3053         for (i=j=0;i<node_map->num;i++) {
3054                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3055                         continue;
3056                 }
3057                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3058                         continue;
3059                 }
3060                 nodes[j++] = node_map->nodes[i].pnn;
3061         } 
3062
3063         return nodes;
3064 }
3065
3066 /* 
3067   this is used to test if a pnn lock exists and if it exists will return
3068   the number of connections that pnn has reported or -1 if that recovery
3069   daemon is not running.
3070 */
3071 int
3072 ctdb_read_pnn_lock(int fd, int32_t pnn)
3073 {
3074         struct flock lock;
3075         char c;
3076
3077         lock.l_type = F_WRLCK;
3078         lock.l_whence = SEEK_SET;
3079         lock.l_start = pnn;
3080         lock.l_len = 1;
3081         lock.l_pid = 0;
3082
3083         if (fcntl(fd, F_GETLK, &lock) != 0) {
3084                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3085                 return -1;
3086         }
3087
3088         if (lock.l_type == F_UNLCK) {
3089                 return -1;
3090         }
3091
3092         if (pread(fd, &c, 1, pnn) == -1) {
3093                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3094                 return -1;
3095         }
3096
3097         return c;
3098 }
3099
3100 /*
3101   get capabilities of a remote node
3102  */
3103 struct ctdb_client_control_state *
3104 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3105 {
3106         return ctdb_control_send(ctdb, destnode, 0, 
3107                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3108                            mem_ctx, &timeout, NULL);
3109 }
3110
3111 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3112 {
3113         int ret;
3114         int32_t res;
3115         TDB_DATA outdata;
3116
3117         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3118         if ( (ret != 0) || (res != 0) ) {
3119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3120                 return -1;
3121         }
3122
3123         if (capabilities) {
3124                 *capabilities = *((uint32_t *)outdata.dptr);
3125         }
3126
3127         return 0;
3128 }
3129
3130 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3131 {
3132         struct ctdb_client_control_state *state;
3133         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3134         int ret;
3135
3136         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3137         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3138         talloc_free(tmp_ctx);
3139         return ret;
3140 }
3141
3142 struct ctdb_transaction_handle {
3143         struct ctdb_db_context *ctdb_db;
3144         bool in_replay;
3145         /* we store the reads and writes done under a transaction one
3146            list stores both reads and writes, the other just writes
3147         */
3148         struct ctdb_marshall_buffer *m_all;
3149         struct ctdb_marshall_buffer *m_write;
3150 };
3151
3152 /* start a transaction on a database */
3153 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3154 {
3155         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3156         return 0;
3157 }
3158
3159 /* start a transaction on a database */
3160 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3161 {
3162         struct ctdb_record_handle *rh;
3163         TDB_DATA key;
3164         TDB_DATA data;
3165         struct ctdb_ltdb_header header;
3166         TALLOC_CTX *tmp_ctx;
3167         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3168         int ret;
3169         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3170         pid_t pid;
3171
3172         key.dptr = discard_const(keyname);
3173         key.dsize = strlen(keyname);
3174
3175         if (!ctdb_db->persistent) {
3176                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3177                 return -1;
3178         }
3179
3180 again:
3181         tmp_ctx = talloc_new(h);
3182
3183         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3184         if (rh == NULL) {
3185                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3186                 talloc_free(tmp_ctx);
3187                 return -1;
3188         }
3189         /*
3190          * store the pid in the database:
3191          * it is not enough that the node is dmaster...
3192          */
3193         pid = getpid();
3194         data.dptr = (unsigned char *)&pid;
3195         data.dsize = sizeof(pid_t);
3196         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3197         if (ret != 0) {
3198                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3199                                   "transaction record\n"));
3200                 talloc_free(tmp_ctx);
3201                 return -1;
3202         }
3203
3204         talloc_free(rh);
3205
3206         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3207         if (ret != 0) {
3208                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3209                 talloc_free(tmp_ctx);
3210                 return -1;
3211         }
3212
3213         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3214         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3215                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3216                 talloc_free(tmp_ctx);
3217                 goto again;
3218         }
3219
3220         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3221                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3222                 talloc_free(tmp_ctx);
3223                 goto again;
3224         }
3225
3226         talloc_free(tmp_ctx);
3227
3228         return 0;
3229 }
3230
3231
3232 /* start a transaction on a database */
3233 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3234                                                        TALLOC_CTX *mem_ctx)
3235 {
3236         struct ctdb_transaction_handle *h;
3237         int ret;
3238
3239         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3240         if (h == NULL) {
3241                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3242                 return NULL;
3243         }
3244
3245         h->ctdb_db = ctdb_db;
3246
3247         ret = ctdb_transaction_fetch_start(h);
3248         if (ret != 0) {
3249                 talloc_free(h);
3250                 return NULL;
3251         }
3252
3253         talloc_set_destructor(h, ctdb_transaction_destructor);
3254
3255         return h;
3256 }
3257
3258
3259
3260 /*
3261   fetch a record inside a transaction
3262  */
3263 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3264                            TALLOC_CTX *mem_ctx, 
3265                            TDB_DATA key, TDB_DATA *data)
3266 {
3267         struct ctdb_ltdb_header header;
3268         int ret;
3269
3270         ZERO_STRUCT(header);
3271
3272         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3273         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3274                 /* record doesn't exist yet */
3275                 *data = tdb_null;
3276                 ret = 0;
3277         }
3278         
3279         if (ret != 0) {
3280                 return ret;
3281         }
3282
3283         if (!h->in_replay) {
3284                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3285                 if (h->m_all == NULL) {
3286                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3287                         return -1;
3288                 }
3289         }
3290
3291         return 0;
3292 }
3293
3294 /*
3295   stores a record inside a transaction
3296  */
3297 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3298                            TDB_DATA key, TDB_DATA data)
3299 {
3300         TALLOC_CTX *tmp_ctx = talloc_new(h);
3301         struct ctdb_ltdb_header header;
3302         TDB_DATA olddata;
3303         int ret;
3304
3305         ZERO_STRUCT(header);
3306
3307         /* we need the header so we can update the RSN */
3308         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3309         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3310                 /* the record doesn't exist - create one with us as dmaster.
3311                    This is only safe because we are in a transaction and this
3312                    is a persistent database */
3313                 ZERO_STRUCT(header);
3314         } else if (ret != 0) {
3315                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3316                 talloc_free(tmp_ctx);
3317                 return ret;
3318         }
3319
3320         if (data.dsize == olddata.dsize &&
3321             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3322                 /* save writing the same data */
3323                 talloc_free(tmp_ctx);
3324                 return 0;
3325         }
3326
3327         header.dmaster = h->ctdb_db->ctdb->pnn;
3328         header.rsn++;
3329
3330         if (!h->in_replay) {
3331                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3332                 if (h->m_all == NULL) {
3333                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3334                         talloc_free(tmp_ctx);
3335                         return -1;
3336                 }
3337         }               
3338
3339         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3340         if (h->m_write == NULL) {
3341                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3342                 talloc_free(tmp_ctx);
3343                 return -1;
3344         }
3345         
3346         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3347
3348         talloc_free(tmp_ctx);
3349         
3350         return ret;
3351 }
3352
3353 /*
3354   replay a transaction
3355  */
3356 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3357 {
3358         int ret, i;
3359         struct ctdb_rec_data *rec = NULL;
3360
3361         h->in_replay = true;
3362         talloc_free(h->m_write);
3363         h->m_write = NULL;
3364
3365         ret = ctdb_transaction_fetch_start(h);
3366         if (ret != 0) {
3367                 return ret;
3368         }
3369
3370         for (i=0;i<h->m_all->count;i++) {
3371                 TDB_DATA key, data;
3372
3373                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3374                 if (rec == NULL) {
3375                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3376                         goto failed;
3377                 }
3378
3379                 if (rec->reqid == 0) {
3380                         /* its a store */
3381                         if (ctdb_transaction_store(h, key, data) != 0) {
3382                                 goto failed;
3383                         }
3384                 } else {
3385                         TDB_DATA data2;
3386                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3387
3388                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3389                                 talloc_free(tmp_ctx);
3390                                 goto failed;
3391                         }
3392                         if (data2.dsize != data.dsize ||
3393                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3394                                 /* the record has changed on us - we have to give up */
3395                                 talloc_free(tmp_ctx);
3396                                 goto failed;
3397                         }
3398                         talloc_free(tmp_ctx);
3399                 }
3400         }
3401         
3402         return 0;
3403
3404 failed:
3405         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3406         return -1;
3407 }
3408
3409
3410 /*
3411   commit a transaction
3412  */
3413 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3414 {
3415         int ret, retries=0;
3416         int32_t status;
3417         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3418         struct timeval timeout;
3419         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3420
3421         talloc_set_destructor(h, NULL);
3422
3423         /* our commit strategy is quite complex.
3424
3425            - we first try to commit the changes to all other nodes
3426
3427            - if that works, then we commit locally and we are done
3428
3429            - if a commit on another node fails, then we need to cancel
3430              the transaction, then restart the transaction (thus
3431              opening a window of time for a pending recovery to
3432              complete), then replay the transaction, checking all the
3433              reads and writes (checking that reads give the same data,
3434              and writes succeed). Then we retry the transaction to the
3435              other nodes
3436         */
3437
3438 again:
3439         if (h->m_write == NULL) {
3440                 /* no changes were made */
3441                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3442                 talloc_free(h);
3443                 return 0;
3444         }
3445
3446         /* tell ctdbd to commit to the other nodes */
3447         timeout = timeval_current_ofs(1, 0);
3448         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3449                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3450                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3451                            &timeout, NULL);
3452         if (ret != 0 || status != 0) {
3453                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3454                 sleep(1);
3455
3456                 if (ret != 0) {
3457                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3458                 } else {
3459                         /* work out what error code we will give if we 
3460                            have to fail the operation */
3461                         switch ((enum ctdb_trans2_commit_error)status) {
3462                         case CTDB_TRANS2_COMMIT_SUCCESS:
3463                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3464                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3465                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3466                                 break;
3467                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3468                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3469                                 break;
3470                         }
3471                 }
3472
3473                 if (++retries == 10) {
3474                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3475                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3476                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3477                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3478                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3479                         talloc_free(h);
3480                         return -1;
3481                 }               
3482
3483                 if (ctdb_replay_transaction(h) != 0) {
3484                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3485                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3486                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3487                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3488                         talloc_free(h);
3489                         return -1;
3490                 }
3491                 goto again;
3492         } else {
3493                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3494         }
3495
3496         /* do the real commit locally */
3497         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3498         if (ret != 0) {
3499                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3500                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3501                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3502                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3503                 talloc_free(h);
3504                 return ret;
3505         }
3506
3507         /* tell ctdbd that we are finished with our local commit */
3508         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3509                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3510                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3511         talloc_free(h);
3512         return 0;
3513 }
3514
3515 /*
3516   recovery daemon ping to main daemon
3517  */
3518 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3519 {
3520         int ret;
3521         int32_t res;
3522
3523         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3524                            ctdb, NULL, &res, NULL, NULL);
3525         if (ret != 0 || res != 0) {
3526                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3527                 return -1;
3528         }
3529
3530         return 0;
3531 }
3532
3533 /* when forking the main daemon and the child process needs to connect back
3534  * to the daemon as a client process, this function can be used to change
3535  * the ctdb context from daemon into client mode
3536  */
3537 int switch_from_server_to_client(struct ctdb_context *ctdb)
3538 {
3539         int ret;
3540
3541         /* shutdown the transport */
3542         if (ctdb->methods) {
3543                 ctdb->methods->shutdown(ctdb);
3544         }
3545
3546         /* get a new event context */
3547         talloc_free(ctdb->ev);
3548         ctdb->ev = event_context_init(ctdb);
3549
3550         close(ctdb->daemon.sd);
3551         ctdb->daemon.sd = -1;
3552
3553         /* the client does not need to be realtime */
3554         if (ctdb->do_setsched) {
3555                 ctdb_restore_scheduler(ctdb);
3556         }
3557
3558         /* initialise ctdb */
3559         ret = ctdb_socket_connect(ctdb);
3560         if (ret != 0) {
3561                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3562                 return -1;
3563         }
3564
3565          return 0;
3566 }
3567
3568 /*
3569   tell the main daemon we are starting a new monitor event script
3570  */
3571 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3572 {
3573         int ret;
3574         int32_t res;
3575
3576         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3577                            ctdb, NULL, &res, NULL, NULL);
3578         if (ret != 0 || res != 0) {
3579                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3580                 return -1;
3581         }
3582
3583         return 0;
3584 }
3585
3586 /*
3587   tell the main daemon we are starting a new monitor event script
3588  */
3589 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3590 {
3591         int ret;
3592         int32_t res;
3593
3594         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3595                            ctdb, NULL, &res, NULL, NULL);
3596         if (ret != 0 || res != 0) {
3597                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3598                 return -1;
3599         }
3600
3601         return 0;
3602 }
3603
3604 /*
3605   tell the main daemon we are starting to run an eventscript
3606  */
3607 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3608 {
3609         int ret;
3610         int32_t res;
3611         TDB_DATA data;
3612
3613         data.dptr = discard_const(name);
3614         data.dsize = strlen(name)+1;
3615
3616         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3617                            ctdb, NULL, &res, NULL, NULL);
3618         if (ret != 0 || res != 0) {
3619                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3620                 return -1;
3621         }
3622
3623         return 0;
3624 }
3625
3626 /*
3627   tell the main daemon the status of the script we ran
3628  */
3629 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3630 {
3631         int ret;
3632         int32_t res;
3633         TDB_DATA data;
3634
3635         data.dptr = (uint8_t *)&result;
3636         data.dsize = sizeof(result);
3637
3638         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3639                            ctdb, NULL, &res, NULL, NULL);
3640         if (ret != 0 || res != 0) {
3641                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3642                 return -1;
3643         }
3644
3645         return 0;
3646 }
3647
3648 /*
3649   tell the main daemon a script was disabled
3650  */
3651 int ctdb_ctrl_event_script_disabled(struct ctdb_context *ctdb, const char *name)
3652 {
3653         int ret;
3654         int32_t res;
3655         TDB_DATA data;
3656
3657         data.dptr = discard_const(name);
3658         data.dsize = strlen(name)+1;
3659
3660         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_DISABLED, 0, data, 
3661                            ctdb, NULL, &res, NULL, NULL);
3662         if (ret != 0 || res != 0) {
3663                 DEBUG(DEBUG_ERR,("Failed to send event_script_disabeld\n"));
3664                 return -1;
3665         }
3666
3667         return 0;
3668 }
3669
3670 /*
3671   get the status of running the monitor eventscripts
3672  */
3673 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3674                 struct timeval timeout, uint32_t destnode, 
3675                 TALLOC_CTX *mem_ctx,
3676                 struct ctdb_monitoring_wire **script_status)
3677 {
3678         int ret;
3679         TDB_DATA outdata;
3680         int32_t res;
3681
3682         ret = ctdb_control(ctdb, destnode, 0, 
3683                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3684                            mem_ctx, &outdata, &res, &timeout, NULL);
3685         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3686                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3687                 return -1;
3688         }
3689
3690         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3691         talloc_free(outdata.dptr);
3692                     
3693         return 0;
3694 }
3695
3696 /*
3697   tell the main daemon how long it took to lock the reclock file
3698  */
3699 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3700 {
3701         int ret;
3702         int32_t res;
3703         TDB_DATA data;
3704
3705         data.dptr = (uint8_t *)&latency;
3706         data.dsize = sizeof(latency);
3707
3708         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3709                            ctdb, NULL, &res, NULL, NULL);
3710         if (ret != 0 || res != 0) {
3711                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3712                 return -1;
3713         }
3714
3715         return 0;
3716 }
3717
3718 /*
3719   get the name of the reclock file
3720  */
3721 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3722                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3723                          const char **name)
3724 {
3725         int ret;
3726         int32_t res;
3727         TDB_DATA data;
3728
3729         ret = ctdb_control(ctdb, destnode, 0, 
3730                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3731                            mem_ctx, &data, &res, &timeout, NULL);
3732         if (ret != 0 || res != 0) {
3733                 return -1;
3734         }
3735
3736         if (data.dsize == 0) {
3737                 *name = NULL;
3738         } else {
3739                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3740         }
3741         talloc_free(data.dptr);
3742
3743         return 0;
3744 }
3745
3746 /*
3747   set the reclock filename for a node
3748  */
3749 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3750 {
3751         int ret;
3752         TDB_DATA data;
3753         int32_t res;
3754
3755         if (reclock == NULL) {
3756                 data.dsize = 0;
3757                 data.dptr  = NULL;
3758         } else {
3759                 data.dsize = strlen(reclock) + 1;
3760                 data.dptr  = discard_const(reclock);
3761         }
3762
3763         ret = ctdb_control(ctdb, destnode, 0, 
3764                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3765                            NULL, NULL, &res, &timeout, NULL);
3766         if (ret != 0 || res != 0) {
3767                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3768                 return -1;
3769         }
3770
3771         return 0;
3772 }
3773
3774 /*
3775   stop a node
3776  */
3777 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3778 {
3779         int ret;
3780         int32_t res;
3781
3782         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3783                            ctdb, NULL, &res, &timeout, NULL);
3784         if (ret != 0 || res != 0) {
3785                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3786                 return -1;
3787         }
3788
3789         return 0;
3790 }
3791
3792 /*
3793   continue a node
3794  */
3795 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3796 {
3797         int ret;
3798
3799         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3800                            ctdb, NULL, NULL, &timeout, NULL);
3801         if (ret != 0) {
3802                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3803                 return -1;
3804         }
3805
3806         return 0;
3807 }
3808
3809 /*
3810   set the natgw state for a node
3811  */
3812 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3813 {
3814         int ret;
3815         TDB_DATA data;
3816         int32_t res;
3817
3818         data.dsize = sizeof(natgwstate);
3819         data.dptr  = (uint8_t *)&natgwstate;
3820
3821         ret = ctdb_control(ctdb, destnode, 0, 
3822                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3823                            NULL, NULL, &res, &timeout, NULL);
3824         if (ret != 0 || res != 0) {
3825                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3826                 return -1;
3827         }
3828
3829         return 0;
3830 }
3831
3832 /*
3833   set the lmaster role for a node
3834  */
3835 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3836 {
3837         int ret;
3838         TDB_DATA data;
3839         int32_t res;
3840
3841         data.dsize = sizeof(lmasterrole);
3842         data.dptr  = (uint8_t *)&lmasterrole;
3843
3844         ret = ctdb_control(ctdb, destnode, 0, 
3845                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3846                            NULL, NULL, &res, &timeout, NULL);
3847         if (ret != 0 || res != 0) {
3848                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3849                 return -1;
3850         }
3851
3852         return 0;
3853 }
3854
3855 /*
3856   set the recmaster role for a node
3857  */
3858 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3859 {
3860         int ret;
3861         TDB_DATA data;
3862         int32_t res;
3863
3864         data.dsize = sizeof(recmasterrole);
3865         data.dptr  = (uint8_t *)&recmasterrole;
3866
3867         ret = ctdb_control(ctdb, destnode, 0, 
3868                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3869                            NULL, NULL, &res, &timeout, NULL);
3870         if (ret != 0 || res != 0) {
3871                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3872                 return -1;
3873         }
3874
3875         return 0;
3876 }
3877
3878 /* enable an eventscript
3879  */
3880 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3881 {
3882         int ret;
3883         TDB_DATA data;
3884         int32_t res;
3885
3886         data.dsize = strlen(script) + 1;
3887         data.dptr  = discard_const(script);
3888
3889         ret = ctdb_control(ctdb, destnode, 0, 
3890                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3891                            NULL, NULL, &res, &timeout, NULL);
3892         if (ret != 0 || res != 0) {
3893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3894                 return -1;
3895         }
3896
3897         return 0;
3898 }
3899
3900 /* disable an eventscript
3901  */
3902 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3903 {
3904         int ret;
3905         TDB_DATA data;
3906         int32_t res;
3907
3908         data.dsize = strlen(script) + 1;
3909         data.dptr  = discard_const(script);
3910
3911         ret = ctdb_control(ctdb, destnode, 0, 
3912                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3913                            NULL, NULL, &res, &timeout, NULL);
3914         if (ret != 0 || res != 0) {
3915                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3916                 return -1;
3917         }
3918
3919         return 0;
3920 }
3921
3922
3923 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3924 {
3925         int ret;
3926         TDB_DATA data;
3927         int32_t res;
3928
3929         data.dsize = sizeof(*bantime);
3930         data.dptr  = (uint8_t *)bantime;
3931
3932         ret = ctdb_control(ctdb, destnode, 0, 
3933                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3934                            NULL, NULL, &res, &timeout, NULL);
3935         if (ret != 0 || res != 0) {
3936                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3937                 return -1;
3938         }
3939
3940         return 0;
3941 }
3942
3943
3944 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3945 {
3946         int ret;
3947         TDB_DATA outdata;
3948         int32_t res;
3949         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3950
3951         ret = ctdb_control(ctdb, destnode, 0, 
3952                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3953                            tmp_ctx, &outdata, &res, &timeout, NULL);
3954         if (ret != 0 || res != 0) {
3955                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3956                 talloc_free(tmp_ctx);
3957                 return -1;
3958         }
3959
3960         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3961         talloc_free(tmp_ctx);
3962
3963         return 0;
3964 }
3965
3966
3967 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3968 {
3969         int ret;
3970         int32_t res;
3971         TDB_DATA data;
3972         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3973
3974         data.dptr = (uint8_t*)db_prio;
3975         data.dsize = sizeof(*db_prio);
3976
3977         ret = ctdb_control(ctdb, destnode, 0, 
3978                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3979                            tmp_ctx, NULL, &res, &timeout, NULL);
3980         if (ret != 0 || res != 0) {
3981                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3982                 talloc_free(tmp_ctx);
3983                 return -1;
3984         }
3985
3986         talloc_free(tmp_ctx);
3987
3988         return 0;
3989 }
3990
3991 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3992 {
3993         int ret;
3994         int32_t res;
3995         TDB_DATA data;
3996         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3997
3998         data.dptr = (uint8_t*)&db_id;
3999         data.dsize = sizeof(db_id);
4000
4001         ret = ctdb_control(ctdb, destnode, 0, 
4002                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4003                            tmp_ctx, NULL, &res, &timeout, NULL);
4004         if (ret != 0 || res < 0) {
4005                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4006                 talloc_free(tmp_ctx);
4007                 return -1;
4008         }
4009
4010         if (priority) {
4011                 *priority = res;
4012         }
4013
4014         talloc_free(tmp_ctx);
4015
4016         return 0;
4017 }