Revert "update the "uptime" command to indicate the "time since last" is the time...
[metze/ctdb/wip.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return 0;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372
373         return state;
374 }
375
376 /*
377   make a ctdb call to the local daemon - async send. Called from client context.
378
379   This constructs a ctdb_call request and queues it for processing. 
380   This call never blocks.
381 */
382 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
383                                               struct ctdb_call *call)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         struct ctdb_ltdb_header header;
388         TDB_DATA data;
389         int ret;
390         size_t len;
391         struct ctdb_req_call *c;
392
393         /* if the domain socket is not yet open, open it */
394         if (ctdb->daemon.sd==-1) {
395                 ctdb_socket_connect(ctdb);
396         }
397
398         ret = ctdb_ltdb_lock(ctdb_db, call->key);
399         if (ret != 0) {
400                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
401                 return NULL;
402         }
403
404         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
405
406         if (ret == 0 && header.dmaster == ctdb->pnn) {
407                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
408                 talloc_free(data.dptr);
409                 ctdb_ltdb_unlock(ctdb_db, call->key);
410                 return state;
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, call->key);
414         talloc_free(data.dptr);
415
416         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
419                 return NULL;
420         }
421         state->call = talloc_zero(state, struct ctdb_call);
422         if (state->call == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
424                 return NULL;
425         }
426
427         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
428         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
429         if (c == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
431                 return NULL;
432         }
433
434         state->reqid     = ctdb_reqid_new(ctdb, state);
435         state->ctdb_db = ctdb_db;
436         talloc_set_destructor(state, ctdb_client_call_destructor);
437
438         c->hdr.reqid     = state->reqid;
439         c->flags         = call->flags;
440         c->db_id         = ctdb_db->db_id;
441         c->callid        = call->call_id;
442         c->hopcount      = 0;
443         c->keylen        = call->key.dsize;
444         c->calldatalen   = call->call_data.dsize;
445         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
446         memcpy(&c->data[call->key.dsize], 
447                call->call_data.dptr, call->call_data.dsize);
448         *(state->call)              = *call;
449         state->call->call_data.dptr = &c->data[call->key.dsize];
450         state->call->key.dptr       = &c->data[0];
451
452         state->state  = CTDB_CALL_WAIT;
453
454
455         ctdb_client_queue_pkt(ctdb, &c->hdr);
456
457         return state;
458 }
459
460
461 /*
462   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
463 */
464 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
465 {
466         struct ctdb_client_call_state *state;
467
468         state = ctdb_call_send(ctdb_db, call);
469         return ctdb_call_recv(state, call);
470 }
471
472
473 /*
474   tell the daemon what messaging srvid we will use, and register the message
475   handler function in the client
476 */
477 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
478                              ctdb_message_fn_t handler,
479                              void *private_data)
480                                     
481 {
482         int res;
483         int32_t status;
484         
485         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
486                            tdb_null, NULL, NULL, &status, NULL, NULL);
487         if (res != 0 || status != 0) {
488                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
489                 return -1;
490         }
491
492         /* also need to register the handler with our own ctdb structure */
493         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
494 }
495
496 /*
497   tell the daemon we no longer want a srvid
498 */
499 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         ctdb_deregister_message_handler(ctdb, srvid, private_data);
513         return 0;
514 }
515
516
517 /*
518   send a message - from client context
519  */
520 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
521                       uint64_t srvid, TDB_DATA data)
522 {
523         struct ctdb_req_message *r;
524         int len, res;
525
526         len = offsetof(struct ctdb_req_message, data) + data.dsize;
527         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
528                                len, struct ctdb_req_message);
529         CTDB_NO_MEMORY(ctdb, r);
530
531         r->hdr.destnode  = pnn;
532         r->srvid         = srvid;
533         r->datalen       = data.dsize;
534         memcpy(&r->data[0], data.dptr, data.dsize);
535         
536         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
537         if (res != 0) {
538                 return res;
539         }
540
541         talloc_free(r);
542         return 0;
543 }
544
545
546 /*
547   cancel a ctdb_fetch_lock operation, releasing the lock
548  */
549 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 {
551         ctdb_ltdb_unlock(h->ctdb_db, h->key);
552         return 0;
553 }
554
555 /*
556   force the migration of a record to this node
557  */
558 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
559 {
560         struct ctdb_call call;
561         ZERO_STRUCT(call);
562         call.call_id = CTDB_NULL_FUNC;
563         call.key = key;
564         call.flags = CTDB_IMMEDIATE_MIGRATION;
565         return ctdb_call(ctdb_db, &call);
566 }
567
568 /*
569   get a lock on a record, and return the records data. Blocks until it gets the lock
570  */
571 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
572                                            TDB_DATA key, TDB_DATA *data)
573 {
574         int ret;
575         struct ctdb_record_handle *h;
576
577         /*
578           procedure is as follows:
579
580           1) get the chain lock. 
581           2) check if we are dmaster
582           3) if we are the dmaster then return handle 
583           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
584              reply from ctdbd
585           5) when we get the reply, goto (1)
586          */
587
588         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589         if (h == NULL) {
590                 return NULL;
591         }
592
593         h->ctdb_db = ctdb_db;
594         h->key     = key;
595         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
596         if (h->key.dptr == NULL) {
597                 talloc_free(h);
598                 return NULL;
599         }
600         h->data    = data;
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
603                  (const char *)key.dptr));
604
605 again:
606         /* step 1 - get the chain lock */
607         ret = ctdb_ltdb_lock(ctdb_db, key);
608         if (ret != 0) {
609                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610                 talloc_free(h);
611                 return NULL;
612         }
613
614         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
615
616         talloc_set_destructor(h, fetch_lock_destructor);
617
618         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
619
620         /* when torturing, ensure we test the remote path */
621         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
622             random() % 5 == 0) {
623                 h->header.dmaster = (uint32_t)-1;
624         }
625
626
627         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
628
629         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
630                 ctdb_ltdb_unlock(ctdb_db, key);
631                 ret = ctdb_client_force_migration(ctdb_db, key);
632                 if (ret != 0) {
633                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634                         talloc_free(h);
635                         return NULL;
636                 }
637                 goto again;
638         }
639
640         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641         return h;
642 }
643
644 /*
645   store some data to the record that was locked with ctdb_fetch_lock()
646 */
647 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
648 {
649         if (h->ctdb_db->persistent) {
650                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
651                 return -1;
652         }
653
654         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655 }
656
657 /*
658   non-locking fetch of a record
659  */
660 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
661                TDB_DATA key, TDB_DATA *data)
662 {
663         struct ctdb_call call;
664         int ret;
665
666         call.call_id = CTDB_FETCH_FUNC;
667         call.call_data.dptr = NULL;
668         call.call_data.dsize = 0;
669
670         ret = ctdb_call(ctdb_db, &call);
671
672         if (ret == 0) {
673                 *data = call.reply_data;
674                 talloc_steal(mem_ctx, data->dptr);
675         }
676
677         return ret;
678 }
679
680
681
682 /*
683    called when a control completes or timesout to invoke the callback
684    function the user provided
685 */
686 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
687         struct timeval t, void *private_data)
688 {
689         struct ctdb_client_control_state *state;
690         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691         int ret;
692
693         state = talloc_get_type(private_data, struct ctdb_client_control_state);
694         talloc_steal(tmp_ctx, state);
695
696         ret = ctdb_control_recv(state->ctdb, state, state,
697                         NULL, 
698                         NULL, 
699                         NULL);
700
701         talloc_free(tmp_ctx);
702 }
703
704 /*
705   called when a CTDB_REPLY_CONTROL packet comes in in the client
706
707   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
708   contains any reply data from the control
709 */
710 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
711                                       struct ctdb_req_header *hdr)
712 {
713         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
714         struct ctdb_client_control_state *state;
715
716         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
717         if (state == NULL) {
718                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
719                 return;
720         }
721
722         if (hdr->reqid != state->reqid) {
723                 /* we found a record  but it was the wrong one */
724                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
725                 return;
726         }
727
728         state->outdata.dptr = c->data;
729         state->outdata.dsize = c->datalen;
730         state->status = c->status;
731         if (c->errorlen) {
732                 state->errormsg = talloc_strndup(state, 
733                                                  (char *)&c->data[c->datalen], 
734                                                  c->errorlen);
735         }
736
737         /* state->outdata now uses resources from c so we dont want c
738            to just dissappear from under us while state is still alive
739         */
740         talloc_steal(state, c);
741
742         state->state = CTDB_CONTROL_DONE;
743
744         /* if we had a callback registered for this control, pull the response
745            and call the callback.
746         */
747         if (state->async.fn) {
748                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
749         }
750 }
751
752
753 /*
754   destroy a ctdb_control in client
755 */
756 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
757 {
758         ctdb_reqid_remove(state->ctdb, state->reqid);
759         return 0;
760 }
761
762
763 /* time out handler for ctdb_control */
764 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
765         struct timeval t, void *private_data)
766 {
767         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
768
769         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
770
771         state->state = CTDB_CONTROL_TIMEOUT;
772
773         /* if we had a callback registered for this control, pull the response
774            and call the callback.
775         */
776         if (state->async.fn) {
777                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
778         }
779 }
780
781 /* async version of send control request */
782 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
783                 uint32_t destnode, uint64_t srvid, 
784                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
785                 TALLOC_CTX *mem_ctx,
786                 struct timeval *timeout,
787                 char **errormsg)
788 {
789         struct ctdb_client_control_state *state;
790         size_t len;
791         struct ctdb_req_control *c;
792         int ret;
793
794         if (errormsg) {
795                 *errormsg = NULL;
796         }
797
798         /* if the domain socket is not yet open, open it */
799         if (ctdb->daemon.sd==-1) {
800                 ctdb_socket_connect(ctdb);
801         }
802
803         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
804         CTDB_NO_MEMORY_NULL(ctdb, state);
805
806         state->ctdb       = ctdb;
807         state->reqid      = ctdb_reqid_new(ctdb, state);
808         state->state      = CTDB_CONTROL_WAIT;
809         state->errormsg   = NULL;
810
811         talloc_set_destructor(state, ctdb_control_destructor);
812
813         len = offsetof(struct ctdb_req_control, data) + data.dsize;
814         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
815                                len, struct ctdb_req_control);
816         state->c            = c;        
817         CTDB_NO_MEMORY_NULL(ctdb, c);
818         c->hdr.reqid        = state->reqid;
819         c->hdr.destnode     = destnode;
820         c->opcode           = opcode;
821         c->client_id        = 0;
822         c->flags            = flags;
823         c->srvid            = srvid;
824         c->datalen          = data.dsize;
825         if (data.dsize) {
826                 memcpy(&c->data[0], data.dptr, data.dsize);
827         }
828
829         /* timeout */
830         if (timeout && !timeval_is_zero(timeout)) {
831                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
832         }
833
834         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
835         if (ret != 0) {
836                 talloc_free(state);
837                 return NULL;
838         }
839
840         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
841                 talloc_free(state);
842                 return NULL;
843         }
844
845         return state;
846 }
847
848
849 /* async version of receive control reply */
850 int ctdb_control_recv(struct ctdb_context *ctdb, 
851                 struct ctdb_client_control_state *state, 
852                 TALLOC_CTX *mem_ctx,
853                 TDB_DATA *outdata, int32_t *status, char **errormsg)
854 {
855         TALLOC_CTX *tmp_ctx;
856
857         if (status != NULL) {
858                 *status = -1;
859         }
860         if (errormsg != NULL) {
861                 *errormsg = NULL;
862         }
863
864         if (state == NULL) {
865                 return -1;
866         }
867
868         /* prevent double free of state */
869         tmp_ctx = talloc_new(ctdb);
870         talloc_steal(tmp_ctx, state);
871
872         /* loop one event at a time until we either timeout or the control
873            completes.
874         */
875         while (state->state == CTDB_CONTROL_WAIT) {
876                 event_loop_once(ctdb->ev);
877         }
878
879         if (state->state != CTDB_CONTROL_DONE) {
880                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
881                 if (state->async.fn) {
882                         state->async.fn(state);
883                 }
884                 talloc_free(tmp_ctx);
885                 return -1;
886         }
887
888         if (state->errormsg) {
889                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
890                 if (errormsg) {
891                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
892                 }
893                 if (state->async.fn) {
894                         state->async.fn(state);
895                 }
896                 talloc_free(tmp_ctx);
897                 return -1;
898         }
899
900         if (outdata) {
901                 *outdata = state->outdata;
902                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
903         }
904
905         if (status) {
906                 *status = state->status;
907         }
908
909         if (state->async.fn) {
910                 state->async.fn(state);
911         }
912
913         talloc_free(tmp_ctx);
914         return 0;
915 }
916
917
918
919 /*
920   send a ctdb control message
921   timeout specifies how long we should wait for a reply.
922   if timeout is NULL we wait indefinitely
923  */
924 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
925                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
926                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
927                  struct timeval *timeout,
928                  char **errormsg)
929 {
930         struct ctdb_client_control_state *state;
931
932         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
933                         flags, data, mem_ctx,
934                         timeout, errormsg);
935         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
936                         errormsg);
937 }
938
939
940
941
942 /*
943   a process exists call. Returns 0 if process exists, -1 otherwise
944  */
945 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
946 {
947         int ret;
948         TDB_DATA data;
949         int32_t status;
950
951         data.dptr = (uint8_t*)&pid;
952         data.dsize = sizeof(pid);
953
954         ret = ctdb_control(ctdb, destnode, 0, 
955                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
956                            NULL, NULL, &status, NULL, NULL);
957         if (ret != 0) {
958                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
959                 return -1;
960         }
961
962         return status;
963 }
964
965 /*
966   get remote statistics
967  */
968 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
969 {
970         int ret;
971         TDB_DATA data;
972         int32_t res;
973
974         ret = ctdb_control(ctdb, destnode, 0, 
975                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
976                            ctdb, &data, &res, NULL, NULL);
977         if (ret != 0 || res != 0) {
978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
979                 return -1;
980         }
981
982         if (data.dsize != sizeof(struct ctdb_statistics)) {
983                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
984                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
985                       return -1;
986         }
987
988         *status = *(struct ctdb_statistics *)data.dptr;
989         talloc_free(data.dptr);
990                         
991         return 0;
992 }
993
994 /*
995   shutdown a remote ctdb node
996  */
997 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
998 {
999         struct ctdb_client_control_state *state;
1000
1001         state = ctdb_control_send(ctdb, destnode, 0, 
1002                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1003                            NULL, &timeout, NULL);
1004         if (state == NULL) {
1005                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1006                 return -1;
1007         }
1008
1009         return 0;
1010 }
1011
1012 /*
1013   get vnn map from a remote node
1014  */
1015 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1016 {
1017         int ret;
1018         TDB_DATA outdata;
1019         int32_t res;
1020         struct ctdb_vnn_map_wire *map;
1021
1022         ret = ctdb_control(ctdb, destnode, 0, 
1023                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1024                            mem_ctx, &outdata, &res, &timeout, NULL);
1025         if (ret != 0 || res != 0) {
1026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1027                 return -1;
1028         }
1029         
1030         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1031         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1032             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1033                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1034                 return -1;
1035         }
1036
1037         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1038         CTDB_NO_MEMORY(ctdb, *vnnmap);
1039         (*vnnmap)->generation = map->generation;
1040         (*vnnmap)->size       = map->size;
1041         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1042
1043         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1044         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1045         talloc_free(outdata.dptr);
1046                     
1047         return 0;
1048 }
1049
1050
1051 /*
1052   get the recovery mode of a remote node
1053  */
1054 struct ctdb_client_control_state *
1055 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1056 {
1057         return ctdb_control_send(ctdb, destnode, 0, 
1058                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1059                            mem_ctx, &timeout, NULL);
1060 }
1061
1062 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1063 {
1064         int ret;
1065         int32_t res;
1066
1067         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1068         if (ret != 0) {
1069                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1070                 return -1;
1071         }
1072
1073         if (recmode) {
1074                 *recmode = (uint32_t)res;
1075         }
1076
1077         return 0;
1078 }
1079
1080 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1081 {
1082         struct ctdb_client_control_state *state;
1083
1084         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1085         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1086 }
1087
1088
1089
1090
1091 /*
1092   set the recovery mode of a remote node
1093  */
1094 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1095 {
1096         int ret;
1097         TDB_DATA data;
1098         int32_t res;
1099
1100         data.dsize = sizeof(uint32_t);
1101         data.dptr = (unsigned char *)&recmode;
1102
1103         ret = ctdb_control(ctdb, destnode, 0, 
1104                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1105                            NULL, NULL, &res, &timeout, NULL);
1106         if (ret != 0 || res != 0) {
1107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1108                 return -1;
1109         }
1110
1111         return 0;
1112 }
1113
1114
1115
1116 /*
1117   get the recovery master of a remote node
1118  */
1119 struct ctdb_client_control_state *
1120 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1121                         struct timeval timeout, uint32_t destnode)
1122 {
1123         return ctdb_control_send(ctdb, destnode, 0, 
1124                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1125                            mem_ctx, &timeout, NULL);
1126 }
1127
1128 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1129 {
1130         int ret;
1131         int32_t res;
1132
1133         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1134         if (ret != 0) {
1135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1136                 return -1;
1137         }
1138
1139         if (recmaster) {
1140                 *recmaster = (uint32_t)res;
1141         }
1142
1143         return 0;
1144 }
1145
1146 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1147 {
1148         struct ctdb_client_control_state *state;
1149
1150         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1151         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1152 }
1153
1154
1155 /*
1156   set the recovery master of a remote node
1157  */
1158 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1159 {
1160         int ret;
1161         TDB_DATA data;
1162         int32_t res;
1163
1164         ZERO_STRUCT(data);
1165         data.dsize = sizeof(uint32_t);
1166         data.dptr = (unsigned char *)&recmaster;
1167
1168         ret = ctdb_control(ctdb, destnode, 0, 
1169                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1170                            NULL, NULL, &res, &timeout, NULL);
1171         if (ret != 0 || res != 0) {
1172                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1173                 return -1;
1174         }
1175
1176         return 0;
1177 }
1178
1179
1180 /*
1181   get a list of databases off a remote node
1182  */
1183 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1184                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1185 {
1186         int ret;
1187         TDB_DATA outdata;
1188         int32_t res;
1189
1190         ret = ctdb_control(ctdb, destnode, 0, 
1191                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1192                            mem_ctx, &outdata, &res, &timeout, NULL);
1193         if (ret != 0 || res != 0) {
1194                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1195                 return -1;
1196         }
1197
1198         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1199         talloc_free(outdata.dptr);
1200                     
1201         return 0;
1202 }
1203
1204 /*
1205   get a list of nodes (vnn and flags ) from a remote node
1206  */
1207 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1208                 struct timeval timeout, uint32_t destnode, 
1209                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1210 {
1211         int ret;
1212         TDB_DATA outdata;
1213         int32_t res;
1214
1215         ret = ctdb_control(ctdb, destnode, 0, 
1216                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1217                            mem_ctx, &outdata, &res, &timeout, NULL);
1218         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1219                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1220                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1221         }
1222         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1223                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1224                 return -1;
1225         }
1226
1227         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1228         talloc_free(outdata.dptr);
1229                     
1230         return 0;
1231 }
1232
1233 /*
1234   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1235  */
1236 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1237                 struct timeval timeout, uint32_t destnode, 
1238                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1239 {
1240         int ret, i, len;
1241         TDB_DATA outdata;
1242         struct ctdb_node_mapv4 *nodemapv4;
1243         int32_t res;
1244
1245         ret = ctdb_control(ctdb, destnode, 0, 
1246                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1247                            mem_ctx, &outdata, &res, &timeout, NULL);
1248         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1250                 return -1;
1251         }
1252
1253         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1254
1255         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1256         (*nodemap) = talloc_zero_size(mem_ctx, len);
1257         CTDB_NO_MEMORY(ctdb, (*nodemap));
1258
1259         (*nodemap)->num = nodemapv4->num;
1260         for (i=0; i<nodemapv4->num; i++) {
1261                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1262                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1263                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1264                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1265         }
1266                 
1267         talloc_free(outdata.dptr);
1268                     
1269         return 0;
1270 }
1271
1272 /*
1273   drop the transport, reload the nodes file and restart the transport
1274  */
1275 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1276                     struct timeval timeout, uint32_t destnode)
1277 {
1278         int ret;
1279         int32_t res;
1280
1281         ret = ctdb_control(ctdb, destnode, 0, 
1282                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1283                            NULL, NULL, &res, &timeout, NULL);
1284         if (ret != 0 || res != 0) {
1285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1286                 return -1;
1287         }
1288
1289         return 0;
1290 }
1291
1292
1293 /*
1294   set vnn map on a node
1295  */
1296 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1297                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1298 {
1299         int ret;
1300         TDB_DATA data;
1301         int32_t res;
1302         struct ctdb_vnn_map_wire *map;
1303         size_t len;
1304
1305         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1306         map = talloc_size(mem_ctx, len);
1307         CTDB_NO_MEMORY(ctdb, map);
1308
1309         map->generation = vnnmap->generation;
1310         map->size = vnnmap->size;
1311         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1312         
1313         data.dsize = len;
1314         data.dptr  = (uint8_t *)map;
1315
1316         ret = ctdb_control(ctdb, destnode, 0, 
1317                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1318                            NULL, NULL, &res, &timeout, NULL);
1319         if (ret != 0 || res != 0) {
1320                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1321                 return -1;
1322         }
1323
1324         talloc_free(map);
1325
1326         return 0;
1327 }
1328
1329
1330 /*
1331   async send for pull database
1332  */
1333 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1334         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1335         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1336 {
1337         TDB_DATA indata;
1338         struct ctdb_control_pulldb *pull;
1339         struct ctdb_client_control_state *state;
1340
1341         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1342         CTDB_NO_MEMORY_NULL(ctdb, pull);
1343
1344         pull->db_id   = dbid;
1345         pull->lmaster = lmaster;
1346
1347         indata.dsize = sizeof(struct ctdb_control_pulldb);
1348         indata.dptr  = (unsigned char *)pull;
1349
1350         state = ctdb_control_send(ctdb, destnode, 0, 
1351                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1352                                   mem_ctx, &timeout, NULL);
1353         talloc_free(pull);
1354
1355         return state;
1356 }
1357
1358 /*
1359   async recv for pull database
1360  */
1361 int ctdb_ctrl_pulldb_recv(
1362         struct ctdb_context *ctdb, 
1363         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1364         TDB_DATA *outdata)
1365 {
1366         int ret;
1367         int32_t res;
1368
1369         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1370         if ( (ret != 0) || (res != 0) ){
1371                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1372                 return -1;
1373         }
1374
1375         return 0;
1376 }
1377
1378 /*
1379   pull all keys and records for a specific database on a node
1380  */
1381 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1382                 uint32_t dbid, uint32_t lmaster, 
1383                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1384                 TDB_DATA *outdata)
1385 {
1386         struct ctdb_client_control_state *state;
1387
1388         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1389                                       timeout);
1390         
1391         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1392 }
1393
1394
1395 /*
1396   change dmaster for all keys in the database to the new value
1397  */
1398 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1399                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1400 {
1401         int ret;
1402         TDB_DATA indata;
1403         int32_t res;
1404
1405         indata.dsize = 2*sizeof(uint32_t);
1406         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1407
1408         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1409         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1410
1411         ret = ctdb_control(ctdb, destnode, 0, 
1412                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1413                            NULL, NULL, &res, &timeout, NULL);
1414         if (ret != 0 || res != 0) {
1415                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1416                 return -1;
1417         }
1418
1419         return 0;
1420 }
1421
1422 /*
1423   ping a node, return number of clients connected
1424  */
1425 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1431                            tdb_null, NULL, NULL, &res, NULL, NULL);
1432         if (ret != 0) {
1433                 return -1;
1434         }
1435         return res;
1436 }
1437
1438 /*
1439   find the real path to a ltdb 
1440  */
1441 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1442                    const char **path)
1443 {
1444         int ret;
1445         int32_t res;
1446         TDB_DATA data;
1447
1448         data.dptr = (uint8_t *)&dbid;
1449         data.dsize = sizeof(dbid);
1450
1451         ret = ctdb_control(ctdb, destnode, 0, 
1452                            CTDB_CONTROL_GETDBPATH, 0, data, 
1453                            mem_ctx, &data, &res, &timeout, NULL);
1454         if (ret != 0 || res != 0) {
1455                 return -1;
1456         }
1457
1458         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1459         if ((*path) == NULL) {
1460                 return -1;
1461         }
1462
1463         talloc_free(data.dptr);
1464
1465         return 0;
1466 }
1467
1468 /*
1469   find the name of a db 
1470  */
1471 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1472                    const char **name)
1473 {
1474         int ret;
1475         int32_t res;
1476         TDB_DATA data;
1477
1478         data.dptr = (uint8_t *)&dbid;
1479         data.dsize = sizeof(dbid);
1480
1481         ret = ctdb_control(ctdb, destnode, 0, 
1482                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1483                            mem_ctx, &data, &res, &timeout, NULL);
1484         if (ret != 0 || res != 0) {
1485                 return -1;
1486         }
1487
1488         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1489         if ((*name) == NULL) {
1490                 return -1;
1491         }
1492
1493         talloc_free(data.dptr);
1494
1495         return 0;
1496 }
1497
1498 /*
1499   create a database
1500  */
1501 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1502                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1503 {
1504         int ret;
1505         int32_t res;
1506         TDB_DATA data;
1507
1508         data.dptr = discard_const(name);
1509         data.dsize = strlen(name)+1;
1510
1511         ret = ctdb_control(ctdb, destnode, 0, 
1512                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1513                            0, data, 
1514                            mem_ctx, &data, &res, &timeout, NULL);
1515
1516         if (ret != 0 || res != 0) {
1517                 return -1;
1518         }
1519
1520         return 0;
1521 }
1522
1523 /*
1524   get debug level on a node
1525  */
1526 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1527 {
1528         int ret;
1529         int32_t res;
1530         TDB_DATA data;
1531
1532         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1533                            ctdb, &data, &res, NULL, NULL);
1534         if (ret != 0 || res != 0) {
1535                 return -1;
1536         }
1537         if (data.dsize != sizeof(int32_t)) {
1538                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1539                          (unsigned)data.dsize));
1540                 return -1;
1541         }
1542         *level = *(int32_t *)data.dptr;
1543         talloc_free(data.dptr);
1544         return 0;
1545 }
1546
1547 /*
1548   set debug level on a node
1549  */
1550 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1551 {
1552         int ret;
1553         int32_t res;
1554         TDB_DATA data;
1555
1556         data.dptr = (uint8_t *)&level;
1557         data.dsize = sizeof(level);
1558
1559         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1560                            NULL, NULL, &res, NULL, NULL);
1561         if (ret != 0 || res != 0) {
1562                 return -1;
1563         }
1564         return 0;
1565 }
1566
1567
1568 /*
1569   get a list of connected nodes
1570  */
1571 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1572                                 struct timeval timeout,
1573                                 TALLOC_CTX *mem_ctx,
1574                                 uint32_t *num_nodes)
1575 {
1576         struct ctdb_node_map *map=NULL;
1577         int ret, i;
1578         uint32_t *nodes;
1579
1580         *num_nodes = 0;
1581
1582         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1583         if (ret != 0) {
1584                 return NULL;
1585         }
1586
1587         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1588         if (nodes == NULL) {
1589                 return NULL;
1590         }
1591
1592         for (i=0;i<map->num;i++) {
1593                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1594                         nodes[*num_nodes] = map->nodes[i].pnn;
1595                         (*num_nodes)++;
1596                 }
1597         }
1598
1599         return nodes;
1600 }
1601
1602
1603 /*
1604   reset remote status
1605  */
1606 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1607 {
1608         int ret;
1609         int32_t res;
1610
1611         ret = ctdb_control(ctdb, destnode, 0, 
1612                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1613                            NULL, NULL, &res, NULL, NULL);
1614         if (ret != 0 || res != 0) {
1615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1616                 return -1;
1617         }
1618         return 0;
1619 }
1620
1621 /*
1622   this is the dummy null procedure that all databases support
1623 */
1624 static int ctdb_null_func(struct ctdb_call_info *call)
1625 {
1626         return 0;
1627 }
1628
1629 /*
1630   this is a plain fetch procedure that all databases support
1631 */
1632 static int ctdb_fetch_func(struct ctdb_call_info *call)
1633 {
1634         call->reply_data = &call->record_data;
1635         return 0;
1636 }
1637
1638 /*
1639   attach to a specific database - client call
1640 */
1641 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1642 {
1643         struct ctdb_db_context *ctdb_db;
1644         TDB_DATA data;
1645         int ret;
1646         int32_t res;
1647
1648         ctdb_db = ctdb_db_handle(ctdb, name);
1649         if (ctdb_db) {
1650                 return ctdb_db;
1651         }
1652
1653         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1654         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1655
1656         ctdb_db->ctdb = ctdb;
1657         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1658         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1659
1660         data.dptr = discard_const(name);
1661         data.dsize = strlen(name)+1;
1662
1663         /* tell ctdb daemon to attach */
1664         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1665                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1666                            0, data, ctdb_db, &data, &res, NULL, NULL);
1667         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1668                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1669                 talloc_free(ctdb_db);
1670                 return NULL;
1671         }
1672         
1673         ctdb_db->db_id = *(uint32_t *)data.dptr;
1674         talloc_free(data.dptr);
1675
1676         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1677         if (ret != 0) {
1678                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1679                 talloc_free(ctdb_db);
1680                 return NULL;
1681         }
1682
1683         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1684         if (!ctdb->do_setsched) {
1685                 tdb_flags |= TDB_NOMMAP;
1686         }
1687
1688         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1689         if (ctdb_db->ltdb == NULL) {
1690                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1691                 talloc_free(ctdb_db);
1692                 return NULL;
1693         }
1694
1695         ctdb_db->persistent = persistent;
1696
1697         DLIST_ADD(ctdb->db_list, ctdb_db);
1698
1699         /* add well known functions */
1700         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1701         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1702
1703         return ctdb_db;
1704 }
1705
1706
1707 /*
1708   setup a call for a database
1709  */
1710 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1711 {
1712         struct ctdb_registered_call *call;
1713
1714 #if 0
1715         TDB_DATA data;
1716         int32_t status;
1717         struct ctdb_control_set_call c;
1718         int ret;
1719
1720         /* this is no longer valid with the separate daemon architecture */
1721         c.db_id = ctdb_db->db_id;
1722         c.fn    = fn;
1723         c.id    = id;
1724
1725         data.dptr = (uint8_t *)&c;
1726         data.dsize = sizeof(c);
1727
1728         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1729                            data, NULL, NULL, &status, NULL, NULL);
1730         if (ret != 0 || status != 0) {
1731                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1732                 return -1;
1733         }
1734 #endif
1735
1736         /* also register locally */
1737         call = talloc(ctdb_db, struct ctdb_registered_call);
1738         call->fn = fn;
1739         call->id = id;
1740
1741         DLIST_ADD(ctdb_db->calls, call);        
1742         return 0;
1743 }
1744
1745
1746 struct traverse_state {
1747         bool done;
1748         uint32_t count;
1749         ctdb_traverse_func fn;
1750         void *private_data;
1751 };
1752
1753 /*
1754   called on each key during a ctdb_traverse
1755  */
1756 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1757 {
1758         struct traverse_state *state = (struct traverse_state *)p;
1759         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1760         TDB_DATA key;
1761
1762         if (data.dsize < sizeof(uint32_t) ||
1763             d->length != data.dsize) {
1764                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1765                 state->done = True;
1766                 return;
1767         }
1768
1769         key.dsize = d->keylen;
1770         key.dptr  = &d->data[0];
1771         data.dsize = d->datalen;
1772         data.dptr = &d->data[d->keylen];
1773
1774         if (key.dsize == 0 && data.dsize == 0) {
1775                 /* end of traverse */
1776                 state->done = True;
1777                 return;
1778         }
1779
1780         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1781                 /* empty records are deleted records in ctdb */
1782                 return;
1783         }
1784
1785         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1786                 state->done = True;
1787         }
1788
1789         state->count++;
1790 }
1791
1792
1793 /*
1794   start a cluster wide traverse, calling the supplied fn on each record
1795   return the number of records traversed, or -1 on error
1796  */
1797 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1798 {
1799         TDB_DATA data;
1800         struct ctdb_traverse_start t;
1801         int32_t status;
1802         int ret;
1803         uint64_t srvid = (getpid() | 0xFLL<<60);
1804         struct traverse_state state;
1805
1806         state.done = False;
1807         state.count = 0;
1808         state.private_data = private_data;
1809         state.fn = fn;
1810
1811         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1812         if (ret != 0) {
1813                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1814                 return -1;
1815         }
1816
1817         t.db_id = ctdb_db->db_id;
1818         t.srvid = srvid;
1819         t.reqid = 0;
1820
1821         data.dptr = (uint8_t *)&t;
1822         data.dsize = sizeof(t);
1823
1824         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1825                            data, NULL, NULL, &status, NULL, NULL);
1826         if (ret != 0 || status != 0) {
1827                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1828                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1829                 return -1;
1830         }
1831
1832         while (!state.done) {
1833                 event_loop_once(ctdb_db->ctdb->ev);
1834         }
1835
1836         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1837         if (ret != 0) {
1838                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1839                 return -1;
1840         }
1841
1842         return state.count;
1843 }
1844
1845 #define ISASCII(x) ((x>31)&&(x<128))
1846 /*
1847   called on each key during a catdb
1848  */
1849 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1850 {
1851         int i;
1852         FILE *f = (FILE *)p;
1853         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1854
1855         fprintf(f, "dmaster: %u\n", h->dmaster);
1856         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1857
1858         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1859         for (i=0;i<key.dsize;i++) {
1860                 if (ISASCII(key.dptr[i])) {
1861                         fprintf(f, "%c", key.dptr[i]);
1862                 } else {
1863                         fprintf(f, "\\%02X", key.dptr[i]);
1864                 }
1865         }
1866         fprintf(f, "\"\n");
1867
1868         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1869         for (i=sizeof(*h);i<data.dsize;i++) {
1870                 if (ISASCII(data.dptr[i])) {
1871                         fprintf(f, "%c", data.dptr[i]);
1872                 } else {
1873                         fprintf(f, "\\%02X", data.dptr[i]);
1874                 }
1875         }
1876         fprintf(f, "\"\n");
1877
1878         return 0;
1879 }
1880
1881 /*
1882   convenience function to list all keys to stdout
1883  */
1884 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1885 {
1886         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1887 }
1888
1889 /*
1890   get the pid of a ctdb daemon
1891  */
1892 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1893 {
1894         int ret;
1895         int32_t res;
1896
1897         ret = ctdb_control(ctdb, destnode, 0, 
1898                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1899                            NULL, NULL, &res, &timeout, NULL);
1900         if (ret != 0) {
1901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1902                 return -1;
1903         }
1904
1905         *pid = res;
1906
1907         return 0;
1908 }
1909
1910
1911 /*
1912   async freeze send control
1913  */
1914 struct ctdb_client_control_state *
1915 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1916 {
1917         return ctdb_control_send(ctdb, destnode, priority, 
1918                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1919                            mem_ctx, &timeout, NULL);
1920 }
1921
1922 /* 
1923    async freeze recv control
1924 */
1925 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1926 {
1927         int ret;
1928         int32_t res;
1929
1930         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1931         if ( (ret != 0) || (res != 0) ){
1932                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1933                 return -1;
1934         }
1935
1936         return 0;
1937 }
1938
1939 /*
1940   freeze databases of a certain priority
1941  */
1942 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1943 {
1944         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1945         struct ctdb_client_control_state *state;
1946         int ret;
1947
1948         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1949         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1950         talloc_free(tmp_ctx);
1951
1952         return ret;
1953 }
1954
1955 /* Freeze all databases */
1956 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1957 {
1958         int i;
1959
1960         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1961                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1962                         return -1;
1963                 }
1964         }
1965         return 0;
1966 }
1967
1968 /*
1969   thaw databases of a certain priority
1970  */
1971 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1972 {
1973         int ret;
1974         int32_t res;
1975
1976         ret = ctdb_control(ctdb, destnode, priority, 
1977                            CTDB_CONTROL_THAW, 0, tdb_null, 
1978                            NULL, NULL, &res, &timeout, NULL);
1979         if (ret != 0 || res != 0) {
1980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1981                 return -1;
1982         }
1983
1984         return 0;
1985 }
1986
1987 /* thaw all databases */
1988 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1989 {
1990         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1991 }
1992
1993 /*
1994   get pnn of a node, or -1
1995  */
1996 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1997 {
1998         int ret;
1999         int32_t res;
2000
2001         ret = ctdb_control(ctdb, destnode, 0, 
2002                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2003                            NULL, NULL, &res, &timeout, NULL);
2004         if (ret != 0) {
2005                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2006                 return -1;
2007         }
2008
2009         return res;
2010 }
2011
2012 /*
2013   get the monitoring mode of a remote node
2014  */
2015 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2016 {
2017         int ret;
2018         int32_t res;
2019
2020         ret = ctdb_control(ctdb, destnode, 0, 
2021                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2022                            NULL, NULL, &res, &timeout, NULL);
2023         if (ret != 0) {
2024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2025                 return -1;
2026         }
2027
2028         *monmode = res;
2029
2030         return 0;
2031 }
2032
2033
2034 /*
2035  set the monitoring mode of a remote node to active
2036  */
2037 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2038 {
2039         int ret;
2040         
2041
2042         ret = ctdb_control(ctdb, destnode, 0, 
2043                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2044                            NULL, NULL,NULL, &timeout, NULL);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2047                 return -1;
2048         }
2049
2050         
2051
2052         return 0;
2053 }
2054
2055 /*
2056   set the monitoring mode of a remote node to disable
2057  */
2058 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2059 {
2060         int ret;
2061         
2062
2063         ret = ctdb_control(ctdb, destnode, 0, 
2064                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2065                            NULL, NULL, NULL, &timeout, NULL);
2066         if (ret != 0) {
2067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2068                 return -1;
2069         }
2070
2071         
2072
2073         return 0;
2074 }
2075
2076
2077
2078 /* 
2079   sent to a node to make it take over an ip address
2080 */
2081 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2082                           uint32_t destnode, struct ctdb_public_ip *ip)
2083 {
2084         TDB_DATA data;
2085         struct ctdb_public_ipv4 ipv4;
2086         int ret;
2087         int32_t res;
2088
2089         if (ip->addr.sa.sa_family == AF_INET) {
2090                 ipv4.pnn = ip->pnn;
2091                 ipv4.sin = ip->addr.ip;
2092
2093                 data.dsize = sizeof(ipv4);
2094                 data.dptr  = (uint8_t *)&ipv4;
2095
2096                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2097                            NULL, &res, &timeout, NULL);
2098         } else {
2099                 data.dsize = sizeof(*ip);
2100                 data.dptr  = (uint8_t *)ip;
2101
2102                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2103                            NULL, &res, &timeout, NULL);
2104         }
2105
2106         if (ret != 0 || res != 0) {
2107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2108                 return -1;
2109         }
2110
2111         return 0;       
2112 }
2113
2114
2115 /* 
2116   sent to a node to make it release an ip address
2117 */
2118 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2119                          uint32_t destnode, struct ctdb_public_ip *ip)
2120 {
2121         TDB_DATA data;
2122         struct ctdb_public_ipv4 ipv4;
2123         int ret;
2124         int32_t res;
2125
2126         if (ip->addr.sa.sa_family == AF_INET) {
2127                 ipv4.pnn = ip->pnn;
2128                 ipv4.sin = ip->addr.ip;
2129
2130                 data.dsize = sizeof(ipv4);
2131                 data.dptr  = (uint8_t *)&ipv4;
2132
2133                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2134                                    NULL, &res, &timeout, NULL);
2135         } else {
2136                 data.dsize = sizeof(*ip);
2137                 data.dptr  = (uint8_t *)ip;
2138
2139                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2140                                    NULL, &res, &timeout, NULL);
2141         }
2142
2143         if (ret != 0 || res != 0) {
2144                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2145                 return -1;
2146         }
2147
2148         return 0;       
2149 }
2150
2151
2152 /*
2153   get a tunable
2154  */
2155 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2156                           struct timeval timeout, 
2157                           uint32_t destnode,
2158                           const char *name, uint32_t *value)
2159 {
2160         struct ctdb_control_get_tunable *t;
2161         TDB_DATA data, outdata;
2162         int32_t res;
2163         int ret;
2164
2165         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2166         data.dptr  = talloc_size(ctdb, data.dsize);
2167         CTDB_NO_MEMORY(ctdb, data.dptr);
2168
2169         t = (struct ctdb_control_get_tunable *)data.dptr;
2170         t->length = strlen(name)+1;
2171         memcpy(t->name, name, t->length);
2172
2173         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2174                            &outdata, &res, &timeout, NULL);
2175         talloc_free(data.dptr);
2176         if (ret != 0 || res != 0) {
2177                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2178                 return -1;
2179         }
2180
2181         if (outdata.dsize != sizeof(uint32_t)) {
2182                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2183                 talloc_free(outdata.dptr);
2184                 return -1;
2185         }
2186         
2187         *value = *(uint32_t *)outdata.dptr;
2188         talloc_free(outdata.dptr);
2189
2190         return 0;
2191 }
2192
2193 /*
2194   set a tunable
2195  */
2196 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2197                           struct timeval timeout, 
2198                           uint32_t destnode,
2199                           const char *name, uint32_t value)
2200 {
2201         struct ctdb_control_set_tunable *t;
2202         TDB_DATA data;
2203         int32_t res;
2204         int ret;
2205
2206         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2207         data.dptr  = talloc_size(ctdb, data.dsize);
2208         CTDB_NO_MEMORY(ctdb, data.dptr);
2209
2210         t = (struct ctdb_control_set_tunable *)data.dptr;
2211         t->length = strlen(name)+1;
2212         memcpy(t->name, name, t->length);
2213         t->value = value;
2214
2215         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2216                            NULL, &res, &timeout, NULL);
2217         talloc_free(data.dptr);
2218         if (ret != 0 || res != 0) {
2219                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2220                 return -1;
2221         }
2222
2223         return 0;
2224 }
2225
2226 /*
2227   list tunables
2228  */
2229 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2230                             struct timeval timeout, 
2231                             uint32_t destnode,
2232                             TALLOC_CTX *mem_ctx,
2233                             const char ***list, uint32_t *count)
2234 {
2235         TDB_DATA outdata;
2236         int32_t res;
2237         int ret;
2238         struct ctdb_control_list_tunable *t;
2239         char *p, *s, *ptr;
2240
2241         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2242                            mem_ctx, &outdata, &res, &timeout, NULL);
2243         if (ret != 0 || res != 0) {
2244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2245                 return -1;
2246         }
2247
2248         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2249         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2250             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2251                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2252                 talloc_free(outdata.dptr);
2253                 return -1;              
2254         }
2255         
2256         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2257         CTDB_NO_MEMORY(ctdb, p);
2258
2259         talloc_free(outdata.dptr);
2260         
2261         (*list) = NULL;
2262         (*count) = 0;
2263
2264         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2265                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2266                 CTDB_NO_MEMORY(ctdb, *list);
2267                 (*list)[*count] = talloc_strdup(*list, s);
2268                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2269                 (*count)++;
2270         }
2271
2272         talloc_free(p);
2273
2274         return 0;
2275 }
2276
2277
2278 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2279                         struct timeval timeout, uint32_t destnode, 
2280                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2281 {
2282         int ret;
2283         TDB_DATA outdata;
2284         int32_t res;
2285
2286         ret = ctdb_control(ctdb, destnode, 0, 
2287                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2288                            mem_ctx, &outdata, &res, &timeout, NULL);
2289         if (ret == 0 && res == -1) {
2290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2291                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2292         }
2293         if (ret != 0 || res != 0) {
2294           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2295                 return -1;
2296         }
2297
2298         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2299         talloc_free(outdata.dptr);
2300                     
2301         return 0;
2302 }
2303
2304 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2305                         struct timeval timeout, uint32_t destnode, 
2306                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2307 {
2308         int ret, i, len;
2309         TDB_DATA outdata;
2310         int32_t res;
2311         struct ctdb_all_public_ipsv4 *ipsv4;
2312
2313         ret = ctdb_control(ctdb, destnode, 0, 
2314                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2315                            mem_ctx, &outdata, &res, &timeout, NULL);
2316         if (ret != 0 || res != 0) {
2317                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2318                 return -1;
2319         }
2320
2321         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2322         len = offsetof(struct ctdb_all_public_ips, ips) +
2323                 ipsv4->num*sizeof(struct ctdb_public_ip);
2324         *ips = talloc_zero_size(mem_ctx, len);
2325         CTDB_NO_MEMORY(ctdb, *ips);
2326         (*ips)->num = ipsv4->num;
2327         for (i=0; i<ipsv4->num; i++) {
2328                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2329                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2330         }
2331
2332         talloc_free(outdata.dptr);
2333                     
2334         return 0;
2335 }
2336
2337 /*
2338   set/clear the permanent disabled bit on a remote node
2339  */
2340 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2341                        uint32_t set, uint32_t clear)
2342 {
2343         int ret;
2344         TDB_DATA data;
2345         struct ctdb_node_map *nodemap=NULL;
2346         struct ctdb_node_flag_change c;
2347         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2348         uint32_t recmaster;
2349         uint32_t *nodes;
2350
2351
2352         /* find the recovery master */
2353         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2354         if (ret != 0) {
2355                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2356                 talloc_free(tmp_ctx);
2357                 return ret;
2358         }
2359
2360
2361         /* read the node flags from the recmaster */
2362         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2363         if (ret != 0) {
2364                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2365                 talloc_free(tmp_ctx);
2366                 return -1;
2367         }
2368         if (destnode >= nodemap->num) {
2369                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2370                 talloc_free(tmp_ctx);
2371                 return -1;
2372         }
2373
2374         c.pnn       = destnode;
2375         c.old_flags = nodemap->nodes[destnode].flags;
2376         c.new_flags = c.old_flags;
2377         c.new_flags |= set;
2378         c.new_flags &= ~clear;
2379
2380         data.dsize = sizeof(c);
2381         data.dptr = (unsigned char *)&c;
2382
2383         /* send the flags update to all connected nodes */
2384         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2385
2386         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2387                                         nodes, 0,
2388                                         timeout, false, data,
2389                                         NULL, NULL,
2390                                         NULL) != 0) {
2391                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2392
2393                 talloc_free(tmp_ctx);
2394                 return -1;
2395         }
2396
2397         talloc_free(tmp_ctx);
2398         return 0;
2399 }
2400
2401
2402 /*
2403   get all tunables
2404  */
2405 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2406                                struct timeval timeout, 
2407                                uint32_t destnode,
2408                                struct ctdb_tunable *tunables)
2409 {
2410         TDB_DATA outdata;
2411         int ret;
2412         int32_t res;
2413
2414         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2415                            &outdata, &res, &timeout, NULL);
2416         if (ret != 0 || res != 0) {
2417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2418                 return -1;
2419         }
2420
2421         if (outdata.dsize != sizeof(*tunables)) {
2422                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2423                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2424                 return -1;              
2425         }
2426
2427         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2428         talloc_free(outdata.dptr);
2429         return 0;
2430 }
2431
2432 /*
2433   add a public address to a node
2434  */
2435 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2436                       struct timeval timeout, 
2437                       uint32_t destnode,
2438                       struct ctdb_control_ip_iface *pub)
2439 {
2440         TDB_DATA data;
2441         int32_t res;
2442         int ret;
2443
2444         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2445         data.dptr  = (unsigned char *)pub;
2446
2447         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2448                            NULL, &res, &timeout, NULL);
2449         if (ret != 0 || res != 0) {
2450                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2451                 return -1;
2452         }
2453
2454         return 0;
2455 }
2456
2457 /*
2458   delete a public address from a node
2459  */
2460 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2461                       struct timeval timeout, 
2462                       uint32_t destnode,
2463                       struct ctdb_control_ip_iface *pub)
2464 {
2465         TDB_DATA data;
2466         int32_t res;
2467         int ret;
2468
2469         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2470         data.dptr  = (unsigned char *)pub;
2471
2472         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2473                            NULL, &res, &timeout, NULL);
2474         if (ret != 0 || res != 0) {
2475                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2476                 return -1;
2477         }
2478
2479         return 0;
2480 }
2481
2482 /*
2483   kill a tcp connection
2484  */
2485 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2486                       struct timeval timeout, 
2487                       uint32_t destnode,
2488                       struct ctdb_control_killtcp *killtcp)
2489 {
2490         TDB_DATA data;
2491         int32_t res;
2492         int ret;
2493
2494         data.dsize = sizeof(struct ctdb_control_killtcp);
2495         data.dptr  = (unsigned char *)killtcp;
2496
2497         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2498                            NULL, &res, &timeout, NULL);
2499         if (ret != 0 || res != 0) {
2500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2501                 return -1;
2502         }
2503
2504         return 0;
2505 }
2506
2507 /*
2508   send a gratious arp
2509  */
2510 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2511                       struct timeval timeout, 
2512                       uint32_t destnode,
2513                       ctdb_sock_addr *addr,
2514                       const char *ifname)
2515 {
2516         TDB_DATA data;
2517         int32_t res;
2518         int ret, len;
2519         struct ctdb_control_gratious_arp *gratious_arp;
2520         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2521
2522
2523         len = strlen(ifname)+1;
2524         gratious_arp = talloc_size(tmp_ctx, 
2525                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2526         CTDB_NO_MEMORY(ctdb, gratious_arp);
2527
2528         gratious_arp->addr = *addr;
2529         gratious_arp->len = len;
2530         memcpy(&gratious_arp->iface[0], ifname, len);
2531
2532
2533         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2534         data.dptr  = (unsigned char *)gratious_arp;
2535
2536         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2537                            NULL, &res, &timeout, NULL);
2538         if (ret != 0 || res != 0) {
2539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2540                 talloc_free(tmp_ctx);
2541                 return -1;
2542         }
2543
2544         talloc_free(tmp_ctx);
2545         return 0;
2546 }
2547
2548 /*
2549   get a list of all tcp tickles that a node knows about for a particular vnn
2550  */
2551 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2552                               struct timeval timeout, uint32_t destnode, 
2553                               TALLOC_CTX *mem_ctx, 
2554                               ctdb_sock_addr *addr,
2555                               struct ctdb_control_tcp_tickle_list **list)
2556 {
2557         int ret;
2558         TDB_DATA data, outdata;
2559         int32_t status;
2560
2561         data.dptr = (uint8_t*)addr;
2562         data.dsize = sizeof(ctdb_sock_addr);
2563
2564         ret = ctdb_control(ctdb, destnode, 0, 
2565                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2566                            mem_ctx, &outdata, &status, NULL, NULL);
2567         if (ret != 0 || status != 0) {
2568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2569                 return -1;
2570         }
2571
2572         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2573
2574         return status;
2575 }
2576
2577 /*
2578   register a server id
2579  */
2580 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2581                       struct timeval timeout, 
2582                       struct ctdb_server_id *id)
2583 {
2584         TDB_DATA data;
2585         int32_t res;
2586         int ret;
2587
2588         data.dsize = sizeof(struct ctdb_server_id);
2589         data.dptr  = (unsigned char *)id;
2590
2591         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2592                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2593                         0, data, NULL,
2594                         NULL, &res, &timeout, NULL);
2595         if (ret != 0 || res != 0) {
2596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2597                 return -1;
2598         }
2599
2600         return 0;
2601 }
2602
2603 /*
2604   unregister a server id
2605  */
2606 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2607                       struct timeval timeout, 
2608                       struct ctdb_server_id *id)
2609 {
2610         TDB_DATA data;
2611         int32_t res;
2612         int ret;
2613
2614         data.dsize = sizeof(struct ctdb_server_id);
2615         data.dptr  = (unsigned char *)id;
2616
2617         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2618                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2619                         0, data, NULL,
2620                         NULL, &res, &timeout, NULL);
2621         if (ret != 0 || res != 0) {
2622                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2623                 return -1;
2624         }
2625
2626         return 0;
2627 }
2628
2629
2630 /*
2631   check if a server id exists
2632
2633   if a server id does exist, return *status == 1, otherwise *status == 0
2634  */
2635 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2636                       struct timeval timeout, 
2637                       uint32_t destnode,
2638                       struct ctdb_server_id *id,
2639                       uint32_t *status)
2640 {
2641         TDB_DATA data;
2642         int32_t res;
2643         int ret;
2644
2645         data.dsize = sizeof(struct ctdb_server_id);
2646         data.dptr  = (unsigned char *)id;
2647
2648         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2649                         0, data, NULL,
2650                         NULL, &res, &timeout, NULL);
2651         if (ret != 0) {
2652                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2653                 return -1;
2654         }
2655
2656         if (res) {
2657                 *status = 1;
2658         } else {
2659                 *status = 0;
2660         }
2661
2662         return 0;
2663 }
2664
2665 /*
2666    get the list of server ids that are registered on a node
2667 */
2668 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2669                 TALLOC_CTX *mem_ctx,
2670                 struct timeval timeout, uint32_t destnode, 
2671                 struct ctdb_server_id_list **svid_list)
2672 {
2673         int ret;
2674         TDB_DATA outdata;
2675         int32_t res;
2676
2677         ret = ctdb_control(ctdb, destnode, 0, 
2678                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2679                            mem_ctx, &outdata, &res, &timeout, NULL);
2680         if (ret != 0 || res != 0) {
2681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2682                 return -1;
2683         }
2684
2685         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2686                     
2687         return 0;
2688 }
2689
2690 /*
2691   initialise the ctdb daemon for client applications
2692
2693   NOTE: In current code the daemon does not fork. This is for testing purposes only
2694   and to simplify the code.
2695 */
2696 struct ctdb_context *ctdb_init(struct event_context *ev)
2697 {
2698         int ret;
2699         struct ctdb_context *ctdb;
2700
2701         ctdb = talloc_zero(ev, struct ctdb_context);
2702         if (ctdb == NULL) {
2703                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2704                 return NULL;
2705         }
2706         ctdb->ev  = ev;
2707         ctdb->idr = idr_init(ctdb);
2708         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2709
2710         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2711         if (ret != 0) {
2712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2713                 talloc_free(ctdb);
2714                 return NULL;
2715         }
2716
2717         return ctdb;
2718 }
2719
2720
2721 /*
2722   set some ctdb flags
2723 */
2724 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2725 {
2726         ctdb->flags |= flags;
2727 }
2728
2729 /*
2730   setup the local socket name
2731 */
2732 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2733 {
2734         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2735         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2736
2737         return 0;
2738 }
2739
2740 /*
2741   return the pnn of this node
2742 */
2743 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2744 {
2745         return ctdb->pnn;
2746 }
2747
2748
2749 /*
2750   get the uptime of a remote node
2751  */
2752 struct ctdb_client_control_state *
2753 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2754 {
2755         return ctdb_control_send(ctdb, destnode, 0, 
2756                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2757                            mem_ctx, &timeout, NULL);
2758 }
2759
2760 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2761 {
2762         int ret;
2763         int32_t res;
2764         TDB_DATA outdata;
2765
2766         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2767         if (ret != 0 || res != 0) {
2768                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2769                 return -1;
2770         }
2771
2772         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2773
2774         return 0;
2775 }
2776
2777 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2778 {
2779         struct ctdb_client_control_state *state;
2780
2781         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2782         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2783 }
2784
2785 /*
2786   send a control to execute the "recovered" event script on a node
2787  */
2788 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2789 {
2790         int ret;
2791         int32_t status;
2792
2793         ret = ctdb_control(ctdb, destnode, 0, 
2794                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2795                            NULL, NULL, &status, &timeout, NULL);
2796         if (ret != 0 || status != 0) {
2797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2798                 return -1;
2799         }
2800
2801         return 0;
2802 }
2803
2804 /* 
2805   callback for the async helpers used when sending the same control
2806   to multiple nodes in parallell.
2807 */
2808 static void async_callback(struct ctdb_client_control_state *state)
2809 {
2810         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2811         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2812         int ret;
2813         TDB_DATA outdata;
2814         int32_t res;
2815         uint32_t destnode = state->c->hdr.destnode;
2816
2817         /* one more node has responded with recmode data */
2818         data->count--;
2819
2820         /* if we failed to push the db, then return an error and let
2821            the main loop try again.
2822         */
2823         if (state->state != CTDB_CONTROL_DONE) {
2824                 if ( !data->dont_log_errors) {
2825                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2826                 }
2827                 data->fail_count++;
2828                 if (data->fail_callback) {
2829                         data->fail_callback(ctdb, destnode, res, outdata,
2830                                         data->callback_data);
2831                 }
2832                 return;
2833         }
2834         
2835         state->async.fn = NULL;
2836
2837         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2838         if ((ret != 0) || (res != 0)) {
2839                 if ( !data->dont_log_errors) {
2840                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2841                 }
2842                 data->fail_count++;
2843                 if (data->fail_callback) {
2844                         data->fail_callback(ctdb, destnode, res, outdata,
2845                                         data->callback_data);
2846                 }
2847         }
2848         if ((ret == 0) && (data->callback != NULL)) {
2849                 data->callback(ctdb, destnode, res, outdata,
2850                                         data->callback_data);
2851         }
2852 }
2853
2854
2855 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2856 {
2857         /* set up the callback functions */
2858         state->async.fn = async_callback;
2859         state->async.private_data = data;
2860         
2861         /* one more control to wait for to complete */
2862         data->count++;
2863 }
2864
2865
2866 /* wait for up to the maximum number of seconds allowed
2867    or until all nodes we expect a response from has replied
2868 */
2869 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2870 {
2871         while (data->count > 0) {
2872                 event_loop_once(ctdb->ev);
2873         }
2874         if (data->fail_count != 0) {
2875                 if (!data->dont_log_errors) {
2876                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2877                                  data->fail_count));
2878                 }
2879                 return -1;
2880         }
2881         return 0;
2882 }
2883
2884
2885 /* 
2886    perform a simple control on the listed nodes
2887    The control cannot return data
2888  */
2889 int ctdb_client_async_control(struct ctdb_context *ctdb,
2890                                 enum ctdb_controls opcode,
2891                                 uint32_t *nodes,
2892                                 uint64_t srvid,
2893                                 struct timeval timeout,
2894                                 bool dont_log_errors,
2895                                 TDB_DATA data,
2896                                 client_async_callback client_callback,
2897                                 client_async_callback fail_callback,
2898                                 void *callback_data)
2899 {
2900         struct client_async_data *async_data;
2901         struct ctdb_client_control_state *state;
2902         int j, num_nodes;
2903
2904         async_data = talloc_zero(ctdb, struct client_async_data);
2905         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2906         async_data->dont_log_errors = dont_log_errors;
2907         async_data->callback = client_callback;
2908         async_data->fail_callback = fail_callback;
2909         async_data->callback_data = callback_data;
2910         async_data->opcode        = opcode;
2911
2912         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2913
2914         /* loop over all nodes and send an async control to each of them */
2915         for (j=0; j<num_nodes; j++) {
2916                 uint32_t pnn = nodes[j];
2917
2918                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2919                                           0, data, async_data, &timeout, NULL);
2920                 if (state == NULL) {
2921                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2922                         talloc_free(async_data);
2923                         return -1;
2924                 }
2925                 
2926                 ctdb_client_async_add(async_data, state);
2927         }
2928
2929         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2930                 talloc_free(async_data);
2931                 return -1;
2932         }
2933
2934         talloc_free(async_data);
2935         return 0;
2936 }
2937
2938 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2939                                 struct ctdb_vnn_map *vnn_map,
2940                                 TALLOC_CTX *mem_ctx,
2941                                 bool include_self)
2942 {
2943         int i, j, num_nodes;
2944         uint32_t *nodes;
2945
2946         for (i=num_nodes=0;i<vnn_map->size;i++) {
2947                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2948                         continue;
2949                 }
2950                 num_nodes++;
2951         } 
2952
2953         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2954         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2955
2956         for (i=j=0;i<vnn_map->size;i++) {
2957                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2958                         continue;
2959                 }
2960                 nodes[j++] = vnn_map->map[i];
2961         } 
2962
2963         return nodes;
2964 }
2965
2966 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2967                                 struct ctdb_node_map *node_map,
2968                                 TALLOC_CTX *mem_ctx,
2969                                 bool include_self)
2970 {
2971         int i, j, num_nodes;
2972         uint32_t *nodes;
2973
2974         for (i=num_nodes=0;i<node_map->num;i++) {
2975                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2976                         continue;
2977                 }
2978                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2979                         continue;
2980                 }
2981                 num_nodes++;
2982         } 
2983
2984         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2985         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2986
2987         for (i=j=0;i<node_map->num;i++) {
2988                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2989                         continue;
2990                 }
2991                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2992                         continue;
2993                 }
2994                 nodes[j++] = node_map->nodes[i].pnn;
2995         } 
2996
2997         return nodes;
2998 }
2999
3000 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3001                                 struct ctdb_node_map *node_map,
3002                                 TALLOC_CTX *mem_ctx,
3003                                 uint32_t pnn)
3004 {
3005         int i, j, num_nodes;
3006         uint32_t *nodes;
3007
3008         for (i=num_nodes=0;i<node_map->num;i++) {
3009                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3010                         continue;
3011                 }
3012                 if (node_map->nodes[i].pnn == pnn) {
3013                         continue;
3014                 }
3015                 num_nodes++;
3016         } 
3017
3018         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3019         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3020
3021         for (i=j=0;i<node_map->num;i++) {
3022                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3023                         continue;
3024                 }
3025                 if (node_map->nodes[i].pnn == pnn) {
3026                         continue;
3027                 }
3028                 nodes[j++] = node_map->nodes[i].pnn;
3029         } 
3030
3031         return nodes;
3032 }
3033
3034 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3035                                 struct ctdb_node_map *node_map,
3036                                 TALLOC_CTX *mem_ctx,
3037                                 bool include_self)
3038 {
3039         int i, j, num_nodes;
3040         uint32_t *nodes;
3041
3042         for (i=num_nodes=0;i<node_map->num;i++) {
3043                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3044                         continue;
3045                 }
3046                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3047                         continue;
3048                 }
3049                 num_nodes++;
3050         } 
3051
3052         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3053         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3054
3055         for (i=j=0;i<node_map->num;i++) {
3056                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3057                         continue;
3058                 }
3059                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3060                         continue;
3061                 }
3062                 nodes[j++] = node_map->nodes[i].pnn;
3063         } 
3064
3065         return nodes;
3066 }
3067
3068 /* 
3069   this is used to test if a pnn lock exists and if it exists will return
3070   the number of connections that pnn has reported or -1 if that recovery
3071   daemon is not running.
3072 */
3073 int
3074 ctdb_read_pnn_lock(int fd, int32_t pnn)
3075 {
3076         struct flock lock;
3077         char c;
3078
3079         lock.l_type = F_WRLCK;
3080         lock.l_whence = SEEK_SET;
3081         lock.l_start = pnn;
3082         lock.l_len = 1;
3083         lock.l_pid = 0;
3084
3085         if (fcntl(fd, F_GETLK, &lock) != 0) {
3086                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3087                 return -1;
3088         }
3089
3090         if (lock.l_type == F_UNLCK) {
3091                 return -1;
3092         }
3093
3094         if (pread(fd, &c, 1, pnn) == -1) {
3095                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3096                 return -1;
3097         }
3098
3099         return c;
3100 }
3101
3102 /*
3103   get capabilities of a remote node
3104  */
3105 struct ctdb_client_control_state *
3106 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3107 {
3108         return ctdb_control_send(ctdb, destnode, 0, 
3109                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3110                            mem_ctx, &timeout, NULL);
3111 }
3112
3113 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3114 {
3115         int ret;
3116         int32_t res;
3117         TDB_DATA outdata;
3118
3119         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3120         if ( (ret != 0) || (res != 0) ) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3122                 return -1;
3123         }
3124
3125         if (capabilities) {
3126                 *capabilities = *((uint32_t *)outdata.dptr);
3127         }
3128
3129         return 0;
3130 }
3131
3132 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3133 {
3134         struct ctdb_client_control_state *state;
3135         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3136         int ret;
3137
3138         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3139         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3140         talloc_free(tmp_ctx);
3141         return ret;
3142 }
3143
3144 /**
3145  * check whether a transaction is active on a given db on a given node
3146  */
3147 static int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3148                                             uint32_t destnode,
3149                                             uint32_t db_id)
3150 {
3151         int32_t status;
3152         int ret;
3153         TDB_DATA indata;
3154
3155         indata.dptr = (uint8_t *)&db_id;
3156         indata.dsize = sizeof(db_id);
3157
3158         ret = ctdb_control(ctdb, destnode, 0,
3159                            CTDB_CONTROL_TRANS2_ACTIVE,
3160                            0, indata, NULL, NULL, &status,
3161                            NULL, NULL);
3162
3163         if (ret != 0) {
3164                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3165                 return -1;
3166         }
3167
3168         return status;
3169 }
3170
3171
3172 struct ctdb_transaction_handle {
3173         struct ctdb_db_context *ctdb_db;
3174         bool in_replay;
3175         /*
3176          * we store the reads and writes done under a transaction:
3177          * - one list stores both reads and writes (m_all),
3178          * - the other just writes (m_write)
3179          */
3180         struct ctdb_marshall_buffer *m_all;
3181         struct ctdb_marshall_buffer *m_write;
3182 };
3183
3184 /* start a transaction on a database */
3185 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3186 {
3187         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3188         return 0;
3189 }
3190
3191 /* start a transaction on a database */
3192 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3193 {
3194         struct ctdb_record_handle *rh;
3195         TDB_DATA key;
3196         TDB_DATA data;
3197         struct ctdb_ltdb_header header;
3198         TALLOC_CTX *tmp_ctx;
3199         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3200         int ret;
3201         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3202         pid_t pid;
3203         int32_t status;
3204
3205         key.dptr = discard_const(keyname);
3206         key.dsize = strlen(keyname);
3207
3208         if (!ctdb_db->persistent) {
3209                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3210                 return -1;
3211         }
3212
3213 again:
3214         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3215                                               CTDB_CURRENT_NODE,
3216                                               ctdb_db->db_id);
3217         if (status == 1) {
3218                 DEBUG(DEBUG_NOTICE, (__location__ " transaction is active "
3219                                      "on db_id[%u]. waiting for 1 second\n",
3220                                      ctdb_db->db_id));
3221                 sleep(1);
3222                 goto again;
3223         }
3224
3225         tmp_ctx = talloc_new(h);
3226
3227         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3228         if (rh == NULL) {
3229                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3230                 talloc_free(tmp_ctx);
3231                 return -1;
3232         }
3233         /*
3234          * store the pid in the database:
3235          * it is not enough that the node is dmaster...
3236          */
3237         pid = getpid();
3238         data.dptr = (unsigned char *)&pid;
3239         data.dsize = sizeof(pid_t);
3240         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3241         if (ret != 0) {
3242                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3243                                   "transaction record\n"));
3244                 talloc_free(tmp_ctx);
3245                 return -1;
3246         }
3247
3248         talloc_free(rh);
3249
3250         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3251         if (ret != 0) {
3252                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3253                 talloc_free(tmp_ctx);
3254                 return -1;
3255         }
3256
3257         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3258         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3259                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3260                 talloc_free(tmp_ctx);
3261                 goto again;
3262         }
3263
3264         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3265                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3266                 talloc_free(tmp_ctx);
3267                 goto again;
3268         }
3269
3270         talloc_free(tmp_ctx);
3271
3272         return 0;
3273 }
3274
3275
3276 /* start a transaction on a database */
3277 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3278                                                        TALLOC_CTX *mem_ctx)
3279 {
3280         struct ctdb_transaction_handle *h;
3281         int ret;
3282
3283         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3284         if (h == NULL) {
3285                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3286                 return NULL;
3287         }
3288
3289         h->ctdb_db = ctdb_db;
3290
3291         ret = ctdb_transaction_fetch_start(h);
3292         if (ret != 0) {
3293                 talloc_free(h);
3294                 return NULL;
3295         }
3296
3297         talloc_set_destructor(h, ctdb_transaction_destructor);
3298
3299         return h;
3300 }
3301
3302
3303
3304 /*
3305   fetch a record inside a transaction
3306  */
3307 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3308                            TALLOC_CTX *mem_ctx, 
3309                            TDB_DATA key, TDB_DATA *data)
3310 {
3311         struct ctdb_ltdb_header header;
3312         int ret;
3313
3314         ZERO_STRUCT(header);
3315
3316         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3317         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3318                 /* record doesn't exist yet */
3319                 *data = tdb_null;
3320                 ret = 0;
3321         }
3322         
3323         if (ret != 0) {
3324                 return ret;
3325         }
3326
3327         if (!h->in_replay) {
3328                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3329                 if (h->m_all == NULL) {
3330                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3331                         return -1;
3332                 }
3333         }
3334
3335         return 0;
3336 }
3337
3338 /*
3339   stores a record inside a transaction
3340  */
3341 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3342                            TDB_DATA key, TDB_DATA data)
3343 {
3344         TALLOC_CTX *tmp_ctx = talloc_new(h);
3345         struct ctdb_ltdb_header header;
3346         TDB_DATA olddata;
3347         int ret;
3348
3349         ZERO_STRUCT(header);
3350
3351         /* we need the header so we can update the RSN */
3352         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3353         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3354                 /* the record doesn't exist - create one with us as dmaster.
3355                    This is only safe because we are in a transaction and this
3356                    is a persistent database */
3357                 ZERO_STRUCT(header);
3358         } else if (ret != 0) {
3359                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3360                 talloc_free(tmp_ctx);
3361                 return ret;
3362         }
3363
3364         if (data.dsize == olddata.dsize &&
3365             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3366                 /* save writing the same data */
3367                 talloc_free(tmp_ctx);
3368                 return 0;
3369         }
3370
3371         header.dmaster = h->ctdb_db->ctdb->pnn;
3372         header.rsn++;
3373
3374         if (!h->in_replay) {
3375                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3376                 if (h->m_all == NULL) {
3377                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3378                         talloc_free(tmp_ctx);
3379                         return -1;
3380                 }
3381         }               
3382
3383         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3384         if (h->m_write == NULL) {
3385                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3386                 talloc_free(tmp_ctx);
3387                 return -1;
3388         }
3389         
3390         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3391
3392         talloc_free(tmp_ctx);
3393         
3394         return ret;
3395 }
3396
3397 /*
3398   replay a transaction
3399  */
3400 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3401 {
3402         int ret, i;
3403         struct ctdb_rec_data *rec = NULL;
3404
3405         h->in_replay = true;
3406         talloc_free(h->m_write);
3407         h->m_write = NULL;
3408
3409         ret = ctdb_transaction_fetch_start(h);
3410         if (ret != 0) {
3411                 return ret;
3412         }
3413
3414         for (i=0;i<h->m_all->count;i++) {
3415                 TDB_DATA key, data;
3416
3417                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3418                 if (rec == NULL) {
3419                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3420                         goto failed;
3421                 }
3422
3423                 if (rec->reqid == 0) {
3424                         /* its a store */
3425                         if (ctdb_transaction_store(h, key, data) != 0) {
3426                                 goto failed;
3427                         }
3428                 } else {
3429                         TDB_DATA data2;
3430                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3431
3432                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3433                                 talloc_free(tmp_ctx);
3434                                 goto failed;
3435                         }
3436                         if (data2.dsize != data.dsize ||
3437                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3438                                 /* the record has changed on us - we have to give up */
3439                                 talloc_free(tmp_ctx);
3440                                 goto failed;
3441                         }
3442                         talloc_free(tmp_ctx);
3443                 }
3444         }
3445         
3446         return 0;
3447
3448 failed:
3449         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3450         return -1;
3451 }
3452
3453
3454 /*
3455   commit a transaction
3456  */
3457 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3458 {
3459         int ret, retries=0;
3460         int32_t status;
3461         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3462         struct timeval timeout;
3463         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3464
3465         talloc_set_destructor(h, NULL);
3466
3467         /* our commit strategy is quite complex.
3468
3469            - we first try to commit the changes to all other nodes
3470
3471            - if that works, then we commit locally and we are done
3472
3473            - if a commit on another node fails, then we need to cancel
3474              the transaction, then restart the transaction (thus
3475              opening a window of time for a pending recovery to
3476              complete), then replay the transaction, checking all the
3477              reads and writes (checking that reads give the same data,
3478              and writes succeed). Then we retry the transaction to the
3479              other nodes
3480         */
3481
3482 again:
3483         if (h->m_write == NULL) {
3484                 /* no changes were made */
3485                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3486                 talloc_free(h);
3487                 return 0;
3488         }
3489
3490         /* tell ctdbd to commit to the other nodes */
3491         timeout = timeval_current_ofs(1, 0);
3492         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3493                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3494                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3495                            &timeout, NULL);
3496         if (ret != 0 || status != 0) {
3497                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3498                 sleep(1);
3499
3500                 if (ret != 0) {
3501                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3502                 } else {
3503                         /* work out what error code we will give if we 
3504                            have to fail the operation */
3505                         switch ((enum ctdb_trans2_commit_error)status) {
3506                         case CTDB_TRANS2_COMMIT_SUCCESS:
3507                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3508                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3509                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3510                                 break;
3511                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3512                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3513                                 break;
3514                         }
3515                 }
3516
3517                 if (++retries == 10) {
3518                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3519                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3520                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3521                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3522                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3523                         talloc_free(h);
3524                         return -1;
3525                 }               
3526
3527                 if (ctdb_replay_transaction(h) != 0) {
3528                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3529                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3530                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3531                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3532                         talloc_free(h);
3533                         return -1;
3534                 }
3535                 goto again;
3536         } else {
3537                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3538         }
3539
3540         /* do the real commit locally */
3541         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3542         if (ret != 0) {
3543                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3544                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3545                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3546                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3547                 talloc_free(h);
3548                 return ret;
3549         }
3550
3551         /* tell ctdbd that we are finished with our local commit */
3552         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3553                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3554                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3555         talloc_free(h);
3556         return 0;
3557 }
3558
3559 /*
3560   recovery daemon ping to main daemon
3561  */
3562 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3563 {
3564         int ret;
3565         int32_t res;
3566
3567         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3568                            ctdb, NULL, &res, NULL, NULL);
3569         if (ret != 0 || res != 0) {
3570                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3571                 return -1;
3572         }
3573
3574         return 0;
3575 }
3576
3577 /* when forking the main daemon and the child process needs to connect back
3578  * to the daemon as a client process, this function can be used to change
3579  * the ctdb context from daemon into client mode
3580  */
3581 int switch_from_server_to_client(struct ctdb_context *ctdb)
3582 {
3583         int ret;
3584
3585         /* shutdown the transport */
3586         if (ctdb->methods) {
3587                 ctdb->methods->shutdown(ctdb);
3588         }
3589
3590         /* get a new event context */
3591         talloc_free(ctdb->ev);
3592         ctdb->ev = event_context_init(ctdb);
3593
3594         close(ctdb->daemon.sd);
3595         ctdb->daemon.sd = -1;
3596
3597         /* the client does not need to be realtime */
3598         if (ctdb->do_setsched) {
3599                 ctdb_restore_scheduler(ctdb);
3600         }
3601
3602         /* initialise ctdb */
3603         ret = ctdb_socket_connect(ctdb);
3604         if (ret != 0) {
3605                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3606                 return -1;
3607         }
3608
3609          return 0;
3610 }
3611
3612 /*
3613   tell the main daemon we are starting a new monitor event script
3614  */
3615 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3616 {
3617         int ret;
3618         int32_t res;
3619
3620         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3621                            ctdb, NULL, &res, NULL, NULL);
3622         if (ret != 0 || res != 0) {
3623                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3624                 return -1;
3625         }
3626
3627         return 0;
3628 }
3629
3630 /*
3631   tell the main daemon we are starting a new monitor event script
3632  */
3633 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3634 {
3635         int ret;
3636         int32_t res;
3637
3638         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3639                            ctdb, NULL, &res, NULL, NULL);
3640         if (ret != 0 || res != 0) {
3641                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3642                 return -1;
3643         }
3644
3645         return 0;
3646 }
3647
3648 /*
3649   tell the main daemon we are starting to run an eventscript
3650  */
3651 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3652 {
3653         int ret;
3654         int32_t res;
3655         TDB_DATA data;
3656
3657         data.dptr = discard_const(name);
3658         data.dsize = strlen(name)+1;
3659
3660         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3661                            ctdb, NULL, &res, NULL, NULL);
3662         if (ret != 0 || res != 0) {
3663                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3664                 return -1;
3665         }
3666
3667         return 0;
3668 }
3669
3670 /*
3671   tell the main daemon the status of the script we ran
3672  */
3673 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3674 {
3675         int ret;
3676         int32_t res;
3677         TDB_DATA data;
3678
3679         data.dptr = (uint8_t *)&result;
3680         data.dsize = sizeof(result);
3681
3682         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3683                            ctdb, NULL, &res, NULL, NULL);
3684         if (ret != 0 || res != 0) {
3685                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3686                 return -1;
3687         }
3688
3689         return 0;
3690 }
3691
3692 /*
3693   tell the main daemon a script was disabled
3694  */
3695 int ctdb_ctrl_event_script_disabled(struct ctdb_context *ctdb, const char *name)
3696 {
3697         int ret;
3698         int32_t res;
3699         TDB_DATA data;
3700
3701         data.dptr = discard_const(name);
3702         data.dsize = strlen(name)+1;
3703
3704         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_DISABLED, 0, data, 
3705                            ctdb, NULL, &res, NULL, NULL);
3706         if (ret != 0 || res != 0) {
3707                 DEBUG(DEBUG_ERR,("Failed to send event_script_disabeld\n"));
3708                 return -1;
3709         }
3710
3711         return 0;
3712 }
3713
3714 /*
3715   get the status of running the monitor eventscripts
3716  */
3717 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3718                 struct timeval timeout, uint32_t destnode, 
3719                 TALLOC_CTX *mem_ctx,
3720                 struct ctdb_monitoring_wire **script_status)
3721 {
3722         int ret;
3723         TDB_DATA outdata;
3724         int32_t res;
3725
3726         ret = ctdb_control(ctdb, destnode, 0, 
3727                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3728                            mem_ctx, &outdata, &res, &timeout, NULL);
3729         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3730                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3731                 return -1;
3732         }
3733
3734         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3735         talloc_free(outdata.dptr);
3736                     
3737         return 0;
3738 }
3739
3740 /*
3741   tell the main daemon how long it took to lock the reclock file
3742  */
3743 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3744 {
3745         int ret;
3746         int32_t res;
3747         TDB_DATA data;
3748
3749         data.dptr = (uint8_t *)&latency;
3750         data.dsize = sizeof(latency);
3751
3752         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3753                            ctdb, NULL, &res, NULL, NULL);
3754         if (ret != 0 || res != 0) {
3755                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3756                 return -1;
3757         }
3758
3759         return 0;
3760 }
3761
3762 /*
3763   get the name of the reclock file
3764  */
3765 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3766                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3767                          const char **name)
3768 {
3769         int ret;
3770         int32_t res;
3771         TDB_DATA data;
3772
3773         ret = ctdb_control(ctdb, destnode, 0, 
3774                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3775                            mem_ctx, &data, &res, &timeout, NULL);
3776         if (ret != 0 || res != 0) {
3777                 return -1;
3778         }
3779
3780         if (data.dsize == 0) {
3781                 *name = NULL;
3782         } else {
3783                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3784         }
3785         talloc_free(data.dptr);
3786
3787         return 0;
3788 }
3789
3790 /*
3791   set the reclock filename for a node
3792  */
3793 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3794 {
3795         int ret;
3796         TDB_DATA data;
3797         int32_t res;
3798
3799         if (reclock == NULL) {
3800                 data.dsize = 0;
3801                 data.dptr  = NULL;
3802         } else {
3803                 data.dsize = strlen(reclock) + 1;
3804                 data.dptr  = discard_const(reclock);
3805         }
3806
3807         ret = ctdb_control(ctdb, destnode, 0, 
3808                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3809                            NULL, NULL, &res, &timeout, NULL);
3810         if (ret != 0 || res != 0) {
3811                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3812                 return -1;
3813         }
3814
3815         return 0;
3816 }
3817
3818 /*
3819   stop a node
3820  */
3821 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3822 {
3823         int ret;
3824         int32_t res;
3825
3826         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3827                            ctdb, NULL, &res, &timeout, NULL);
3828         if (ret != 0 || res != 0) {
3829                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3830                 return -1;
3831         }
3832
3833         return 0;
3834 }
3835
3836 /*
3837   continue a node
3838  */
3839 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3840 {
3841         int ret;
3842
3843         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3844                            ctdb, NULL, NULL, &timeout, NULL);
3845         if (ret != 0) {
3846                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3847                 return -1;
3848         }
3849
3850         return 0;
3851 }
3852
3853 /*
3854   set the natgw state for a node
3855  */
3856 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3857 {
3858         int ret;
3859         TDB_DATA data;
3860         int32_t res;
3861
3862         data.dsize = sizeof(natgwstate);
3863         data.dptr  = (uint8_t *)&natgwstate;
3864
3865         ret = ctdb_control(ctdb, destnode, 0, 
3866                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3867                            NULL, NULL, &res, &timeout, NULL);
3868         if (ret != 0 || res != 0) {
3869                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3870                 return -1;
3871         }
3872
3873         return 0;
3874 }
3875
3876 /*
3877   set the lmaster role for a node
3878  */
3879 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3880 {
3881         int ret;
3882         TDB_DATA data;
3883         int32_t res;
3884
3885         data.dsize = sizeof(lmasterrole);
3886         data.dptr  = (uint8_t *)&lmasterrole;
3887
3888         ret = ctdb_control(ctdb, destnode, 0, 
3889                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3890                            NULL, NULL, &res, &timeout, NULL);
3891         if (ret != 0 || res != 0) {
3892                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3893                 return -1;
3894         }
3895
3896         return 0;
3897 }
3898
3899 /*
3900   set the recmaster role for a node
3901  */
3902 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3903 {
3904         int ret;
3905         TDB_DATA data;
3906         int32_t res;
3907
3908         data.dsize = sizeof(recmasterrole);
3909         data.dptr  = (uint8_t *)&recmasterrole;
3910
3911         ret = ctdb_control(ctdb, destnode, 0, 
3912                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3913                            NULL, NULL, &res, &timeout, NULL);
3914         if (ret != 0 || res != 0) {
3915                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3916                 return -1;
3917         }
3918
3919         return 0;
3920 }
3921
3922 /* enable an eventscript
3923  */
3924 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3925 {
3926         int ret;
3927         TDB_DATA data;
3928         int32_t res;
3929
3930         data.dsize = strlen(script) + 1;
3931         data.dptr  = discard_const(script);
3932
3933         ret = ctdb_control(ctdb, destnode, 0, 
3934                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3935                            NULL, NULL, &res, &timeout, NULL);
3936         if (ret != 0 || res != 0) {
3937                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3938                 return -1;
3939         }
3940
3941         return 0;
3942 }
3943
3944 /* disable an eventscript
3945  */
3946 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3947 {
3948         int ret;
3949         TDB_DATA data;
3950         int32_t res;
3951
3952         data.dsize = strlen(script) + 1;
3953         data.dptr  = discard_const(script);
3954
3955         ret = ctdb_control(ctdb, destnode, 0, 
3956                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3957                            NULL, NULL, &res, &timeout, NULL);
3958         if (ret != 0 || res != 0) {
3959                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3960                 return -1;
3961         }
3962
3963         return 0;
3964 }
3965
3966
3967 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3968 {
3969         int ret;
3970         TDB_DATA data;
3971         int32_t res;
3972
3973         data.dsize = sizeof(*bantime);
3974         data.dptr  = (uint8_t *)bantime;
3975
3976         ret = ctdb_control(ctdb, destnode, 0, 
3977                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3978                            NULL, NULL, &res, &timeout, NULL);
3979         if (ret != 0 || res != 0) {
3980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3981                 return -1;
3982         }
3983
3984         return 0;
3985 }
3986
3987
3988 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3989 {
3990         int ret;
3991         TDB_DATA outdata;
3992         int32_t res;
3993         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3994
3995         ret = ctdb_control(ctdb, destnode, 0, 
3996                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3997                            tmp_ctx, &outdata, &res, &timeout, NULL);
3998         if (ret != 0 || res != 0) {
3999                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4000                 talloc_free(tmp_ctx);
4001                 return -1;
4002         }
4003
4004         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4005         talloc_free(tmp_ctx);
4006
4007         return 0;
4008 }
4009
4010
4011 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4012 {
4013         int ret;
4014         int32_t res;
4015         TDB_DATA data;
4016         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4017
4018         data.dptr = (uint8_t*)db_prio;
4019         data.dsize = sizeof(*db_prio);
4020
4021         ret = ctdb_control(ctdb, destnode, 0, 
4022                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4023                            tmp_ctx, NULL, &res, &timeout, NULL);
4024         if (ret != 0 || res != 0) {
4025                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4026                 talloc_free(tmp_ctx);
4027                 return -1;
4028         }
4029
4030         talloc_free(tmp_ctx);
4031
4032         return 0;
4033 }
4034
4035 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4036 {
4037         int ret;
4038         int32_t res;
4039         TDB_DATA data;
4040         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4041
4042         data.dptr = (uint8_t*)&db_id;
4043         data.dsize = sizeof(db_id);
4044
4045         ret = ctdb_control(ctdb, destnode, 0, 
4046                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4047                            tmp_ctx, NULL, &res, &timeout, NULL);
4048         if (ret != 0 || res < 0) {
4049                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4050                 talloc_free(tmp_ctx);
4051                 return -1;
4052         }
4053
4054         if (priority) {
4055                 *priority = res;
4056         }
4057
4058         talloc_free(tmp_ctx);
4059
4060         return 0;
4061 }