client: fix a race in the local race condition fix in transaction_start
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return 0;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372
373         return state;
374 }
375
376 /*
377   make a ctdb call to the local daemon - async send. Called from client context.
378
379   This constructs a ctdb_call request and queues it for processing. 
380   This call never blocks.
381 */
382 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
383                                               struct ctdb_call *call)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         struct ctdb_ltdb_header header;
388         TDB_DATA data;
389         int ret;
390         size_t len;
391         struct ctdb_req_call *c;
392
393         /* if the domain socket is not yet open, open it */
394         if (ctdb->daemon.sd==-1) {
395                 ctdb_socket_connect(ctdb);
396         }
397
398         ret = ctdb_ltdb_lock(ctdb_db, call->key);
399         if (ret != 0) {
400                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
401                 return NULL;
402         }
403
404         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
405
406         if (ret == 0 && header.dmaster == ctdb->pnn) {
407                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
408                 talloc_free(data.dptr);
409                 ctdb_ltdb_unlock(ctdb_db, call->key);
410                 return state;
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, call->key);
414         talloc_free(data.dptr);
415
416         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
419                 return NULL;
420         }
421         state->call = talloc_zero(state, struct ctdb_call);
422         if (state->call == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
424                 return NULL;
425         }
426
427         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
428         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
429         if (c == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
431                 return NULL;
432         }
433
434         state->reqid     = ctdb_reqid_new(ctdb, state);
435         state->ctdb_db = ctdb_db;
436         talloc_set_destructor(state, ctdb_client_call_destructor);
437
438         c->hdr.reqid     = state->reqid;
439         c->flags         = call->flags;
440         c->db_id         = ctdb_db->db_id;
441         c->callid        = call->call_id;
442         c->hopcount      = 0;
443         c->keylen        = call->key.dsize;
444         c->calldatalen   = call->call_data.dsize;
445         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
446         memcpy(&c->data[call->key.dsize], 
447                call->call_data.dptr, call->call_data.dsize);
448         *(state->call)              = *call;
449         state->call->call_data.dptr = &c->data[call->key.dsize];
450         state->call->key.dptr       = &c->data[0];
451
452         state->state  = CTDB_CALL_WAIT;
453
454
455         ctdb_client_queue_pkt(ctdb, &c->hdr);
456
457         return state;
458 }
459
460
461 /*
462   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
463 */
464 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
465 {
466         struct ctdb_client_call_state *state;
467
468         state = ctdb_call_send(ctdb_db, call);
469         return ctdb_call_recv(state, call);
470 }
471
472
473 /*
474   tell the daemon what messaging srvid we will use, and register the message
475   handler function in the client
476 */
477 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
478                              ctdb_message_fn_t handler,
479                              void *private_data)
480                                     
481 {
482         int res;
483         int32_t status;
484         
485         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
486                            tdb_null, NULL, NULL, &status, NULL, NULL);
487         if (res != 0 || status != 0) {
488                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
489                 return -1;
490         }
491
492         /* also need to register the handler with our own ctdb structure */
493         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
494 }
495
496 /*
497   tell the daemon we no longer want a srvid
498 */
499 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         ctdb_deregister_message_handler(ctdb, srvid, private_data);
513         return 0;
514 }
515
516
517 /*
518   send a message - from client context
519  */
520 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
521                       uint64_t srvid, TDB_DATA data)
522 {
523         struct ctdb_req_message *r;
524         int len, res;
525
526         len = offsetof(struct ctdb_req_message, data) + data.dsize;
527         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
528                                len, struct ctdb_req_message);
529         CTDB_NO_MEMORY(ctdb, r);
530
531         r->hdr.destnode  = pnn;
532         r->srvid         = srvid;
533         r->datalen       = data.dsize;
534         memcpy(&r->data[0], data.dptr, data.dsize);
535         
536         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
537         if (res != 0) {
538                 return res;
539         }
540
541         talloc_free(r);
542         return 0;
543 }
544
545
546 /*
547   cancel a ctdb_fetch_lock operation, releasing the lock
548  */
549 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 {
551         ctdb_ltdb_unlock(h->ctdb_db, h->key);
552         return 0;
553 }
554
555 /*
556   force the migration of a record to this node
557  */
558 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
559 {
560         struct ctdb_call call;
561         ZERO_STRUCT(call);
562         call.call_id = CTDB_NULL_FUNC;
563         call.key = key;
564         call.flags = CTDB_IMMEDIATE_MIGRATION;
565         return ctdb_call(ctdb_db, &call);
566 }
567
568 /*
569   get a lock on a record, and return the records data. Blocks until it gets the lock
570  */
571 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
572                                            TDB_DATA key, TDB_DATA *data)
573 {
574         int ret;
575         struct ctdb_record_handle *h;
576
577         /*
578           procedure is as follows:
579
580           1) get the chain lock. 
581           2) check if we are dmaster
582           3) if we are the dmaster then return handle 
583           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
584              reply from ctdbd
585           5) when we get the reply, goto (1)
586          */
587
588         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589         if (h == NULL) {
590                 return NULL;
591         }
592
593         h->ctdb_db = ctdb_db;
594         h->key     = key;
595         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
596         if (h->key.dptr == NULL) {
597                 talloc_free(h);
598                 return NULL;
599         }
600         h->data    = data;
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
603                  (const char *)key.dptr));
604
605 again:
606         /* step 1 - get the chain lock */
607         ret = ctdb_ltdb_lock(ctdb_db, key);
608         if (ret != 0) {
609                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610                 talloc_free(h);
611                 return NULL;
612         }
613
614         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
615
616         talloc_set_destructor(h, fetch_lock_destructor);
617
618         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
619
620         /* when torturing, ensure we test the remote path */
621         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
622             random() % 5 == 0) {
623                 h->header.dmaster = (uint32_t)-1;
624         }
625
626
627         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
628
629         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
630                 ctdb_ltdb_unlock(ctdb_db, key);
631                 ret = ctdb_client_force_migration(ctdb_db, key);
632                 if (ret != 0) {
633                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634                         talloc_free(h);
635                         return NULL;
636                 }
637                 goto again;
638         }
639
640         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641         return h;
642 }
643
644 /*
645   store some data to the record that was locked with ctdb_fetch_lock()
646 */
647 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
648 {
649         if (h->ctdb_db->persistent) {
650                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
651                 return -1;
652         }
653
654         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655 }
656
657 /*
658   non-locking fetch of a record
659  */
660 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
661                TDB_DATA key, TDB_DATA *data)
662 {
663         struct ctdb_call call;
664         int ret;
665
666         call.call_id = CTDB_FETCH_FUNC;
667         call.call_data.dptr = NULL;
668         call.call_data.dsize = 0;
669
670         ret = ctdb_call(ctdb_db, &call);
671
672         if (ret == 0) {
673                 *data = call.reply_data;
674                 talloc_steal(mem_ctx, data->dptr);
675         }
676
677         return ret;
678 }
679
680
681
682 /*
683    called when a control completes or timesout to invoke the callback
684    function the user provided
685 */
686 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
687         struct timeval t, void *private_data)
688 {
689         struct ctdb_client_control_state *state;
690         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691         int ret;
692
693         state = talloc_get_type(private_data, struct ctdb_client_control_state);
694         talloc_steal(tmp_ctx, state);
695
696         ret = ctdb_control_recv(state->ctdb, state, state,
697                         NULL, 
698                         NULL, 
699                         NULL);
700
701         talloc_free(tmp_ctx);
702 }
703
704 /*
705   called when a CTDB_REPLY_CONTROL packet comes in in the client
706
707   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
708   contains any reply data from the control
709 */
710 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
711                                       struct ctdb_req_header *hdr)
712 {
713         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
714         struct ctdb_client_control_state *state;
715
716         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
717         if (state == NULL) {
718                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
719                 return;
720         }
721
722         if (hdr->reqid != state->reqid) {
723                 /* we found a record  but it was the wrong one */
724                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
725                 return;
726         }
727
728         state->outdata.dptr = c->data;
729         state->outdata.dsize = c->datalen;
730         state->status = c->status;
731         if (c->errorlen) {
732                 state->errormsg = talloc_strndup(state, 
733                                                  (char *)&c->data[c->datalen], 
734                                                  c->errorlen);
735         }
736
737         /* state->outdata now uses resources from c so we dont want c
738            to just dissappear from under us while state is still alive
739         */
740         talloc_steal(state, c);
741
742         state->state = CTDB_CONTROL_DONE;
743
744         /* if we had a callback registered for this control, pull the response
745            and call the callback.
746         */
747         if (state->async.fn) {
748                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
749         }
750 }
751
752
753 /*
754   destroy a ctdb_control in client
755 */
756 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
757 {
758         ctdb_reqid_remove(state->ctdb, state->reqid);
759         return 0;
760 }
761
762
763 /* time out handler for ctdb_control */
764 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
765         struct timeval t, void *private_data)
766 {
767         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
768
769         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
770                          "dstnode:%u\n", state->reqid, state->c->opcode,
771                          state->c->hdr.destnode));
772
773         state->state = CTDB_CONTROL_TIMEOUT;
774
775         /* if we had a callback registered for this control, pull the response
776            and call the callback.
777         */
778         if (state->async.fn) {
779                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
780         }
781 }
782
783 /* async version of send control request */
784 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
785                 uint32_t destnode, uint64_t srvid, 
786                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
787                 TALLOC_CTX *mem_ctx,
788                 struct timeval *timeout,
789                 char **errormsg)
790 {
791         struct ctdb_client_control_state *state;
792         size_t len;
793         struct ctdb_req_control *c;
794         int ret;
795
796         if (errormsg) {
797                 *errormsg = NULL;
798         }
799
800         /* if the domain socket is not yet open, open it */
801         if (ctdb->daemon.sd==-1) {
802                 ctdb_socket_connect(ctdb);
803         }
804
805         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
806         CTDB_NO_MEMORY_NULL(ctdb, state);
807
808         state->ctdb       = ctdb;
809         state->reqid      = ctdb_reqid_new(ctdb, state);
810         state->state      = CTDB_CONTROL_WAIT;
811         state->errormsg   = NULL;
812
813         talloc_set_destructor(state, ctdb_control_destructor);
814
815         len = offsetof(struct ctdb_req_control, data) + data.dsize;
816         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
817                                len, struct ctdb_req_control);
818         state->c            = c;        
819         CTDB_NO_MEMORY_NULL(ctdb, c);
820         c->hdr.reqid        = state->reqid;
821         c->hdr.destnode     = destnode;
822         c->opcode           = opcode;
823         c->client_id        = 0;
824         c->flags            = flags;
825         c->srvid            = srvid;
826         c->datalen          = data.dsize;
827         if (data.dsize) {
828                 memcpy(&c->data[0], data.dptr, data.dsize);
829         }
830
831         /* timeout */
832         if (timeout && !timeval_is_zero(timeout)) {
833                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
834         }
835
836         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
837         if (ret != 0) {
838                 talloc_free(state);
839                 return NULL;
840         }
841
842         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
843                 talloc_free(state);
844                 return NULL;
845         }
846
847         return state;
848 }
849
850
851 /* async version of receive control reply */
852 int ctdb_control_recv(struct ctdb_context *ctdb, 
853                 struct ctdb_client_control_state *state, 
854                 TALLOC_CTX *mem_ctx,
855                 TDB_DATA *outdata, int32_t *status, char **errormsg)
856 {
857         TALLOC_CTX *tmp_ctx;
858
859         if (status != NULL) {
860                 *status = -1;
861         }
862         if (errormsg != NULL) {
863                 *errormsg = NULL;
864         }
865
866         if (state == NULL) {
867                 return -1;
868         }
869
870         /* prevent double free of state */
871         tmp_ctx = talloc_new(ctdb);
872         talloc_steal(tmp_ctx, state);
873
874         /* loop one event at a time until we either timeout or the control
875            completes.
876         */
877         while (state->state == CTDB_CONTROL_WAIT) {
878                 event_loop_once(ctdb->ev);
879         }
880
881         if (state->state != CTDB_CONTROL_DONE) {
882                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
883                 if (state->async.fn) {
884                         state->async.fn(state);
885                 }
886                 talloc_free(tmp_ctx);
887                 return -1;
888         }
889
890         if (state->errormsg) {
891                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
892                 if (errormsg) {
893                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
894                 }
895                 if (state->async.fn) {
896                         state->async.fn(state);
897                 }
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (outdata) {
903                 *outdata = state->outdata;
904                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
905         }
906
907         if (status) {
908                 *status = state->status;
909         }
910
911         if (state->async.fn) {
912                 state->async.fn(state);
913         }
914
915         talloc_free(tmp_ctx);
916         return 0;
917 }
918
919
920
921 /*
922   send a ctdb control message
923   timeout specifies how long we should wait for a reply.
924   if timeout is NULL we wait indefinitely
925  */
926 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
927                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
928                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
929                  struct timeval *timeout,
930                  char **errormsg)
931 {
932         struct ctdb_client_control_state *state;
933
934         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
935                         flags, data, mem_ctx,
936                         timeout, errormsg);
937         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
938                         errormsg);
939 }
940
941
942
943
944 /*
945   a process exists call. Returns 0 if process exists, -1 otherwise
946  */
947 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
948 {
949         int ret;
950         TDB_DATA data;
951         int32_t status;
952
953         data.dptr = (uint8_t*)&pid;
954         data.dsize = sizeof(pid);
955
956         ret = ctdb_control(ctdb, destnode, 0, 
957                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
958                            NULL, NULL, &status, NULL, NULL);
959         if (ret != 0) {
960                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
961                 return -1;
962         }
963
964         return status;
965 }
966
967 /*
968   get remote statistics
969  */
970 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
971 {
972         int ret;
973         TDB_DATA data;
974         int32_t res;
975
976         ret = ctdb_control(ctdb, destnode, 0, 
977                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
978                            ctdb, &data, &res, NULL, NULL);
979         if (ret != 0 || res != 0) {
980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
981                 return -1;
982         }
983
984         if (data.dsize != sizeof(struct ctdb_statistics)) {
985                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
986                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
987                       return -1;
988         }
989
990         *status = *(struct ctdb_statistics *)data.dptr;
991         talloc_free(data.dptr);
992                         
993         return 0;
994 }
995
996 /*
997   shutdown a remote ctdb node
998  */
999 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1000 {
1001         struct ctdb_client_control_state *state;
1002
1003         state = ctdb_control_send(ctdb, destnode, 0, 
1004                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1005                            NULL, &timeout, NULL);
1006         if (state == NULL) {
1007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1008                 return -1;
1009         }
1010
1011         return 0;
1012 }
1013
1014 /*
1015   get vnn map from a remote node
1016  */
1017 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1018 {
1019         int ret;
1020         TDB_DATA outdata;
1021         int32_t res;
1022         struct ctdb_vnn_map_wire *map;
1023
1024         ret = ctdb_control(ctdb, destnode, 0, 
1025                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1026                            mem_ctx, &outdata, &res, &timeout, NULL);
1027         if (ret != 0 || res != 0) {
1028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1029                 return -1;
1030         }
1031         
1032         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1033         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1034             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1035                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1036                 return -1;
1037         }
1038
1039         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1040         CTDB_NO_MEMORY(ctdb, *vnnmap);
1041         (*vnnmap)->generation = map->generation;
1042         (*vnnmap)->size       = map->size;
1043         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1044
1045         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1046         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1047         talloc_free(outdata.dptr);
1048                     
1049         return 0;
1050 }
1051
1052
1053 /*
1054   get the recovery mode of a remote node
1055  */
1056 struct ctdb_client_control_state *
1057 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1058 {
1059         return ctdb_control_send(ctdb, destnode, 0, 
1060                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1061                            mem_ctx, &timeout, NULL);
1062 }
1063
1064 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1065 {
1066         int ret;
1067         int32_t res;
1068
1069         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1072                 return -1;
1073         }
1074
1075         if (recmode) {
1076                 *recmode = (uint32_t)res;
1077         }
1078
1079         return 0;
1080 }
1081
1082 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1083 {
1084         struct ctdb_client_control_state *state;
1085
1086         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1087         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1088 }
1089
1090
1091
1092
1093 /*
1094   set the recovery mode of a remote node
1095  */
1096 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1097 {
1098         int ret;
1099         TDB_DATA data;
1100         int32_t res;
1101
1102         data.dsize = sizeof(uint32_t);
1103         data.dptr = (unsigned char *)&recmode;
1104
1105         ret = ctdb_control(ctdb, destnode, 0, 
1106                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1107                            NULL, NULL, &res, &timeout, NULL);
1108         if (ret != 0 || res != 0) {
1109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1110                 return -1;
1111         }
1112
1113         return 0;
1114 }
1115
1116
1117
1118 /*
1119   get the recovery master of a remote node
1120  */
1121 struct ctdb_client_control_state *
1122 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1123                         struct timeval timeout, uint32_t destnode)
1124 {
1125         return ctdb_control_send(ctdb, destnode, 0, 
1126                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1127                            mem_ctx, &timeout, NULL);
1128 }
1129
1130 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1131 {
1132         int ret;
1133         int32_t res;
1134
1135         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1136         if (ret != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1138                 return -1;
1139         }
1140
1141         if (recmaster) {
1142                 *recmaster = (uint32_t)res;
1143         }
1144
1145         return 0;
1146 }
1147
1148 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1149 {
1150         struct ctdb_client_control_state *state;
1151
1152         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1153         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1154 }
1155
1156
1157 /*
1158   set the recovery master of a remote node
1159  */
1160 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t res;
1165
1166         ZERO_STRUCT(data);
1167         data.dsize = sizeof(uint32_t);
1168         data.dptr = (unsigned char *)&recmaster;
1169
1170         ret = ctdb_control(ctdb, destnode, 0, 
1171                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1172                            NULL, NULL, &res, &timeout, NULL);
1173         if (ret != 0 || res != 0) {
1174                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1175                 return -1;
1176         }
1177
1178         return 0;
1179 }
1180
1181
1182 /*
1183   get a list of databases off a remote node
1184  */
1185 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1186                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1187 {
1188         int ret;
1189         TDB_DATA outdata;
1190         int32_t res;
1191
1192         ret = ctdb_control(ctdb, destnode, 0, 
1193                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1194                            mem_ctx, &outdata, &res, &timeout, NULL);
1195         if (ret != 0 || res != 0) {
1196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1197                 return -1;
1198         }
1199
1200         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1201         talloc_free(outdata.dptr);
1202                     
1203         return 0;
1204 }
1205
1206 /*
1207   get a list of nodes (vnn and flags ) from a remote node
1208  */
1209 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1210                 struct timeval timeout, uint32_t destnode, 
1211                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1212 {
1213         int ret;
1214         TDB_DATA outdata;
1215         int32_t res;
1216
1217         ret = ctdb_control(ctdb, destnode, 0, 
1218                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1219                            mem_ctx, &outdata, &res, &timeout, NULL);
1220         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1222                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1223         }
1224         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret, i, len;
1243         TDB_DATA outdata;
1244         struct ctdb_node_mapv4 *nodemapv4;
1245         int32_t res;
1246
1247         ret = ctdb_control(ctdb, destnode, 0, 
1248                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1249                            mem_ctx, &outdata, &res, &timeout, NULL);
1250         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1252                 return -1;
1253         }
1254
1255         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1256
1257         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1258         (*nodemap) = talloc_zero_size(mem_ctx, len);
1259         CTDB_NO_MEMORY(ctdb, (*nodemap));
1260
1261         (*nodemap)->num = nodemapv4->num;
1262         for (i=0; i<nodemapv4->num; i++) {
1263                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1264                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1265                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1266                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1267         }
1268                 
1269         talloc_free(outdata.dptr);
1270                     
1271         return 0;
1272 }
1273
1274 /*
1275   drop the transport, reload the nodes file and restart the transport
1276  */
1277 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1278                     struct timeval timeout, uint32_t destnode)
1279 {
1280         int ret;
1281         int32_t res;
1282
1283         ret = ctdb_control(ctdb, destnode, 0, 
1284                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1285                            NULL, NULL, &res, &timeout, NULL);
1286         if (ret != 0 || res != 0) {
1287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1288                 return -1;
1289         }
1290
1291         return 0;
1292 }
1293
1294
1295 /*
1296   set vnn map on a node
1297  */
1298 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1299                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1300 {
1301         int ret;
1302         TDB_DATA data;
1303         int32_t res;
1304         struct ctdb_vnn_map_wire *map;
1305         size_t len;
1306
1307         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1308         map = talloc_size(mem_ctx, len);
1309         CTDB_NO_MEMORY(ctdb, map);
1310
1311         map->generation = vnnmap->generation;
1312         map->size = vnnmap->size;
1313         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1314         
1315         data.dsize = len;
1316         data.dptr  = (uint8_t *)map;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1323                 return -1;
1324         }
1325
1326         talloc_free(map);
1327
1328         return 0;
1329 }
1330
1331
1332 /*
1333   async send for pull database
1334  */
1335 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1336         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1337         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1338 {
1339         TDB_DATA indata;
1340         struct ctdb_control_pulldb *pull;
1341         struct ctdb_client_control_state *state;
1342
1343         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1344         CTDB_NO_MEMORY_NULL(ctdb, pull);
1345
1346         pull->db_id   = dbid;
1347         pull->lmaster = lmaster;
1348
1349         indata.dsize = sizeof(struct ctdb_control_pulldb);
1350         indata.dptr  = (unsigned char *)pull;
1351
1352         state = ctdb_control_send(ctdb, destnode, 0, 
1353                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1354                                   mem_ctx, &timeout, NULL);
1355         talloc_free(pull);
1356
1357         return state;
1358 }
1359
1360 /*
1361   async recv for pull database
1362  */
1363 int ctdb_ctrl_pulldb_recv(
1364         struct ctdb_context *ctdb, 
1365         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1366         TDB_DATA *outdata)
1367 {
1368         int ret;
1369         int32_t res;
1370
1371         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1372         if ( (ret != 0) || (res != 0) ){
1373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1374                 return -1;
1375         }
1376
1377         return 0;
1378 }
1379
1380 /*
1381   pull all keys and records for a specific database on a node
1382  */
1383 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1384                 uint32_t dbid, uint32_t lmaster, 
1385                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1386                 TDB_DATA *outdata)
1387 {
1388         struct ctdb_client_control_state *state;
1389
1390         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1391                                       timeout);
1392         
1393         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1394 }
1395
1396
1397 /*
1398   change dmaster for all keys in the database to the new value
1399  */
1400 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1401                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1402 {
1403         int ret;
1404         TDB_DATA indata;
1405         int32_t res;
1406
1407         indata.dsize = 2*sizeof(uint32_t);
1408         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1409
1410         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1411         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424 /*
1425   ping a node, return number of clients connected
1426  */
1427 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1428 {
1429         int ret;
1430         int32_t res;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1433                            tdb_null, NULL, NULL, &res, NULL, NULL);
1434         if (ret != 0) {
1435                 return -1;
1436         }
1437         return res;
1438 }
1439
1440 /*
1441   find the real path to a ltdb 
1442  */
1443 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1444                    const char **path)
1445 {
1446         int ret;
1447         int32_t res;
1448         TDB_DATA data;
1449
1450         data.dptr = (uint8_t *)&dbid;
1451         data.dsize = sizeof(dbid);
1452
1453         ret = ctdb_control(ctdb, destnode, 0, 
1454                            CTDB_CONTROL_GETDBPATH, 0, data, 
1455                            mem_ctx, &data, &res, &timeout, NULL);
1456         if (ret != 0 || res != 0) {
1457                 return -1;
1458         }
1459
1460         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1461         if ((*path) == NULL) {
1462                 return -1;
1463         }
1464
1465         talloc_free(data.dptr);
1466
1467         return 0;
1468 }
1469
1470 /*
1471   find the name of a db 
1472  */
1473 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1474                    const char **name)
1475 {
1476         int ret;
1477         int32_t res;
1478         TDB_DATA data;
1479
1480         data.dptr = (uint8_t *)&dbid;
1481         data.dsize = sizeof(dbid);
1482
1483         ret = ctdb_control(ctdb, destnode, 0, 
1484                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1485                            mem_ctx, &data, &res, &timeout, NULL);
1486         if (ret != 0 || res != 0) {
1487                 return -1;
1488         }
1489
1490         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1491         if ((*name) == NULL) {
1492                 return -1;
1493         }
1494
1495         talloc_free(data.dptr);
1496
1497         return 0;
1498 }
1499
1500 /*
1501   create a database
1502  */
1503 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1504                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1505 {
1506         int ret;
1507         int32_t res;
1508         TDB_DATA data;
1509
1510         data.dptr = discard_const(name);
1511         data.dsize = strlen(name)+1;
1512
1513         ret = ctdb_control(ctdb, destnode, 0, 
1514                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1515                            0, data, 
1516                            mem_ctx, &data, &res, &timeout, NULL);
1517
1518         if (ret != 0 || res != 0) {
1519                 return -1;
1520         }
1521
1522         return 0;
1523 }
1524
1525 /*
1526   get debug level on a node
1527  */
1528 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1529 {
1530         int ret;
1531         int32_t res;
1532         TDB_DATA data;
1533
1534         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1535                            ctdb, &data, &res, NULL, NULL);
1536         if (ret != 0 || res != 0) {
1537                 return -1;
1538         }
1539         if (data.dsize != sizeof(int32_t)) {
1540                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1541                          (unsigned)data.dsize));
1542                 return -1;
1543         }
1544         *level = *(int32_t *)data.dptr;
1545         talloc_free(data.dptr);
1546         return 0;
1547 }
1548
1549 /*
1550   set debug level on a node
1551  */
1552 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1553 {
1554         int ret;
1555         int32_t res;
1556         TDB_DATA data;
1557
1558         data.dptr = (uint8_t *)&level;
1559         data.dsize = sizeof(level);
1560
1561         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1562                            NULL, NULL, &res, NULL, NULL);
1563         if (ret != 0 || res != 0) {
1564                 return -1;
1565         }
1566         return 0;
1567 }
1568
1569
1570 /*
1571   get a list of connected nodes
1572  */
1573 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1574                                 struct timeval timeout,
1575                                 TALLOC_CTX *mem_ctx,
1576                                 uint32_t *num_nodes)
1577 {
1578         struct ctdb_node_map *map=NULL;
1579         int ret, i;
1580         uint32_t *nodes;
1581
1582         *num_nodes = 0;
1583
1584         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1585         if (ret != 0) {
1586                 return NULL;
1587         }
1588
1589         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1590         if (nodes == NULL) {
1591                 return NULL;
1592         }
1593
1594         for (i=0;i<map->num;i++) {
1595                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1596                         nodes[*num_nodes] = map->nodes[i].pnn;
1597                         (*num_nodes)++;
1598                 }
1599         }
1600
1601         return nodes;
1602 }
1603
1604
1605 /*
1606   reset remote status
1607  */
1608 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1609 {
1610         int ret;
1611         int32_t res;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1615                            NULL, NULL, &res, NULL, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1618                 return -1;
1619         }
1620         return 0;
1621 }
1622
1623 /*
1624   this is the dummy null procedure that all databases support
1625 */
1626 static int ctdb_null_func(struct ctdb_call_info *call)
1627 {
1628         return 0;
1629 }
1630
1631 /*
1632   this is a plain fetch procedure that all databases support
1633 */
1634 static int ctdb_fetch_func(struct ctdb_call_info *call)
1635 {
1636         call->reply_data = &call->record_data;
1637         return 0;
1638 }
1639
1640 /*
1641   attach to a specific database - client call
1642 */
1643 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1644 {
1645         struct ctdb_db_context *ctdb_db;
1646         TDB_DATA data;
1647         int ret;
1648         int32_t res;
1649
1650         ctdb_db = ctdb_db_handle(ctdb, name);
1651         if (ctdb_db) {
1652                 return ctdb_db;
1653         }
1654
1655         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1656         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1657
1658         ctdb_db->ctdb = ctdb;
1659         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1660         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1661
1662         data.dptr = discard_const(name);
1663         data.dsize = strlen(name)+1;
1664
1665         /* tell ctdb daemon to attach */
1666         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1667                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1668                            0, data, ctdb_db, &data, &res, NULL, NULL);
1669         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1670                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1671                 talloc_free(ctdb_db);
1672                 return NULL;
1673         }
1674         
1675         ctdb_db->db_id = *(uint32_t *)data.dptr;
1676         talloc_free(data.dptr);
1677
1678         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1679         if (ret != 0) {
1680                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1681                 talloc_free(ctdb_db);
1682                 return NULL;
1683         }
1684
1685         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1686         if (!ctdb->do_setsched) {
1687                 tdb_flags |= TDB_NOMMAP;
1688         }
1689
1690         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1691         if (ctdb_db->ltdb == NULL) {
1692                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1693                 talloc_free(ctdb_db);
1694                 return NULL;
1695         }
1696
1697         ctdb_db->persistent = persistent;
1698
1699         DLIST_ADD(ctdb->db_list, ctdb_db);
1700
1701         /* add well known functions */
1702         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1703         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1704
1705         return ctdb_db;
1706 }
1707
1708
1709 /*
1710   setup a call for a database
1711  */
1712 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1713 {
1714         struct ctdb_registered_call *call;
1715
1716 #if 0
1717         TDB_DATA data;
1718         int32_t status;
1719         struct ctdb_control_set_call c;
1720         int ret;
1721
1722         /* this is no longer valid with the separate daemon architecture */
1723         c.db_id = ctdb_db->db_id;
1724         c.fn    = fn;
1725         c.id    = id;
1726
1727         data.dptr = (uint8_t *)&c;
1728         data.dsize = sizeof(c);
1729
1730         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1731                            data, NULL, NULL, &status, NULL, NULL);
1732         if (ret != 0 || status != 0) {
1733                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1734                 return -1;
1735         }
1736 #endif
1737
1738         /* also register locally */
1739         call = talloc(ctdb_db, struct ctdb_registered_call);
1740         call->fn = fn;
1741         call->id = id;
1742
1743         DLIST_ADD(ctdb_db->calls, call);        
1744         return 0;
1745 }
1746
1747
1748 struct traverse_state {
1749         bool done;
1750         uint32_t count;
1751         ctdb_traverse_func fn;
1752         void *private_data;
1753 };
1754
1755 /*
1756   called on each key during a ctdb_traverse
1757  */
1758 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1759 {
1760         struct traverse_state *state = (struct traverse_state *)p;
1761         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1762         TDB_DATA key;
1763
1764         if (data.dsize < sizeof(uint32_t) ||
1765             d->length != data.dsize) {
1766                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1767                 state->done = True;
1768                 return;
1769         }
1770
1771         key.dsize = d->keylen;
1772         key.dptr  = &d->data[0];
1773         data.dsize = d->datalen;
1774         data.dptr = &d->data[d->keylen];
1775
1776         if (key.dsize == 0 && data.dsize == 0) {
1777                 /* end of traverse */
1778                 state->done = True;
1779                 return;
1780         }
1781
1782         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1783                 /* empty records are deleted records in ctdb */
1784                 return;
1785         }
1786
1787         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1788                 state->done = True;
1789         }
1790
1791         state->count++;
1792 }
1793
1794
1795 /*
1796   start a cluster wide traverse, calling the supplied fn on each record
1797   return the number of records traversed, or -1 on error
1798  */
1799 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1800 {
1801         TDB_DATA data;
1802         struct ctdb_traverse_start t;
1803         int32_t status;
1804         int ret;
1805         uint64_t srvid = (getpid() | 0xFLL<<60);
1806         struct traverse_state state;
1807
1808         state.done = False;
1809         state.count = 0;
1810         state.private_data = private_data;
1811         state.fn = fn;
1812
1813         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1816                 return -1;
1817         }
1818
1819         t.db_id = ctdb_db->db_id;
1820         t.srvid = srvid;
1821         t.reqid = 0;
1822
1823         data.dptr = (uint8_t *)&t;
1824         data.dsize = sizeof(t);
1825
1826         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1827                            data, NULL, NULL, &status, NULL, NULL);
1828         if (ret != 0 || status != 0) {
1829                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1830                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1831                 return -1;
1832         }
1833
1834         while (!state.done) {
1835                 event_loop_once(ctdb_db->ctdb->ev);
1836         }
1837
1838         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1841                 return -1;
1842         }
1843
1844         return state.count;
1845 }
1846
1847 #define ISASCII(x) ((x>31)&&(x<128))
1848 /*
1849   called on each key during a catdb
1850  */
1851 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1852 {
1853         int i;
1854         FILE *f = (FILE *)p;
1855         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1856
1857         fprintf(f, "dmaster: %u\n", h->dmaster);
1858         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1859
1860         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1861         for (i=0;i<key.dsize;i++) {
1862                 if (ISASCII(key.dptr[i])) {
1863                         fprintf(f, "%c", key.dptr[i]);
1864                 } else {
1865                         fprintf(f, "\\%02X", key.dptr[i]);
1866                 }
1867         }
1868         fprintf(f, "\"\n");
1869
1870         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1871         for (i=sizeof(*h);i<data.dsize;i++) {
1872                 if (ISASCII(data.dptr[i])) {
1873                         fprintf(f, "%c", data.dptr[i]);
1874                 } else {
1875                         fprintf(f, "\\%02X", data.dptr[i]);
1876                 }
1877         }
1878         fprintf(f, "\"\n");
1879
1880         return 0;
1881 }
1882
1883 /*
1884   convenience function to list all keys to stdout
1885  */
1886 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1887 {
1888         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1889 }
1890
1891 /*
1892   get the pid of a ctdb daemon
1893  */
1894 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1895 {
1896         int ret;
1897         int32_t res;
1898
1899         ret = ctdb_control(ctdb, destnode, 0, 
1900                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1901                            NULL, NULL, &res, &timeout, NULL);
1902         if (ret != 0) {
1903                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1904                 return -1;
1905         }
1906
1907         *pid = res;
1908
1909         return 0;
1910 }
1911
1912
1913 /*
1914   async freeze send control
1915  */
1916 struct ctdb_client_control_state *
1917 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1918 {
1919         return ctdb_control_send(ctdb, destnode, priority, 
1920                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1921                            mem_ctx, &timeout, NULL);
1922 }
1923
1924 /* 
1925    async freeze recv control
1926 */
1927 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1928 {
1929         int ret;
1930         int32_t res;
1931
1932         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1933         if ( (ret != 0) || (res != 0) ){
1934                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1935                 return -1;
1936         }
1937
1938         return 0;
1939 }
1940
1941 /*
1942   freeze databases of a certain priority
1943  */
1944 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1945 {
1946         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1947         struct ctdb_client_control_state *state;
1948         int ret;
1949
1950         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1951         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1952         talloc_free(tmp_ctx);
1953
1954         return ret;
1955 }
1956
1957 /* Freeze all databases */
1958 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1959 {
1960         int i;
1961
1962         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1963                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1964                         return -1;
1965                 }
1966         }
1967         return 0;
1968 }
1969
1970 /*
1971   thaw databases of a certain priority
1972  */
1973 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1974 {
1975         int ret;
1976         int32_t res;
1977
1978         ret = ctdb_control(ctdb, destnode, priority, 
1979                            CTDB_CONTROL_THAW, 0, tdb_null, 
1980                            NULL, NULL, &res, &timeout, NULL);
1981         if (ret != 0 || res != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1983                 return -1;
1984         }
1985
1986         return 0;
1987 }
1988
1989 /* thaw all databases */
1990 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1991 {
1992         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1993 }
1994
1995 /*
1996   get pnn of a node, or -1
1997  */
1998 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1999 {
2000         int ret;
2001         int32_t res;
2002
2003         ret = ctdb_control(ctdb, destnode, 0, 
2004                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2005                            NULL, NULL, &res, &timeout, NULL);
2006         if (ret != 0) {
2007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2008                 return -1;
2009         }
2010
2011         return res;
2012 }
2013
2014 /*
2015   get the monitoring mode of a remote node
2016  */
2017 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2018 {
2019         int ret;
2020         int32_t res;
2021
2022         ret = ctdb_control(ctdb, destnode, 0, 
2023                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2024                            NULL, NULL, &res, &timeout, NULL);
2025         if (ret != 0) {
2026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2027                 return -1;
2028         }
2029
2030         *monmode = res;
2031
2032         return 0;
2033 }
2034
2035
2036 /*
2037  set the monitoring mode of a remote node to active
2038  */
2039 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2040 {
2041         int ret;
2042         
2043
2044         ret = ctdb_control(ctdb, destnode, 0, 
2045                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2046                            NULL, NULL,NULL, &timeout, NULL);
2047         if (ret != 0) {
2048                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2049                 return -1;
2050         }
2051
2052         
2053
2054         return 0;
2055 }
2056
2057 /*
2058   set the monitoring mode of a remote node to disable
2059  */
2060 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2061 {
2062         int ret;
2063         
2064
2065         ret = ctdb_control(ctdb, destnode, 0, 
2066                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2067                            NULL, NULL, NULL, &timeout, NULL);
2068         if (ret != 0) {
2069                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2070                 return -1;
2071         }
2072
2073         
2074
2075         return 0;
2076 }
2077
2078
2079
2080 /* 
2081   sent to a node to make it take over an ip address
2082 */
2083 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2084                           uint32_t destnode, struct ctdb_public_ip *ip)
2085 {
2086         TDB_DATA data;
2087         struct ctdb_public_ipv4 ipv4;
2088         int ret;
2089         int32_t res;
2090
2091         if (ip->addr.sa.sa_family == AF_INET) {
2092                 ipv4.pnn = ip->pnn;
2093                 ipv4.sin = ip->addr.ip;
2094
2095                 data.dsize = sizeof(ipv4);
2096                 data.dptr  = (uint8_t *)&ipv4;
2097
2098                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2099                            NULL, &res, &timeout, NULL);
2100         } else {
2101                 data.dsize = sizeof(*ip);
2102                 data.dptr  = (uint8_t *)ip;
2103
2104                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2105                            NULL, &res, &timeout, NULL);
2106         }
2107
2108         if (ret != 0 || res != 0) {
2109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2110                 return -1;
2111         }
2112
2113         return 0;       
2114 }
2115
2116
2117 /* 
2118   sent to a node to make it release an ip address
2119 */
2120 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2121                          uint32_t destnode, struct ctdb_public_ip *ip)
2122 {
2123         TDB_DATA data;
2124         struct ctdb_public_ipv4 ipv4;
2125         int ret;
2126         int32_t res;
2127
2128         if (ip->addr.sa.sa_family == AF_INET) {
2129                 ipv4.pnn = ip->pnn;
2130                 ipv4.sin = ip->addr.ip;
2131
2132                 data.dsize = sizeof(ipv4);
2133                 data.dptr  = (uint8_t *)&ipv4;
2134
2135                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2136                                    NULL, &res, &timeout, NULL);
2137         } else {
2138                 data.dsize = sizeof(*ip);
2139                 data.dptr  = (uint8_t *)ip;
2140
2141                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2142                                    NULL, &res, &timeout, NULL);
2143         }
2144
2145         if (ret != 0 || res != 0) {
2146                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2147                 return -1;
2148         }
2149
2150         return 0;       
2151 }
2152
2153
2154 /*
2155   get a tunable
2156  */
2157 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2158                           struct timeval timeout, 
2159                           uint32_t destnode,
2160                           const char *name, uint32_t *value)
2161 {
2162         struct ctdb_control_get_tunable *t;
2163         TDB_DATA data, outdata;
2164         int32_t res;
2165         int ret;
2166
2167         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2168         data.dptr  = talloc_size(ctdb, data.dsize);
2169         CTDB_NO_MEMORY(ctdb, data.dptr);
2170
2171         t = (struct ctdb_control_get_tunable *)data.dptr;
2172         t->length = strlen(name)+1;
2173         memcpy(t->name, name, t->length);
2174
2175         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2176                            &outdata, &res, &timeout, NULL);
2177         talloc_free(data.dptr);
2178         if (ret != 0 || res != 0) {
2179                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2180                 return -1;
2181         }
2182
2183         if (outdata.dsize != sizeof(uint32_t)) {
2184                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2185                 talloc_free(outdata.dptr);
2186                 return -1;
2187         }
2188         
2189         *value = *(uint32_t *)outdata.dptr;
2190         talloc_free(outdata.dptr);
2191
2192         return 0;
2193 }
2194
2195 /*
2196   set a tunable
2197  */
2198 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2199                           struct timeval timeout, 
2200                           uint32_t destnode,
2201                           const char *name, uint32_t value)
2202 {
2203         struct ctdb_control_set_tunable *t;
2204         TDB_DATA data;
2205         int32_t res;
2206         int ret;
2207
2208         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2209         data.dptr  = talloc_size(ctdb, data.dsize);
2210         CTDB_NO_MEMORY(ctdb, data.dptr);
2211
2212         t = (struct ctdb_control_set_tunable *)data.dptr;
2213         t->length = strlen(name)+1;
2214         memcpy(t->name, name, t->length);
2215         t->value = value;
2216
2217         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2218                            NULL, &res, &timeout, NULL);
2219         talloc_free(data.dptr);
2220         if (ret != 0 || res != 0) {
2221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2222                 return -1;
2223         }
2224
2225         return 0;
2226 }
2227
2228 /*
2229   list tunables
2230  */
2231 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2232                             struct timeval timeout, 
2233                             uint32_t destnode,
2234                             TALLOC_CTX *mem_ctx,
2235                             const char ***list, uint32_t *count)
2236 {
2237         TDB_DATA outdata;
2238         int32_t res;
2239         int ret;
2240         struct ctdb_control_list_tunable *t;
2241         char *p, *s, *ptr;
2242
2243         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2244                            mem_ctx, &outdata, &res, &timeout, NULL);
2245         if (ret != 0 || res != 0) {
2246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2247                 return -1;
2248         }
2249
2250         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2251         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2252             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2253                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2254                 talloc_free(outdata.dptr);
2255                 return -1;              
2256         }
2257         
2258         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2259         CTDB_NO_MEMORY(ctdb, p);
2260
2261         talloc_free(outdata.dptr);
2262         
2263         (*list) = NULL;
2264         (*count) = 0;
2265
2266         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2267                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2268                 CTDB_NO_MEMORY(ctdb, *list);
2269                 (*list)[*count] = talloc_strdup(*list, s);
2270                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2271                 (*count)++;
2272         }
2273
2274         talloc_free(p);
2275
2276         return 0;
2277 }
2278
2279
2280 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2281                         struct timeval timeout, uint32_t destnode, 
2282                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2283 {
2284         int ret;
2285         TDB_DATA outdata;
2286         int32_t res;
2287
2288         ret = ctdb_control(ctdb, destnode, 0, 
2289                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2290                            mem_ctx, &outdata, &res, &timeout, NULL);
2291         if (ret == 0 && res == -1) {
2292                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2293                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2294         }
2295         if (ret != 0 || res != 0) {
2296           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2297                 return -1;
2298         }
2299
2300         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2301         talloc_free(outdata.dptr);
2302                     
2303         return 0;
2304 }
2305
2306 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2307                         struct timeval timeout, uint32_t destnode, 
2308                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2309 {
2310         int ret, i, len;
2311         TDB_DATA outdata;
2312         int32_t res;
2313         struct ctdb_all_public_ipsv4 *ipsv4;
2314
2315         ret = ctdb_control(ctdb, destnode, 0, 
2316                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2317                            mem_ctx, &outdata, &res, &timeout, NULL);
2318         if (ret != 0 || res != 0) {
2319                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2320                 return -1;
2321         }
2322
2323         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2324         len = offsetof(struct ctdb_all_public_ips, ips) +
2325                 ipsv4->num*sizeof(struct ctdb_public_ip);
2326         *ips = talloc_zero_size(mem_ctx, len);
2327         CTDB_NO_MEMORY(ctdb, *ips);
2328         (*ips)->num = ipsv4->num;
2329         for (i=0; i<ipsv4->num; i++) {
2330                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2331                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2332         }
2333
2334         talloc_free(outdata.dptr);
2335                     
2336         return 0;
2337 }
2338
2339 /*
2340   set/clear the permanent disabled bit on a remote node
2341  */
2342 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2343                        uint32_t set, uint32_t clear)
2344 {
2345         int ret;
2346         TDB_DATA data;
2347         struct ctdb_node_map *nodemap=NULL;
2348         struct ctdb_node_flag_change c;
2349         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2350         uint32_t recmaster;
2351         uint32_t *nodes;
2352
2353
2354         /* find the recovery master */
2355         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2356         if (ret != 0) {
2357                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2358                 talloc_free(tmp_ctx);
2359                 return ret;
2360         }
2361
2362
2363         /* read the node flags from the recmaster */
2364         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2367                 talloc_free(tmp_ctx);
2368                 return -1;
2369         }
2370         if (destnode >= nodemap->num) {
2371                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2372                 talloc_free(tmp_ctx);
2373                 return -1;
2374         }
2375
2376         c.pnn       = destnode;
2377         c.old_flags = nodemap->nodes[destnode].flags;
2378         c.new_flags = c.old_flags;
2379         c.new_flags |= set;
2380         c.new_flags &= ~clear;
2381
2382         data.dsize = sizeof(c);
2383         data.dptr = (unsigned char *)&c;
2384
2385         /* send the flags update to all connected nodes */
2386         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2387
2388         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2389                                         nodes, 0,
2390                                         timeout, false, data,
2391                                         NULL, NULL,
2392                                         NULL) != 0) {
2393                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2394
2395                 talloc_free(tmp_ctx);
2396                 return -1;
2397         }
2398
2399         talloc_free(tmp_ctx);
2400         return 0;
2401 }
2402
2403
2404 /*
2405   get all tunables
2406  */
2407 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2408                                struct timeval timeout, 
2409                                uint32_t destnode,
2410                                struct ctdb_tunable *tunables)
2411 {
2412         TDB_DATA outdata;
2413         int ret;
2414         int32_t res;
2415
2416         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2417                            &outdata, &res, &timeout, NULL);
2418         if (ret != 0 || res != 0) {
2419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2420                 return -1;
2421         }
2422
2423         if (outdata.dsize != sizeof(*tunables)) {
2424                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2425                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2426                 return -1;              
2427         }
2428
2429         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2430         talloc_free(outdata.dptr);
2431         return 0;
2432 }
2433
2434 /*
2435   add a public address to a node
2436  */
2437 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2438                       struct timeval timeout, 
2439                       uint32_t destnode,
2440                       struct ctdb_control_ip_iface *pub)
2441 {
2442         TDB_DATA data;
2443         int32_t res;
2444         int ret;
2445
2446         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2447         data.dptr  = (unsigned char *)pub;
2448
2449         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2450                            NULL, &res, &timeout, NULL);
2451         if (ret != 0 || res != 0) {
2452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2453                 return -1;
2454         }
2455
2456         return 0;
2457 }
2458
2459 /*
2460   delete a public address from a node
2461  */
2462 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2463                       struct timeval timeout, 
2464                       uint32_t destnode,
2465                       struct ctdb_control_ip_iface *pub)
2466 {
2467         TDB_DATA data;
2468         int32_t res;
2469         int ret;
2470
2471         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2472         data.dptr  = (unsigned char *)pub;
2473
2474         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2475                            NULL, &res, &timeout, NULL);
2476         if (ret != 0 || res != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2478                 return -1;
2479         }
2480
2481         return 0;
2482 }
2483
2484 /*
2485   kill a tcp connection
2486  */
2487 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2488                       struct timeval timeout, 
2489                       uint32_t destnode,
2490                       struct ctdb_control_killtcp *killtcp)
2491 {
2492         TDB_DATA data;
2493         int32_t res;
2494         int ret;
2495
2496         data.dsize = sizeof(struct ctdb_control_killtcp);
2497         data.dptr  = (unsigned char *)killtcp;
2498
2499         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2500                            NULL, &res, &timeout, NULL);
2501         if (ret != 0 || res != 0) {
2502                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2503                 return -1;
2504         }
2505
2506         return 0;
2507 }
2508
2509 /*
2510   send a gratious arp
2511  */
2512 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2513                       struct timeval timeout, 
2514                       uint32_t destnode,
2515                       ctdb_sock_addr *addr,
2516                       const char *ifname)
2517 {
2518         TDB_DATA data;
2519         int32_t res;
2520         int ret, len;
2521         struct ctdb_control_gratious_arp *gratious_arp;
2522         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2523
2524
2525         len = strlen(ifname)+1;
2526         gratious_arp = talloc_size(tmp_ctx, 
2527                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2528         CTDB_NO_MEMORY(ctdb, gratious_arp);
2529
2530         gratious_arp->addr = *addr;
2531         gratious_arp->len = len;
2532         memcpy(&gratious_arp->iface[0], ifname, len);
2533
2534
2535         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2536         data.dptr  = (unsigned char *)gratious_arp;
2537
2538         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2539                            NULL, &res, &timeout, NULL);
2540         if (ret != 0 || res != 0) {
2541                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2542                 talloc_free(tmp_ctx);
2543                 return -1;
2544         }
2545
2546         talloc_free(tmp_ctx);
2547         return 0;
2548 }
2549
2550 /*
2551   get a list of all tcp tickles that a node knows about for a particular vnn
2552  */
2553 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2554                               struct timeval timeout, uint32_t destnode, 
2555                               TALLOC_CTX *mem_ctx, 
2556                               ctdb_sock_addr *addr,
2557                               struct ctdb_control_tcp_tickle_list **list)
2558 {
2559         int ret;
2560         TDB_DATA data, outdata;
2561         int32_t status;
2562
2563         data.dptr = (uint8_t*)addr;
2564         data.dsize = sizeof(ctdb_sock_addr);
2565
2566         ret = ctdb_control(ctdb, destnode, 0, 
2567                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2568                            mem_ctx, &outdata, &status, NULL, NULL);
2569         if (ret != 0 || status != 0) {
2570                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2571                 return -1;
2572         }
2573
2574         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2575
2576         return status;
2577 }
2578
2579 /*
2580   register a server id
2581  */
2582 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2583                       struct timeval timeout, 
2584                       struct ctdb_server_id *id)
2585 {
2586         TDB_DATA data;
2587         int32_t res;
2588         int ret;
2589
2590         data.dsize = sizeof(struct ctdb_server_id);
2591         data.dptr  = (unsigned char *)id;
2592
2593         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2594                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2595                         0, data, NULL,
2596                         NULL, &res, &timeout, NULL);
2597         if (ret != 0 || res != 0) {
2598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2599                 return -1;
2600         }
2601
2602         return 0;
2603 }
2604
2605 /*
2606   unregister a server id
2607  */
2608 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2609                       struct timeval timeout, 
2610                       struct ctdb_server_id *id)
2611 {
2612         TDB_DATA data;
2613         int32_t res;
2614         int ret;
2615
2616         data.dsize = sizeof(struct ctdb_server_id);
2617         data.dptr  = (unsigned char *)id;
2618
2619         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2620                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2621                         0, data, NULL,
2622                         NULL, &res, &timeout, NULL);
2623         if (ret != 0 || res != 0) {
2624                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2625                 return -1;
2626         }
2627
2628         return 0;
2629 }
2630
2631
2632 /*
2633   check if a server id exists
2634
2635   if a server id does exist, return *status == 1, otherwise *status == 0
2636  */
2637 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2638                       struct timeval timeout, 
2639                       uint32_t destnode,
2640                       struct ctdb_server_id *id,
2641                       uint32_t *status)
2642 {
2643         TDB_DATA data;
2644         int32_t res;
2645         int ret;
2646
2647         data.dsize = sizeof(struct ctdb_server_id);
2648         data.dptr  = (unsigned char *)id;
2649
2650         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2651                         0, data, NULL,
2652                         NULL, &res, &timeout, NULL);
2653         if (ret != 0) {
2654                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2655                 return -1;
2656         }
2657
2658         if (res) {
2659                 *status = 1;
2660         } else {
2661                 *status = 0;
2662         }
2663
2664         return 0;
2665 }
2666
2667 /*
2668    get the list of server ids that are registered on a node
2669 */
2670 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2671                 TALLOC_CTX *mem_ctx,
2672                 struct timeval timeout, uint32_t destnode, 
2673                 struct ctdb_server_id_list **svid_list)
2674 {
2675         int ret;
2676         TDB_DATA outdata;
2677         int32_t res;
2678
2679         ret = ctdb_control(ctdb, destnode, 0, 
2680                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2681                            mem_ctx, &outdata, &res, &timeout, NULL);
2682         if (ret != 0 || res != 0) {
2683                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2684                 return -1;
2685         }
2686
2687         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2688                     
2689         return 0;
2690 }
2691
2692 /*
2693   initialise the ctdb daemon for client applications
2694
2695   NOTE: In current code the daemon does not fork. This is for testing purposes only
2696   and to simplify the code.
2697 */
2698 struct ctdb_context *ctdb_init(struct event_context *ev)
2699 {
2700         int ret;
2701         struct ctdb_context *ctdb;
2702
2703         ctdb = talloc_zero(ev, struct ctdb_context);
2704         if (ctdb == NULL) {
2705                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2706                 return NULL;
2707         }
2708         ctdb->ev  = ev;
2709         ctdb->idr = idr_init(ctdb);
2710         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2711
2712         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2713         if (ret != 0) {
2714                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2715                 talloc_free(ctdb);
2716                 return NULL;
2717         }
2718
2719         return ctdb;
2720 }
2721
2722
2723 /*
2724   set some ctdb flags
2725 */
2726 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2727 {
2728         ctdb->flags |= flags;
2729 }
2730
2731 /*
2732   setup the local socket name
2733 */
2734 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2735 {
2736         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2737         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2738
2739         return 0;
2740 }
2741
2742 /*
2743   return the pnn of this node
2744 */
2745 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2746 {
2747         return ctdb->pnn;
2748 }
2749
2750
2751 /*
2752   get the uptime of a remote node
2753  */
2754 struct ctdb_client_control_state *
2755 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2756 {
2757         return ctdb_control_send(ctdb, destnode, 0, 
2758                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2759                            mem_ctx, &timeout, NULL);
2760 }
2761
2762 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2763 {
2764         int ret;
2765         int32_t res;
2766         TDB_DATA outdata;
2767
2768         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2769         if (ret != 0 || res != 0) {
2770                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2771                 return -1;
2772         }
2773
2774         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2775
2776         return 0;
2777 }
2778
2779 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2780 {
2781         struct ctdb_client_control_state *state;
2782
2783         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2784         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2785 }
2786
2787 /*
2788   send a control to execute the "recovered" event script on a node
2789  */
2790 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2791 {
2792         int ret;
2793         int32_t status;
2794
2795         ret = ctdb_control(ctdb, destnode, 0, 
2796                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2797                            NULL, NULL, &status, &timeout, NULL);
2798         if (ret != 0 || status != 0) {
2799                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2800                 return -1;
2801         }
2802
2803         return 0;
2804 }
2805
2806 /* 
2807   callback for the async helpers used when sending the same control
2808   to multiple nodes in parallell.
2809 */
2810 static void async_callback(struct ctdb_client_control_state *state)
2811 {
2812         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2813         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2814         int ret;
2815         TDB_DATA outdata;
2816         int32_t res;
2817         uint32_t destnode = state->c->hdr.destnode;
2818
2819         /* one more node has responded with recmode data */
2820         data->count--;
2821
2822         /* if we failed to push the db, then return an error and let
2823            the main loop try again.
2824         */
2825         if (state->state != CTDB_CONTROL_DONE) {
2826                 if ( !data->dont_log_errors) {
2827                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2828                 }
2829                 data->fail_count++;
2830                 if (data->fail_callback) {
2831                         data->fail_callback(ctdb, destnode, res, outdata,
2832                                         data->callback_data);
2833                 }
2834                 return;
2835         }
2836         
2837         state->async.fn = NULL;
2838
2839         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2840         if ((ret != 0) || (res != 0)) {
2841                 if ( !data->dont_log_errors) {
2842                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2843                 }
2844                 data->fail_count++;
2845                 if (data->fail_callback) {
2846                         data->fail_callback(ctdb, destnode, res, outdata,
2847                                         data->callback_data);
2848                 }
2849         }
2850         if ((ret == 0) && (data->callback != NULL)) {
2851                 data->callback(ctdb, destnode, res, outdata,
2852                                         data->callback_data);
2853         }
2854 }
2855
2856
2857 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2858 {
2859         /* set up the callback functions */
2860         state->async.fn = async_callback;
2861         state->async.private_data = data;
2862         
2863         /* one more control to wait for to complete */
2864         data->count++;
2865 }
2866
2867
2868 /* wait for up to the maximum number of seconds allowed
2869    or until all nodes we expect a response from has replied
2870 */
2871 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2872 {
2873         while (data->count > 0) {
2874                 event_loop_once(ctdb->ev);
2875         }
2876         if (data->fail_count != 0) {
2877                 if (!data->dont_log_errors) {
2878                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2879                                  data->fail_count));
2880                 }
2881                 return -1;
2882         }
2883         return 0;
2884 }
2885
2886
2887 /* 
2888    perform a simple control on the listed nodes
2889    The control cannot return data
2890  */
2891 int ctdb_client_async_control(struct ctdb_context *ctdb,
2892                                 enum ctdb_controls opcode,
2893                                 uint32_t *nodes,
2894                                 uint64_t srvid,
2895                                 struct timeval timeout,
2896                                 bool dont_log_errors,
2897                                 TDB_DATA data,
2898                                 client_async_callback client_callback,
2899                                 client_async_callback fail_callback,
2900                                 void *callback_data)
2901 {
2902         struct client_async_data *async_data;
2903         struct ctdb_client_control_state *state;
2904         int j, num_nodes;
2905
2906         async_data = talloc_zero(ctdb, struct client_async_data);
2907         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2908         async_data->dont_log_errors = dont_log_errors;
2909         async_data->callback = client_callback;
2910         async_data->fail_callback = fail_callback;
2911         async_data->callback_data = callback_data;
2912         async_data->opcode        = opcode;
2913
2914         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2915
2916         /* loop over all nodes and send an async control to each of them */
2917         for (j=0; j<num_nodes; j++) {
2918                 uint32_t pnn = nodes[j];
2919
2920                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2921                                           0, data, async_data, &timeout, NULL);
2922                 if (state == NULL) {
2923                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2924                         talloc_free(async_data);
2925                         return -1;
2926                 }
2927                 
2928                 ctdb_client_async_add(async_data, state);
2929         }
2930
2931         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2932                 talloc_free(async_data);
2933                 return -1;
2934         }
2935
2936         talloc_free(async_data);
2937         return 0;
2938 }
2939
2940 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2941                                 struct ctdb_vnn_map *vnn_map,
2942                                 TALLOC_CTX *mem_ctx,
2943                                 bool include_self)
2944 {
2945         int i, j, num_nodes;
2946         uint32_t *nodes;
2947
2948         for (i=num_nodes=0;i<vnn_map->size;i++) {
2949                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2950                         continue;
2951                 }
2952                 num_nodes++;
2953         } 
2954
2955         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2956         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2957
2958         for (i=j=0;i<vnn_map->size;i++) {
2959                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2960                         continue;
2961                 }
2962                 nodes[j++] = vnn_map->map[i];
2963         } 
2964
2965         return nodes;
2966 }
2967
2968 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2969                                 struct ctdb_node_map *node_map,
2970                                 TALLOC_CTX *mem_ctx,
2971                                 bool include_self)
2972 {
2973         int i, j, num_nodes;
2974         uint32_t *nodes;
2975
2976         for (i=num_nodes=0;i<node_map->num;i++) {
2977                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2978                         continue;
2979                 }
2980                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2981                         continue;
2982                 }
2983                 num_nodes++;
2984         } 
2985
2986         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2987         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2988
2989         for (i=j=0;i<node_map->num;i++) {
2990                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2991                         continue;
2992                 }
2993                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2994                         continue;
2995                 }
2996                 nodes[j++] = node_map->nodes[i].pnn;
2997         } 
2998
2999         return nodes;
3000 }
3001
3002 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3003                                 struct ctdb_node_map *node_map,
3004                                 TALLOC_CTX *mem_ctx,
3005                                 uint32_t pnn)
3006 {
3007         int i, j, num_nodes;
3008         uint32_t *nodes;
3009
3010         for (i=num_nodes=0;i<node_map->num;i++) {
3011                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3012                         continue;
3013                 }
3014                 if (node_map->nodes[i].pnn == pnn) {
3015                         continue;
3016                 }
3017                 num_nodes++;
3018         } 
3019
3020         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3021         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3022
3023         for (i=j=0;i<node_map->num;i++) {
3024                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3025                         continue;
3026                 }
3027                 if (node_map->nodes[i].pnn == pnn) {
3028                         continue;
3029                 }
3030                 nodes[j++] = node_map->nodes[i].pnn;
3031         } 
3032
3033         return nodes;
3034 }
3035
3036 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3037                                 struct ctdb_node_map *node_map,
3038                                 TALLOC_CTX *mem_ctx,
3039                                 bool include_self)
3040 {
3041         int i, j, num_nodes;
3042         uint32_t *nodes;
3043
3044         for (i=num_nodes=0;i<node_map->num;i++) {
3045                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3046                         continue;
3047                 }
3048                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3049                         continue;
3050                 }
3051                 num_nodes++;
3052         } 
3053
3054         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3055         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3056
3057         for (i=j=0;i<node_map->num;i++) {
3058                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3059                         continue;
3060                 }
3061                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3062                         continue;
3063                 }
3064                 nodes[j++] = node_map->nodes[i].pnn;
3065         } 
3066
3067         return nodes;
3068 }
3069
3070 /* 
3071   this is used to test if a pnn lock exists and if it exists will return
3072   the number of connections that pnn has reported or -1 if that recovery
3073   daemon is not running.
3074 */
3075 int
3076 ctdb_read_pnn_lock(int fd, int32_t pnn)
3077 {
3078         struct flock lock;
3079         char c;
3080
3081         lock.l_type = F_WRLCK;
3082         lock.l_whence = SEEK_SET;
3083         lock.l_start = pnn;
3084         lock.l_len = 1;
3085         lock.l_pid = 0;
3086
3087         if (fcntl(fd, F_GETLK, &lock) != 0) {
3088                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3089                 return -1;
3090         }
3091
3092         if (lock.l_type == F_UNLCK) {
3093                 return -1;
3094         }
3095
3096         if (pread(fd, &c, 1, pnn) == -1) {
3097                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3098                 return -1;
3099         }
3100
3101         return c;
3102 }
3103
3104 /*
3105   get capabilities of a remote node
3106  */
3107 struct ctdb_client_control_state *
3108 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3109 {
3110         return ctdb_control_send(ctdb, destnode, 0, 
3111                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3112                            mem_ctx, &timeout, NULL);
3113 }
3114
3115 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3116 {
3117         int ret;
3118         int32_t res;
3119         TDB_DATA outdata;
3120
3121         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3122         if ( (ret != 0) || (res != 0) ) {
3123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3124                 return -1;
3125         }
3126
3127         if (capabilities) {
3128                 *capabilities = *((uint32_t *)outdata.dptr);
3129         }
3130
3131         return 0;
3132 }
3133
3134 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3135 {
3136         struct ctdb_client_control_state *state;
3137         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3138         int ret;
3139
3140         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3141         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3142         talloc_free(tmp_ctx);
3143         return ret;
3144 }
3145
3146 /**
3147  * check whether a transaction is active on a given db on a given node
3148  */
3149 static int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3150                                             uint32_t destnode,
3151                                             uint32_t db_id)
3152 {
3153         int32_t status;
3154         int ret;
3155         TDB_DATA indata;
3156
3157         indata.dptr = (uint8_t *)&db_id;
3158         indata.dsize = sizeof(db_id);
3159
3160         ret = ctdb_control(ctdb, destnode, 0,
3161                            CTDB_CONTROL_TRANS2_ACTIVE,
3162                            0, indata, NULL, NULL, &status,
3163                            NULL, NULL);
3164
3165         if (ret != 0) {
3166                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3167                 return -1;
3168         }
3169
3170         return status;
3171 }
3172
3173
3174 struct ctdb_transaction_handle {
3175         struct ctdb_db_context *ctdb_db;
3176         bool in_replay;
3177         /*
3178          * we store the reads and writes done under a transaction:
3179          * - one list stores both reads and writes (m_all),
3180          * - the other just writes (m_write)
3181          */
3182         struct ctdb_marshall_buffer *m_all;
3183         struct ctdb_marshall_buffer *m_write;
3184 };
3185
3186 /* start a transaction on a database */
3187 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3188 {
3189         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3190         return 0;
3191 }
3192
3193 /* start a transaction on a database */
3194 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3195 {
3196         struct ctdb_record_handle *rh;
3197         TDB_DATA key;
3198         TDB_DATA data;
3199         struct ctdb_ltdb_header header;
3200         TALLOC_CTX *tmp_ctx;
3201         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3202         int ret;
3203         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3204         pid_t pid;
3205         int32_t status;
3206
3207         key.dptr = discard_const(keyname);
3208         key.dsize = strlen(keyname);
3209
3210         if (!ctdb_db->persistent) {
3211                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3212                 return -1;
3213         }
3214
3215 again:
3216         tmp_ctx = talloc_new(h);
3217
3218         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3219         if (rh == NULL) {
3220                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3221                 talloc_free(tmp_ctx);
3222                 return -1;
3223         }
3224
3225         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3226                                               CTDB_CURRENT_NODE,
3227                                               ctdb_db->db_id);
3228         if (status == 1) {
3229                 DEBUG(DEBUG_NOTICE, (__location__ " transaction is active "
3230                                      "on db_id[0x%08x]. waiting for 1 second\n",
3231                                      ctdb_db->db_id));
3232                 talloc_free(tmp_ctx);
3233                 sleep(1);
3234                 goto again;
3235         }
3236
3237         /*
3238          * store the pid in the database:
3239          * it is not enough that the node is dmaster...
3240          */
3241         pid = getpid();
3242         data.dptr = (unsigned char *)&pid;
3243         data.dsize = sizeof(pid_t);
3244         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3245         if (ret != 0) {
3246                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3247                                   "transaction record\n"));
3248                 talloc_free(tmp_ctx);
3249                 return -1;
3250         }
3251
3252         talloc_free(rh);
3253
3254         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3255         if (ret != 0) {
3256                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3257                 talloc_free(tmp_ctx);
3258                 return -1;
3259         }
3260
3261         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3262         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3263                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3264                 talloc_free(tmp_ctx);
3265                 goto again;
3266         }
3267
3268         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3269                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3270                 talloc_free(tmp_ctx);
3271                 goto again;
3272         }
3273
3274         talloc_free(tmp_ctx);
3275
3276         return 0;
3277 }
3278
3279
3280 /* start a transaction on a database */
3281 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3282                                                        TALLOC_CTX *mem_ctx)
3283 {
3284         struct ctdb_transaction_handle *h;
3285         int ret;
3286
3287         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3288         if (h == NULL) {
3289                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3290                 return NULL;
3291         }
3292
3293         h->ctdb_db = ctdb_db;
3294
3295         ret = ctdb_transaction_fetch_start(h);
3296         if (ret != 0) {
3297                 talloc_free(h);
3298                 return NULL;
3299         }
3300
3301         talloc_set_destructor(h, ctdb_transaction_destructor);
3302
3303         return h;
3304 }
3305
3306
3307
3308 /*
3309   fetch a record inside a transaction
3310  */
3311 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3312                            TALLOC_CTX *mem_ctx, 
3313                            TDB_DATA key, TDB_DATA *data)
3314 {
3315         struct ctdb_ltdb_header header;
3316         int ret;
3317
3318         ZERO_STRUCT(header);
3319
3320         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3321         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3322                 /* record doesn't exist yet */
3323                 *data = tdb_null;
3324                 ret = 0;
3325         }
3326         
3327         if (ret != 0) {
3328                 return ret;
3329         }
3330
3331         if (!h->in_replay) {
3332                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3333                 if (h->m_all == NULL) {
3334                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3335                         return -1;
3336                 }
3337         }
3338
3339         return 0;
3340 }
3341
3342 /*
3343   stores a record inside a transaction
3344  */
3345 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3346                            TDB_DATA key, TDB_DATA data)
3347 {
3348         TALLOC_CTX *tmp_ctx = talloc_new(h);
3349         struct ctdb_ltdb_header header;
3350         TDB_DATA olddata;
3351         int ret;
3352
3353         ZERO_STRUCT(header);
3354
3355         /* we need the header so we can update the RSN */
3356         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3357         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3358                 /* the record doesn't exist - create one with us as dmaster.
3359                    This is only safe because we are in a transaction and this
3360                    is a persistent database */
3361                 ZERO_STRUCT(header);
3362         } else if (ret != 0) {
3363                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3364                 talloc_free(tmp_ctx);
3365                 return ret;
3366         }
3367
3368         if (data.dsize == olddata.dsize &&
3369             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3370                 /* save writing the same data */
3371                 talloc_free(tmp_ctx);
3372                 return 0;
3373         }
3374
3375         header.dmaster = h->ctdb_db->ctdb->pnn;
3376         header.rsn++;
3377
3378         if (!h->in_replay) {
3379                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3380                 if (h->m_all == NULL) {
3381                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3382                         talloc_free(tmp_ctx);
3383                         return -1;
3384                 }
3385         }               
3386
3387         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3388         if (h->m_write == NULL) {
3389                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3390                 talloc_free(tmp_ctx);
3391                 return -1;
3392         }
3393         
3394         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3395
3396         talloc_free(tmp_ctx);
3397         
3398         return ret;
3399 }
3400
3401 /*
3402   replay a transaction
3403  */
3404 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3405 {
3406         int ret, i;
3407         struct ctdb_rec_data *rec = NULL;
3408
3409         h->in_replay = true;
3410         talloc_free(h->m_write);
3411         h->m_write = NULL;
3412
3413         ret = ctdb_transaction_fetch_start(h);
3414         if (ret != 0) {
3415                 return ret;
3416         }
3417
3418         for (i=0;i<h->m_all->count;i++) {
3419                 TDB_DATA key, data;
3420
3421                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3422                 if (rec == NULL) {
3423                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3424                         goto failed;
3425                 }
3426
3427                 if (rec->reqid == 0) {
3428                         /* its a store */
3429                         if (ctdb_transaction_store(h, key, data) != 0) {
3430                                 goto failed;
3431                         }
3432                 } else {
3433                         TDB_DATA data2;
3434                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3435
3436                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3437                                 talloc_free(tmp_ctx);
3438                                 goto failed;
3439                         }
3440                         if (data2.dsize != data.dsize ||
3441                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3442                                 /* the record has changed on us - we have to give up */
3443                                 talloc_free(tmp_ctx);
3444                                 goto failed;
3445                         }
3446                         talloc_free(tmp_ctx);
3447                 }
3448         }
3449         
3450         return 0;
3451
3452 failed:
3453         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3454         return -1;
3455 }
3456
3457
3458 /*
3459   commit a transaction
3460  */
3461 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3462 {
3463         int ret, retries=0;
3464         int32_t status;
3465         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3466         struct timeval timeout;
3467         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3468
3469         talloc_set_destructor(h, NULL);
3470
3471         /* our commit strategy is quite complex.
3472
3473            - we first try to commit the changes to all other nodes
3474
3475            - if that works, then we commit locally and we are done
3476
3477            - if a commit on another node fails, then we need to cancel
3478              the transaction, then restart the transaction (thus
3479              opening a window of time for a pending recovery to
3480              complete), then replay the transaction, checking all the
3481              reads and writes (checking that reads give the same data,
3482              and writes succeed). Then we retry the transaction to the
3483              other nodes
3484         */
3485
3486 again:
3487         if (h->m_write == NULL) {
3488                 /* no changes were made */
3489                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3490                 talloc_free(h);
3491                 return 0;
3492         }
3493
3494         /* tell ctdbd to commit to the other nodes */
3495         timeout = timeval_current_ofs(1, 0);
3496         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3497                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3498                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3499                            &timeout, NULL);
3500         if (ret != 0 || status != 0) {
3501                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3502                 DEBUG(DEBUG_WARNING, (__location__ " transaction commit%s failed"
3503                                       ", retrying after 1 second...\n",
3504                                       (retries==0)?"":"retry "));
3505                 sleep(1);
3506
3507                 if (ret != 0) {
3508                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3509                 } else {
3510                         /* work out what error code we will give if we 
3511                            have to fail the operation */
3512                         switch ((enum ctdb_trans2_commit_error)status) {
3513                         case CTDB_TRANS2_COMMIT_SUCCESS:
3514                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3515                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3516                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3517                                 break;
3518                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3519                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3520                                 break;
3521                         }
3522                 }
3523
3524                 if (++retries == 10) {
3525                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3526                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3527                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3528                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3529                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3530                         talloc_free(h);
3531                         return -1;
3532                 }               
3533
3534                 if (ctdb_replay_transaction(h) != 0) {
3535                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3536                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3537                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3538                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3539                         talloc_free(h);
3540                         return -1;
3541                 }
3542                 goto again;
3543         } else {
3544                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3545         }
3546
3547         /* do the real commit locally */
3548         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3549         if (ret != 0) {
3550                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3551                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3552                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3553                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3554                 talloc_free(h);
3555                 return ret;
3556         }
3557
3558         /* tell ctdbd that we are finished with our local commit */
3559         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3560                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3561                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3562         talloc_free(h);
3563         return 0;
3564 }
3565
3566 /*
3567   recovery daemon ping to main daemon
3568  */
3569 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3570 {
3571         int ret;
3572         int32_t res;
3573
3574         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3575                            ctdb, NULL, &res, NULL, NULL);
3576         if (ret != 0 || res != 0) {
3577                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3578                 return -1;
3579         }
3580
3581         return 0;
3582 }
3583
3584 /* when forking the main daemon and the child process needs to connect back
3585  * to the daemon as a client process, this function can be used to change
3586  * the ctdb context from daemon into client mode
3587  */
3588 int switch_from_server_to_client(struct ctdb_context *ctdb)
3589 {
3590         int ret;
3591
3592         /* shutdown the transport */
3593         if (ctdb->methods) {
3594                 ctdb->methods->shutdown(ctdb);
3595         }
3596
3597         /* get a new event context */
3598         talloc_free(ctdb->ev);
3599         ctdb->ev = event_context_init(ctdb);
3600
3601         close(ctdb->daemon.sd);
3602         ctdb->daemon.sd = -1;
3603
3604         /* the client does not need to be realtime */
3605         if (ctdb->do_setsched) {
3606                 ctdb_restore_scheduler(ctdb);
3607         }
3608
3609         /* initialise ctdb */
3610         ret = ctdb_socket_connect(ctdb);
3611         if (ret != 0) {
3612                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3613                 return -1;
3614         }
3615
3616          return 0;
3617 }
3618
3619 /*
3620   tell the main daemon we are starting a new monitor event script
3621  */
3622 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3623 {
3624         int ret;
3625         int32_t res;
3626
3627         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3628                            ctdb, NULL, &res, NULL, NULL);
3629         if (ret != 0 || res != 0) {
3630                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3631                 return -1;
3632         }
3633
3634         return 0;
3635 }
3636
3637 /*
3638   tell the main daemon we are starting a new monitor event script
3639  */
3640 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3641 {
3642         int ret;
3643         int32_t res;
3644
3645         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3646                            ctdb, NULL, &res, NULL, NULL);
3647         if (ret != 0 || res != 0) {
3648                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3649                 return -1;
3650         }
3651
3652         return 0;
3653 }
3654
3655 /*
3656   tell the main daemon we are starting to run an eventscript
3657  */
3658 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3659 {
3660         int ret;
3661         int32_t res;
3662         TDB_DATA data;
3663
3664         data.dptr = discard_const(name);
3665         data.dsize = strlen(name)+1;
3666
3667         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3668                            ctdb, NULL, &res, NULL, NULL);
3669         if (ret != 0 || res != 0) {
3670                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3671                 return -1;
3672         }
3673
3674         return 0;
3675 }
3676
3677 /*
3678   tell the main daemon the status of the script we ran
3679  */
3680 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3681 {
3682         int ret;
3683         int32_t res;
3684         TDB_DATA data;
3685
3686         data.dptr = (uint8_t *)&result;
3687         data.dsize = sizeof(result);
3688
3689         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3690                            ctdb, NULL, &res, NULL, NULL);
3691         if (ret != 0 || res != 0) {
3692                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3693                 return -1;
3694         }
3695
3696         return 0;
3697 }
3698
3699 /*
3700   tell the main daemon a script was disabled
3701  */
3702 int ctdb_ctrl_event_script_disabled(struct ctdb_context *ctdb, const char *name)
3703 {
3704         int ret;
3705         int32_t res;
3706         TDB_DATA data;
3707
3708         data.dptr = discard_const(name);
3709         data.dsize = strlen(name)+1;
3710
3711         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_DISABLED, 0, data, 
3712                            ctdb, NULL, &res, NULL, NULL);
3713         if (ret != 0 || res != 0) {
3714                 DEBUG(DEBUG_ERR,("Failed to send event_script_disabeld\n"));
3715                 return -1;
3716         }
3717
3718         return 0;
3719 }
3720
3721 /*
3722   get the status of running the monitor eventscripts
3723  */
3724 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3725                 struct timeval timeout, uint32_t destnode, 
3726                 TALLOC_CTX *mem_ctx,
3727                 struct ctdb_monitoring_wire **script_status)
3728 {
3729         int ret;
3730         TDB_DATA outdata;
3731         int32_t res;
3732
3733         ret = ctdb_control(ctdb, destnode, 0, 
3734                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3735                            mem_ctx, &outdata, &res, &timeout, NULL);
3736         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3737                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3738                 return -1;
3739         }
3740
3741         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3742         talloc_free(outdata.dptr);
3743                     
3744         return 0;
3745 }
3746
3747 /*
3748   tell the main daemon how long it took to lock the reclock file
3749  */
3750 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3751 {
3752         int ret;
3753         int32_t res;
3754         TDB_DATA data;
3755
3756         data.dptr = (uint8_t *)&latency;
3757         data.dsize = sizeof(latency);
3758
3759         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3760                            ctdb, NULL, &res, NULL, NULL);
3761         if (ret != 0 || res != 0) {
3762                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3763                 return -1;
3764         }
3765
3766         return 0;
3767 }
3768
3769 /*
3770   get the name of the reclock file
3771  */
3772 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3773                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3774                          const char **name)
3775 {
3776         int ret;
3777         int32_t res;
3778         TDB_DATA data;
3779
3780         ret = ctdb_control(ctdb, destnode, 0, 
3781                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3782                            mem_ctx, &data, &res, &timeout, NULL);
3783         if (ret != 0 || res != 0) {
3784                 return -1;
3785         }
3786
3787         if (data.dsize == 0) {
3788                 *name = NULL;
3789         } else {
3790                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3791         }
3792         talloc_free(data.dptr);
3793
3794         return 0;
3795 }
3796
3797 /*
3798   set the reclock filename for a node
3799  */
3800 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3801 {
3802         int ret;
3803         TDB_DATA data;
3804         int32_t res;
3805
3806         if (reclock == NULL) {
3807                 data.dsize = 0;
3808                 data.dptr  = NULL;
3809         } else {
3810                 data.dsize = strlen(reclock) + 1;
3811                 data.dptr  = discard_const(reclock);
3812         }
3813
3814         ret = ctdb_control(ctdb, destnode, 0, 
3815                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3816                            NULL, NULL, &res, &timeout, NULL);
3817         if (ret != 0 || res != 0) {
3818                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3819                 return -1;
3820         }
3821
3822         return 0;
3823 }
3824
3825 /*
3826   stop a node
3827  */
3828 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3829 {
3830         int ret;
3831         int32_t res;
3832
3833         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3834                            ctdb, NULL, &res, &timeout, NULL);
3835         if (ret != 0 || res != 0) {
3836                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3837                 return -1;
3838         }
3839
3840         return 0;
3841 }
3842
3843 /*
3844   continue a node
3845  */
3846 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3847 {
3848         int ret;
3849
3850         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3851                            ctdb, NULL, NULL, &timeout, NULL);
3852         if (ret != 0) {
3853                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3854                 return -1;
3855         }
3856
3857         return 0;
3858 }
3859
3860 /*
3861   set the natgw state for a node
3862  */
3863 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3864 {
3865         int ret;
3866         TDB_DATA data;
3867         int32_t res;
3868
3869         data.dsize = sizeof(natgwstate);
3870         data.dptr  = (uint8_t *)&natgwstate;
3871
3872         ret = ctdb_control(ctdb, destnode, 0, 
3873                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3874                            NULL, NULL, &res, &timeout, NULL);
3875         if (ret != 0 || res != 0) {
3876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3877                 return -1;
3878         }
3879
3880         return 0;
3881 }
3882
3883 /*
3884   set the lmaster role for a node
3885  */
3886 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3887 {
3888         int ret;
3889         TDB_DATA data;
3890         int32_t res;
3891
3892         data.dsize = sizeof(lmasterrole);
3893         data.dptr  = (uint8_t *)&lmasterrole;
3894
3895         ret = ctdb_control(ctdb, destnode, 0, 
3896                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3897                            NULL, NULL, &res, &timeout, NULL);
3898         if (ret != 0 || res != 0) {
3899                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3900                 return -1;
3901         }
3902
3903         return 0;
3904 }
3905
3906 /*
3907   set the recmaster role for a node
3908  */
3909 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3910 {
3911         int ret;
3912         TDB_DATA data;
3913         int32_t res;
3914
3915         data.dsize = sizeof(recmasterrole);
3916         data.dptr  = (uint8_t *)&recmasterrole;
3917
3918         ret = ctdb_control(ctdb, destnode, 0, 
3919                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3920                            NULL, NULL, &res, &timeout, NULL);
3921         if (ret != 0 || res != 0) {
3922                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3923                 return -1;
3924         }
3925
3926         return 0;
3927 }
3928
3929 /* enable an eventscript
3930  */
3931 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3932 {
3933         int ret;
3934         TDB_DATA data;
3935         int32_t res;
3936
3937         data.dsize = strlen(script) + 1;
3938         data.dptr  = discard_const(script);
3939
3940         ret = ctdb_control(ctdb, destnode, 0, 
3941                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3942                            NULL, NULL, &res, &timeout, NULL);
3943         if (ret != 0 || res != 0) {
3944                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3945                 return -1;
3946         }
3947
3948         return 0;
3949 }
3950
3951 /* disable an eventscript
3952  */
3953 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3954 {
3955         int ret;
3956         TDB_DATA data;
3957         int32_t res;
3958
3959         data.dsize = strlen(script) + 1;
3960         data.dptr  = discard_const(script);
3961
3962         ret = ctdb_control(ctdb, destnode, 0, 
3963                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3964                            NULL, NULL, &res, &timeout, NULL);
3965         if (ret != 0 || res != 0) {
3966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3967                 return -1;
3968         }
3969
3970         return 0;
3971 }
3972
3973
3974 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3975 {
3976         int ret;
3977         TDB_DATA data;
3978         int32_t res;
3979
3980         data.dsize = sizeof(*bantime);
3981         data.dptr  = (uint8_t *)bantime;
3982
3983         ret = ctdb_control(ctdb, destnode, 0, 
3984                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3985                            NULL, NULL, &res, &timeout, NULL);
3986         if (ret != 0 || res != 0) {
3987                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3988                 return -1;
3989         }
3990
3991         return 0;
3992 }
3993
3994
3995 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3996 {
3997         int ret;
3998         TDB_DATA outdata;
3999         int32_t res;
4000         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4001
4002         ret = ctdb_control(ctdb, destnode, 0, 
4003                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4004                            tmp_ctx, &outdata, &res, &timeout, NULL);
4005         if (ret != 0 || res != 0) {
4006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4007                 talloc_free(tmp_ctx);
4008                 return -1;
4009         }
4010
4011         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4012         talloc_free(tmp_ctx);
4013
4014         return 0;
4015 }
4016
4017
4018 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4019 {
4020         int ret;
4021         int32_t res;
4022         TDB_DATA data;
4023         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4024
4025         data.dptr = (uint8_t*)db_prio;
4026         data.dsize = sizeof(*db_prio);
4027
4028         ret = ctdb_control(ctdb, destnode, 0, 
4029                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4030                            tmp_ctx, NULL, &res, &timeout, NULL);
4031         if (ret != 0 || res != 0) {
4032                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4033                 talloc_free(tmp_ctx);
4034                 return -1;
4035         }
4036
4037         talloc_free(tmp_ctx);
4038
4039         return 0;
4040 }
4041
4042 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4043 {
4044         int ret;
4045         int32_t res;
4046         TDB_DATA data;
4047         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4048
4049         data.dptr = (uint8_t*)&db_id;
4050         data.dsize = sizeof(db_id);
4051
4052         ret = ctdb_control(ctdb, destnode, 0, 
4053                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4054                            tmp_ctx, NULL, &res, &timeout, NULL);
4055         if (ret != 0 || res < 0) {
4056                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4057                 talloc_free(tmp_ctx);
4058                 return -1;
4059         }
4060
4061         if (priority) {
4062                 *priority = res;
4063         }
4064
4065         talloc_free(tmp_ctx);
4066
4067         return 0;
4068 }