0b8cd5ba2e3e6e94ff0d63c7febb7379cbc073f3
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, uint32_t caller)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         set_nonblocking(ctdb->daemon.sd);
273         set_close_on_exec(ctdb->daemon.sd);
274         
275         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
276                 close(ctdb->daemon.sd);
277                 ctdb->daemon.sd = -1;
278                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
279                 return -1;
280         }
281
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb);
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return 0;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
372
373         return state;
374 }
375
376 /*
377   make a ctdb call to the local daemon - async send. Called from client context.
378
379   This constructs a ctdb_call request and queues it for processing. 
380   This call never blocks.
381 */
382 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
383                                               struct ctdb_call *call)
384 {
385         struct ctdb_client_call_state *state;
386         struct ctdb_context *ctdb = ctdb_db->ctdb;
387         struct ctdb_ltdb_header header;
388         TDB_DATA data;
389         int ret;
390         size_t len;
391         struct ctdb_req_call *c;
392
393         /* if the domain socket is not yet open, open it */
394         if (ctdb->daemon.sd==-1) {
395                 ctdb_socket_connect(ctdb);
396         }
397
398         ret = ctdb_ltdb_lock(ctdb_db, call->key);
399         if (ret != 0) {
400                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
401                 return NULL;
402         }
403
404         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
405
406         if (ret == 0 && header.dmaster == ctdb->pnn) {
407                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
408                 talloc_free(data.dptr);
409                 ctdb_ltdb_unlock(ctdb_db, call->key);
410                 return state;
411         }
412
413         ctdb_ltdb_unlock(ctdb_db, call->key);
414         talloc_free(data.dptr);
415
416         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
417         if (state == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
419                 return NULL;
420         }
421         state->call = talloc_zero(state, struct ctdb_call);
422         if (state->call == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
424                 return NULL;
425         }
426
427         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
428         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
429         if (c == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
431                 return NULL;
432         }
433
434         state->reqid     = ctdb_reqid_new(ctdb, state);
435         state->ctdb_db = ctdb_db;
436         talloc_set_destructor(state, ctdb_client_call_destructor);
437
438         c->hdr.reqid     = state->reqid;
439         c->flags         = call->flags;
440         c->db_id         = ctdb_db->db_id;
441         c->callid        = call->call_id;
442         c->hopcount      = 0;
443         c->keylen        = call->key.dsize;
444         c->calldatalen   = call->call_data.dsize;
445         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
446         memcpy(&c->data[call->key.dsize], 
447                call->call_data.dptr, call->call_data.dsize);
448         *(state->call)              = *call;
449         state->call->call_data.dptr = &c->data[call->key.dsize];
450         state->call->key.dptr       = &c->data[0];
451
452         state->state  = CTDB_CALL_WAIT;
453
454
455         ctdb_client_queue_pkt(ctdb, &c->hdr);
456
457         return state;
458 }
459
460
461 /*
462   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
463 */
464 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
465 {
466         struct ctdb_client_call_state *state;
467
468         state = ctdb_call_send(ctdb_db, call);
469         return ctdb_call_recv(state, call);
470 }
471
472
473 /*
474   tell the daemon what messaging srvid we will use, and register the message
475   handler function in the client
476 */
477 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
478                              ctdb_message_fn_t handler,
479                              void *private_data)
480                                     
481 {
482         int res;
483         int32_t status;
484         
485         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
486                            tdb_null, NULL, NULL, &status, NULL, NULL);
487         if (res != 0 || status != 0) {
488                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
489                 return -1;
490         }
491
492         /* also need to register the handler with our own ctdb structure */
493         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
494 }
495
496 /*
497   tell the daemon we no longer want a srvid
498 */
499 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         ctdb_deregister_message_handler(ctdb, srvid, private_data);
513         return 0;
514 }
515
516
517 /*
518   send a message - from client context
519  */
520 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
521                       uint64_t srvid, TDB_DATA data)
522 {
523         struct ctdb_req_message *r;
524         int len, res;
525
526         len = offsetof(struct ctdb_req_message, data) + data.dsize;
527         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
528                                len, struct ctdb_req_message);
529         CTDB_NO_MEMORY(ctdb, r);
530
531         r->hdr.destnode  = pnn;
532         r->srvid         = srvid;
533         r->datalen       = data.dsize;
534         memcpy(&r->data[0], data.dptr, data.dsize);
535         
536         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
537         if (res != 0) {
538                 return res;
539         }
540
541         talloc_free(r);
542         return 0;
543 }
544
545
546 /*
547   cancel a ctdb_fetch_lock operation, releasing the lock
548  */
549 static int fetch_lock_destructor(struct ctdb_record_handle *h)
550 {
551         ctdb_ltdb_unlock(h->ctdb_db, h->key);
552         return 0;
553 }
554
555 /*
556   force the migration of a record to this node
557  */
558 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
559 {
560         struct ctdb_call call;
561         ZERO_STRUCT(call);
562         call.call_id = CTDB_NULL_FUNC;
563         call.key = key;
564         call.flags = CTDB_IMMEDIATE_MIGRATION;
565         return ctdb_call(ctdb_db, &call);
566 }
567
568 /*
569   get a lock on a record, and return the records data. Blocks until it gets the lock
570  */
571 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
572                                            TDB_DATA key, TDB_DATA *data)
573 {
574         int ret;
575         struct ctdb_record_handle *h;
576
577         /*
578           procedure is as follows:
579
580           1) get the chain lock. 
581           2) check if we are dmaster
582           3) if we are the dmaster then return handle 
583           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
584              reply from ctdbd
585           5) when we get the reply, goto (1)
586          */
587
588         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
589         if (h == NULL) {
590                 return NULL;
591         }
592
593         h->ctdb_db = ctdb_db;
594         h->key     = key;
595         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
596         if (h->key.dptr == NULL) {
597                 talloc_free(h);
598                 return NULL;
599         }
600         h->data    = data;
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
603                  (const char *)key.dptr));
604
605 again:
606         /* step 1 - get the chain lock */
607         ret = ctdb_ltdb_lock(ctdb_db, key);
608         if (ret != 0) {
609                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
610                 talloc_free(h);
611                 return NULL;
612         }
613
614         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
615
616         talloc_set_destructor(h, fetch_lock_destructor);
617
618         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
619
620         /* when torturing, ensure we test the remote path */
621         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
622             random() % 5 == 0) {
623                 h->header.dmaster = (uint32_t)-1;
624         }
625
626
627         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
628
629         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
630                 ctdb_ltdb_unlock(ctdb_db, key);
631                 ret = ctdb_client_force_migration(ctdb_db, key);
632                 if (ret != 0) {
633                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
634                         talloc_free(h);
635                         return NULL;
636                 }
637                 goto again;
638         }
639
640         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
641         return h;
642 }
643
644 /*
645   store some data to the record that was locked with ctdb_fetch_lock()
646 */
647 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
648 {
649         if (h->ctdb_db->persistent) {
650                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
651                 return -1;
652         }
653
654         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655 }
656
657 /*
658   non-locking fetch of a record
659  */
660 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
661                TDB_DATA key, TDB_DATA *data)
662 {
663         struct ctdb_call call;
664         int ret;
665
666         call.call_id = CTDB_FETCH_FUNC;
667         call.call_data.dptr = NULL;
668         call.call_data.dsize = 0;
669
670         ret = ctdb_call(ctdb_db, &call);
671
672         if (ret == 0) {
673                 *data = call.reply_data;
674                 talloc_steal(mem_ctx, data->dptr);
675         }
676
677         return ret;
678 }
679
680
681
682 /*
683    called when a control completes or timesout to invoke the callback
684    function the user provided
685 */
686 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
687         struct timeval t, void *private_data)
688 {
689         struct ctdb_client_control_state *state;
690         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
691         int ret;
692
693         state = talloc_get_type(private_data, struct ctdb_client_control_state);
694         talloc_steal(tmp_ctx, state);
695
696         ret = ctdb_control_recv(state->ctdb, state, state,
697                         NULL, 
698                         NULL, 
699                         NULL);
700
701         talloc_free(tmp_ctx);
702 }
703
704 /*
705   called when a CTDB_REPLY_CONTROL packet comes in in the client
706
707   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
708   contains any reply data from the control
709 */
710 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
711                                       struct ctdb_req_header *hdr)
712 {
713         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
714         struct ctdb_client_control_state *state;
715
716         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
717         if (state == NULL) {
718                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
719                 return;
720         }
721
722         if (hdr->reqid != state->reqid) {
723                 /* we found a record  but it was the wrong one */
724                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
725                 return;
726         }
727
728         state->outdata.dptr = c->data;
729         state->outdata.dsize = c->datalen;
730         state->status = c->status;
731         if (c->errorlen) {
732                 state->errormsg = talloc_strndup(state, 
733                                                  (char *)&c->data[c->datalen], 
734                                                  c->errorlen);
735         }
736
737         /* state->outdata now uses resources from c so we dont want c
738            to just dissappear from under us while state is still alive
739         */
740         talloc_steal(state, c);
741
742         state->state = CTDB_CONTROL_DONE;
743
744         /* if we had a callback registered for this control, pull the response
745            and call the callback.
746         */
747         if (state->async.fn) {
748                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
749         }
750 }
751
752
753 /*
754   destroy a ctdb_control in client
755 */
756 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
757 {
758         ctdb_reqid_remove(state->ctdb, state->reqid);
759         return 0;
760 }
761
762
763 /* time out handler for ctdb_control */
764 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
765         struct timeval t, void *private_data)
766 {
767         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
768
769         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
770                          "dstnode:%u\n", state->reqid, state->c->opcode,
771                          state->c->hdr.destnode));
772
773         state->state = CTDB_CONTROL_TIMEOUT;
774
775         /* if we had a callback registered for this control, pull the response
776            and call the callback.
777         */
778         if (state->async.fn) {
779                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
780         }
781 }
782
783 /* async version of send control request */
784 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
785                 uint32_t destnode, uint64_t srvid, 
786                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
787                 TALLOC_CTX *mem_ctx,
788                 struct timeval *timeout,
789                 char **errormsg)
790 {
791         struct ctdb_client_control_state *state;
792         size_t len;
793         struct ctdb_req_control *c;
794         int ret;
795
796         if (errormsg) {
797                 *errormsg = NULL;
798         }
799
800         /* if the domain socket is not yet open, open it */
801         if (ctdb->daemon.sd==-1) {
802                 ctdb_socket_connect(ctdb);
803         }
804
805         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
806         CTDB_NO_MEMORY_NULL(ctdb, state);
807
808         state->ctdb       = ctdb;
809         state->reqid      = ctdb_reqid_new(ctdb, state);
810         state->state      = CTDB_CONTROL_WAIT;
811         state->errormsg   = NULL;
812
813         talloc_set_destructor(state, ctdb_control_destructor);
814
815         len = offsetof(struct ctdb_req_control, data) + data.dsize;
816         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
817                                len, struct ctdb_req_control);
818         state->c            = c;        
819         CTDB_NO_MEMORY_NULL(ctdb, c);
820         c->hdr.reqid        = state->reqid;
821         c->hdr.destnode     = destnode;
822         c->opcode           = opcode;
823         c->client_id        = 0;
824         c->flags            = flags;
825         c->srvid            = srvid;
826         c->datalen          = data.dsize;
827         if (data.dsize) {
828                 memcpy(&c->data[0], data.dptr, data.dsize);
829         }
830
831         /* timeout */
832         if (timeout && !timeval_is_zero(timeout)) {
833                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
834         }
835
836         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
837         if (ret != 0) {
838                 talloc_free(state);
839                 return NULL;
840         }
841
842         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
843                 talloc_free(state);
844                 return NULL;
845         }
846
847         return state;
848 }
849
850
851 /* async version of receive control reply */
852 int ctdb_control_recv(struct ctdb_context *ctdb, 
853                 struct ctdb_client_control_state *state, 
854                 TALLOC_CTX *mem_ctx,
855                 TDB_DATA *outdata, int32_t *status, char **errormsg)
856 {
857         TALLOC_CTX *tmp_ctx;
858
859         if (status != NULL) {
860                 *status = -1;
861         }
862         if (errormsg != NULL) {
863                 *errormsg = NULL;
864         }
865
866         if (state == NULL) {
867                 return -1;
868         }
869
870         /* prevent double free of state */
871         tmp_ctx = talloc_new(ctdb);
872         talloc_steal(tmp_ctx, state);
873
874         /* loop one event at a time until we either timeout or the control
875            completes.
876         */
877         while (state->state == CTDB_CONTROL_WAIT) {
878                 event_loop_once(ctdb->ev);
879         }
880
881         if (state->state != CTDB_CONTROL_DONE) {
882                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
883                 if (state->async.fn) {
884                         state->async.fn(state);
885                 }
886                 talloc_free(tmp_ctx);
887                 return -1;
888         }
889
890         if (state->errormsg) {
891                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
892                 if (errormsg) {
893                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
894                 }
895                 if (state->async.fn) {
896                         state->async.fn(state);
897                 }
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (outdata) {
903                 *outdata = state->outdata;
904                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
905         }
906
907         if (status) {
908                 *status = state->status;
909         }
910
911         if (state->async.fn) {
912                 state->async.fn(state);
913         }
914
915         talloc_free(tmp_ctx);
916         return 0;
917 }
918
919
920
921 /*
922   send a ctdb control message
923   timeout specifies how long we should wait for a reply.
924   if timeout is NULL we wait indefinitely
925  */
926 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
927                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
928                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
929                  struct timeval *timeout,
930                  char **errormsg)
931 {
932         struct ctdb_client_control_state *state;
933
934         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
935                         flags, data, mem_ctx,
936                         timeout, errormsg);
937         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
938                         errormsg);
939 }
940
941
942
943
944 /*
945   a process exists call. Returns 0 if process exists, -1 otherwise
946  */
947 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
948 {
949         int ret;
950         TDB_DATA data;
951         int32_t status;
952
953         data.dptr = (uint8_t*)&pid;
954         data.dsize = sizeof(pid);
955
956         ret = ctdb_control(ctdb, destnode, 0, 
957                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
958                            NULL, NULL, &status, NULL, NULL);
959         if (ret != 0) {
960                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
961                 return -1;
962         }
963
964         return status;
965 }
966
967 /*
968   get remote statistics
969  */
970 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
971 {
972         int ret;
973         TDB_DATA data;
974         int32_t res;
975
976         ret = ctdb_control(ctdb, destnode, 0, 
977                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
978                            ctdb, &data, &res, NULL, NULL);
979         if (ret != 0 || res != 0) {
980                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
981                 return -1;
982         }
983
984         if (data.dsize != sizeof(struct ctdb_statistics)) {
985                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
986                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
987                       return -1;
988         }
989
990         *status = *(struct ctdb_statistics *)data.dptr;
991         talloc_free(data.dptr);
992                         
993         return 0;
994 }
995
996 /*
997   shutdown a remote ctdb node
998  */
999 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1000 {
1001         struct ctdb_client_control_state *state;
1002
1003         state = ctdb_control_send(ctdb, destnode, 0, 
1004                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1005                            NULL, &timeout, NULL);
1006         if (state == NULL) {
1007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1008                 return -1;
1009         }
1010
1011         return 0;
1012 }
1013
1014 /*
1015   get vnn map from a remote node
1016  */
1017 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1018 {
1019         int ret;
1020         TDB_DATA outdata;
1021         int32_t res;
1022         struct ctdb_vnn_map_wire *map;
1023
1024         ret = ctdb_control(ctdb, destnode, 0, 
1025                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1026                            mem_ctx, &outdata, &res, &timeout, NULL);
1027         if (ret != 0 || res != 0) {
1028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1029                 return -1;
1030         }
1031         
1032         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1033         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1034             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1035                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1036                 return -1;
1037         }
1038
1039         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1040         CTDB_NO_MEMORY(ctdb, *vnnmap);
1041         (*vnnmap)->generation = map->generation;
1042         (*vnnmap)->size       = map->size;
1043         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1044
1045         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1046         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1047         talloc_free(outdata.dptr);
1048                     
1049         return 0;
1050 }
1051
1052
1053 /*
1054   get the recovery mode of a remote node
1055  */
1056 struct ctdb_client_control_state *
1057 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1058 {
1059         return ctdb_control_send(ctdb, destnode, 0, 
1060                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1061                            mem_ctx, &timeout, NULL);
1062 }
1063
1064 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1065 {
1066         int ret;
1067         int32_t res;
1068
1069         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1072                 return -1;
1073         }
1074
1075         if (recmode) {
1076                 *recmode = (uint32_t)res;
1077         }
1078
1079         return 0;
1080 }
1081
1082 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1083 {
1084         struct ctdb_client_control_state *state;
1085
1086         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1087         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1088 }
1089
1090
1091
1092
1093 /*
1094   set the recovery mode of a remote node
1095  */
1096 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1097 {
1098         int ret;
1099         TDB_DATA data;
1100         int32_t res;
1101
1102         data.dsize = sizeof(uint32_t);
1103         data.dptr = (unsigned char *)&recmode;
1104
1105         ret = ctdb_control(ctdb, destnode, 0, 
1106                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1107                            NULL, NULL, &res, &timeout, NULL);
1108         if (ret != 0 || res != 0) {
1109                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1110                 return -1;
1111         }
1112
1113         return 0;
1114 }
1115
1116
1117
1118 /*
1119   get the recovery master of a remote node
1120  */
1121 struct ctdb_client_control_state *
1122 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1123                         struct timeval timeout, uint32_t destnode)
1124 {
1125         return ctdb_control_send(ctdb, destnode, 0, 
1126                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1127                            mem_ctx, &timeout, NULL);
1128 }
1129
1130 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1131 {
1132         int ret;
1133         int32_t res;
1134
1135         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1136         if (ret != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1138                 return -1;
1139         }
1140
1141         if (recmaster) {
1142                 *recmaster = (uint32_t)res;
1143         }
1144
1145         return 0;
1146 }
1147
1148 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1149 {
1150         struct ctdb_client_control_state *state;
1151
1152         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1153         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1154 }
1155
1156
1157 /*
1158   set the recovery master of a remote node
1159  */
1160 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t res;
1165
1166         ZERO_STRUCT(data);
1167         data.dsize = sizeof(uint32_t);
1168         data.dptr = (unsigned char *)&recmaster;
1169
1170         ret = ctdb_control(ctdb, destnode, 0, 
1171                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1172                            NULL, NULL, &res, &timeout, NULL);
1173         if (ret != 0 || res != 0) {
1174                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1175                 return -1;
1176         }
1177
1178         return 0;
1179 }
1180
1181
1182 /*
1183   get a list of databases off a remote node
1184  */
1185 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1186                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1187 {
1188         int ret;
1189         TDB_DATA outdata;
1190         int32_t res;
1191
1192         ret = ctdb_control(ctdb, destnode, 0, 
1193                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1194                            mem_ctx, &outdata, &res, &timeout, NULL);
1195         if (ret != 0 || res != 0) {
1196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1197                 return -1;
1198         }
1199
1200         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1201         talloc_free(outdata.dptr);
1202                     
1203         return 0;
1204 }
1205
1206 /*
1207   get a list of nodes (vnn and flags ) from a remote node
1208  */
1209 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1210                 struct timeval timeout, uint32_t destnode, 
1211                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1212 {
1213         int ret;
1214         TDB_DATA outdata;
1215         int32_t res;
1216
1217         ret = ctdb_control(ctdb, destnode, 0, 
1218                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1219                            mem_ctx, &outdata, &res, &timeout, NULL);
1220         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1222                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1223         }
1224         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret, i, len;
1243         TDB_DATA outdata;
1244         struct ctdb_node_mapv4 *nodemapv4;
1245         int32_t res;
1246
1247         ret = ctdb_control(ctdb, destnode, 0, 
1248                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1249                            mem_ctx, &outdata, &res, &timeout, NULL);
1250         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1251                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1252                 return -1;
1253         }
1254
1255         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1256
1257         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1258         (*nodemap) = talloc_zero_size(mem_ctx, len);
1259         CTDB_NO_MEMORY(ctdb, (*nodemap));
1260
1261         (*nodemap)->num = nodemapv4->num;
1262         for (i=0; i<nodemapv4->num; i++) {
1263                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1264                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1265                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1266                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1267         }
1268                 
1269         talloc_free(outdata.dptr);
1270                     
1271         return 0;
1272 }
1273
1274 /*
1275   drop the transport, reload the nodes file and restart the transport
1276  */
1277 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1278                     struct timeval timeout, uint32_t destnode)
1279 {
1280         int ret;
1281         int32_t res;
1282
1283         ret = ctdb_control(ctdb, destnode, 0, 
1284                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1285                            NULL, NULL, &res, &timeout, NULL);
1286         if (ret != 0 || res != 0) {
1287                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1288                 return -1;
1289         }
1290
1291         return 0;
1292 }
1293
1294
1295 /*
1296   set vnn map on a node
1297  */
1298 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1299                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1300 {
1301         int ret;
1302         TDB_DATA data;
1303         int32_t res;
1304         struct ctdb_vnn_map_wire *map;
1305         size_t len;
1306
1307         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1308         map = talloc_size(mem_ctx, len);
1309         CTDB_NO_MEMORY(ctdb, map);
1310
1311         map->generation = vnnmap->generation;
1312         map->size = vnnmap->size;
1313         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1314         
1315         data.dsize = len;
1316         data.dptr  = (uint8_t *)map;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1323                 return -1;
1324         }
1325
1326         talloc_free(map);
1327
1328         return 0;
1329 }
1330
1331
1332 /*
1333   async send for pull database
1334  */
1335 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1336         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1337         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1338 {
1339         TDB_DATA indata;
1340         struct ctdb_control_pulldb *pull;
1341         struct ctdb_client_control_state *state;
1342
1343         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1344         CTDB_NO_MEMORY_NULL(ctdb, pull);
1345
1346         pull->db_id   = dbid;
1347         pull->lmaster = lmaster;
1348
1349         indata.dsize = sizeof(struct ctdb_control_pulldb);
1350         indata.dptr  = (unsigned char *)pull;
1351
1352         state = ctdb_control_send(ctdb, destnode, 0, 
1353                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1354                                   mem_ctx, &timeout, NULL);
1355         talloc_free(pull);
1356
1357         return state;
1358 }
1359
1360 /*
1361   async recv for pull database
1362  */
1363 int ctdb_ctrl_pulldb_recv(
1364         struct ctdb_context *ctdb, 
1365         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1366         TDB_DATA *outdata)
1367 {
1368         int ret;
1369         int32_t res;
1370
1371         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1372         if ( (ret != 0) || (res != 0) ){
1373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1374                 return -1;
1375         }
1376
1377         return 0;
1378 }
1379
1380 /*
1381   pull all keys and records for a specific database on a node
1382  */
1383 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1384                 uint32_t dbid, uint32_t lmaster, 
1385                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1386                 TDB_DATA *outdata)
1387 {
1388         struct ctdb_client_control_state *state;
1389
1390         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1391                                       timeout);
1392         
1393         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1394 }
1395
1396
1397 /*
1398   change dmaster for all keys in the database to the new value
1399  */
1400 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1401                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1402 {
1403         int ret;
1404         TDB_DATA indata;
1405         int32_t res;
1406
1407         indata.dsize = 2*sizeof(uint32_t);
1408         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1409
1410         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1411         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424 /*
1425   ping a node, return number of clients connected
1426  */
1427 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1428 {
1429         int ret;
1430         int32_t res;
1431
1432         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1433                            tdb_null, NULL, NULL, &res, NULL, NULL);
1434         if (ret != 0) {
1435                 return -1;
1436         }
1437         return res;
1438 }
1439
1440 /*
1441   find the real path to a ltdb 
1442  */
1443 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1444                    const char **path)
1445 {
1446         int ret;
1447         int32_t res;
1448         TDB_DATA data;
1449
1450         data.dptr = (uint8_t *)&dbid;
1451         data.dsize = sizeof(dbid);
1452
1453         ret = ctdb_control(ctdb, destnode, 0, 
1454                            CTDB_CONTROL_GETDBPATH, 0, data, 
1455                            mem_ctx, &data, &res, &timeout, NULL);
1456         if (ret != 0 || res != 0) {
1457                 return -1;
1458         }
1459
1460         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1461         if ((*path) == NULL) {
1462                 return -1;
1463         }
1464
1465         talloc_free(data.dptr);
1466
1467         return 0;
1468 }
1469
1470 /*
1471   find the name of a db 
1472  */
1473 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1474                    const char **name)
1475 {
1476         int ret;
1477         int32_t res;
1478         TDB_DATA data;
1479
1480         data.dptr = (uint8_t *)&dbid;
1481         data.dsize = sizeof(dbid);
1482
1483         ret = ctdb_control(ctdb, destnode, 0, 
1484                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1485                            mem_ctx, &data, &res, &timeout, NULL);
1486         if (ret != 0 || res != 0) {
1487                 return -1;
1488         }
1489
1490         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1491         if ((*name) == NULL) {
1492                 return -1;
1493         }
1494
1495         talloc_free(data.dptr);
1496
1497         return 0;
1498 }
1499
1500 /*
1501   create a database
1502  */
1503 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1504                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1505 {
1506         int ret;
1507         int32_t res;
1508         TDB_DATA data;
1509
1510         data.dptr = discard_const(name);
1511         data.dsize = strlen(name)+1;
1512
1513         ret = ctdb_control(ctdb, destnode, 0, 
1514                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1515                            0, data, 
1516                            mem_ctx, &data, &res, &timeout, NULL);
1517
1518         if (ret != 0 || res != 0) {
1519                 return -1;
1520         }
1521
1522         return 0;
1523 }
1524
1525 /*
1526   get debug level on a node
1527  */
1528 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1529 {
1530         int ret;
1531         int32_t res;
1532         TDB_DATA data;
1533
1534         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1535                            ctdb, &data, &res, NULL, NULL);
1536         if (ret != 0 || res != 0) {
1537                 return -1;
1538         }
1539         if (data.dsize != sizeof(int32_t)) {
1540                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1541                          (unsigned)data.dsize));
1542                 return -1;
1543         }
1544         *level = *(int32_t *)data.dptr;
1545         talloc_free(data.dptr);
1546         return 0;
1547 }
1548
1549 /*
1550   set debug level on a node
1551  */
1552 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1553 {
1554         int ret;
1555         int32_t res;
1556         TDB_DATA data;
1557
1558         data.dptr = (uint8_t *)&level;
1559         data.dsize = sizeof(level);
1560
1561         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1562                            NULL, NULL, &res, NULL, NULL);
1563         if (ret != 0 || res != 0) {
1564                 return -1;
1565         }
1566         return 0;
1567 }
1568
1569
1570 /*
1571   get a list of connected nodes
1572  */
1573 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1574                                 struct timeval timeout,
1575                                 TALLOC_CTX *mem_ctx,
1576                                 uint32_t *num_nodes)
1577 {
1578         struct ctdb_node_map *map=NULL;
1579         int ret, i;
1580         uint32_t *nodes;
1581
1582         *num_nodes = 0;
1583
1584         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1585         if (ret != 0) {
1586                 return NULL;
1587         }
1588
1589         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1590         if (nodes == NULL) {
1591                 return NULL;
1592         }
1593
1594         for (i=0;i<map->num;i++) {
1595                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1596                         nodes[*num_nodes] = map->nodes[i].pnn;
1597                         (*num_nodes)++;
1598                 }
1599         }
1600
1601         return nodes;
1602 }
1603
1604
1605 /*
1606   reset remote status
1607  */
1608 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1609 {
1610         int ret;
1611         int32_t res;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1615                            NULL, NULL, &res, NULL, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1618                 return -1;
1619         }
1620         return 0;
1621 }
1622
1623 /*
1624   this is the dummy null procedure that all databases support
1625 */
1626 static int ctdb_null_func(struct ctdb_call_info *call)
1627 {
1628         return 0;
1629 }
1630
1631 /*
1632   this is a plain fetch procedure that all databases support
1633 */
1634 static int ctdb_fetch_func(struct ctdb_call_info *call)
1635 {
1636         call->reply_data = &call->record_data;
1637         return 0;
1638 }
1639
1640 /*
1641   attach to a specific database - client call
1642 */
1643 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1644 {
1645         struct ctdb_db_context *ctdb_db;
1646         TDB_DATA data;
1647         int ret;
1648         int32_t res;
1649
1650         ctdb_db = ctdb_db_handle(ctdb, name);
1651         if (ctdb_db) {
1652                 return ctdb_db;
1653         }
1654
1655         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1656         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1657
1658         ctdb_db->ctdb = ctdb;
1659         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1660         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1661
1662         data.dptr = discard_const(name);
1663         data.dsize = strlen(name)+1;
1664
1665         /* tell ctdb daemon to attach */
1666         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1667                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1668                            0, data, ctdb_db, &data, &res, NULL, NULL);
1669         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1670                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1671                 talloc_free(ctdb_db);
1672                 return NULL;
1673         }
1674         
1675         ctdb_db->db_id = *(uint32_t *)data.dptr;
1676         talloc_free(data.dptr);
1677
1678         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1679         if (ret != 0) {
1680                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1681                 talloc_free(ctdb_db);
1682                 return NULL;
1683         }
1684
1685         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1686         if (!ctdb->do_setsched) {
1687                 tdb_flags |= TDB_NOMMAP;
1688         }
1689
1690         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1691         if (ctdb_db->ltdb == NULL) {
1692                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1693                 talloc_free(ctdb_db);
1694                 return NULL;
1695         }
1696
1697         ctdb_db->persistent = persistent;
1698
1699         DLIST_ADD(ctdb->db_list, ctdb_db);
1700
1701         /* add well known functions */
1702         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1703         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1704
1705         return ctdb_db;
1706 }
1707
1708
1709 /*
1710   setup a call for a database
1711  */
1712 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1713 {
1714         struct ctdb_registered_call *call;
1715
1716 #if 0
1717         TDB_DATA data;
1718         int32_t status;
1719         struct ctdb_control_set_call c;
1720         int ret;
1721
1722         /* this is no longer valid with the separate daemon architecture */
1723         c.db_id = ctdb_db->db_id;
1724         c.fn    = fn;
1725         c.id    = id;
1726
1727         data.dptr = (uint8_t *)&c;
1728         data.dsize = sizeof(c);
1729
1730         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1731                            data, NULL, NULL, &status, NULL, NULL);
1732         if (ret != 0 || status != 0) {
1733                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1734                 return -1;
1735         }
1736 #endif
1737
1738         /* also register locally */
1739         call = talloc(ctdb_db, struct ctdb_registered_call);
1740         call->fn = fn;
1741         call->id = id;
1742
1743         DLIST_ADD(ctdb_db->calls, call);        
1744         return 0;
1745 }
1746
1747
1748 struct traverse_state {
1749         bool done;
1750         uint32_t count;
1751         ctdb_traverse_func fn;
1752         void *private_data;
1753 };
1754
1755 /*
1756   called on each key during a ctdb_traverse
1757  */
1758 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1759 {
1760         struct traverse_state *state = (struct traverse_state *)p;
1761         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1762         TDB_DATA key;
1763
1764         if (data.dsize < sizeof(uint32_t) ||
1765             d->length != data.dsize) {
1766                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1767                 state->done = True;
1768                 return;
1769         }
1770
1771         key.dsize = d->keylen;
1772         key.dptr  = &d->data[0];
1773         data.dsize = d->datalen;
1774         data.dptr = &d->data[d->keylen];
1775
1776         if (key.dsize == 0 && data.dsize == 0) {
1777                 /* end of traverse */
1778                 state->done = True;
1779                 return;
1780         }
1781
1782         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1783                 /* empty records are deleted records in ctdb */
1784                 return;
1785         }
1786
1787         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1788                 state->done = True;
1789         }
1790
1791         state->count++;
1792 }
1793
1794
1795 /*
1796   start a cluster wide traverse, calling the supplied fn on each record
1797   return the number of records traversed, or -1 on error
1798  */
1799 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1800 {
1801         TDB_DATA data;
1802         struct ctdb_traverse_start t;
1803         int32_t status;
1804         int ret;
1805         uint64_t srvid = (getpid() | 0xFLL<<60);
1806         struct traverse_state state;
1807
1808         state.done = False;
1809         state.count = 0;
1810         state.private_data = private_data;
1811         state.fn = fn;
1812
1813         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1816                 return -1;
1817         }
1818
1819         t.db_id = ctdb_db->db_id;
1820         t.srvid = srvid;
1821         t.reqid = 0;
1822
1823         data.dptr = (uint8_t *)&t;
1824         data.dsize = sizeof(t);
1825
1826         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1827                            data, NULL, NULL, &status, NULL, NULL);
1828         if (ret != 0 || status != 0) {
1829                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1830                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1831                 return -1;
1832         }
1833
1834         while (!state.done) {
1835                 event_loop_once(ctdb_db->ctdb->ev);
1836         }
1837
1838         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1841                 return -1;
1842         }
1843
1844         return state.count;
1845 }
1846
1847 #define ISASCII(x) ((x>31)&&(x<128))
1848 /*
1849   called on each key during a catdb
1850  */
1851 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1852 {
1853         int i;
1854         FILE *f = (FILE *)p;
1855         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1856
1857         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1858         for (i=0;i<key.dsize;i++) {
1859                 if (ISASCII(key.dptr[i])) {
1860                         fprintf(f, "%c", key.dptr[i]);
1861                 } else {
1862                         fprintf(f, "\\%02X", key.dptr[i]);
1863                 }
1864         }
1865         fprintf(f, "\"\n");
1866
1867         fprintf(f, "dmaster: %u\n", h->dmaster);
1868         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1869
1870         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1871         for (i=sizeof(*h);i<data.dsize;i++) {
1872                 if (ISASCII(data.dptr[i])) {
1873                         fprintf(f, "%c", data.dptr[i]);
1874                 } else {
1875                         fprintf(f, "\\%02X", data.dptr[i]);
1876                 }
1877         }
1878         fprintf(f, "\"\n");
1879
1880         fprintf(f, "\n");
1881
1882         return 0;
1883 }
1884
1885 /*
1886   convenience function to list all keys to stdout
1887  */
1888 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1889 {
1890         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1891 }
1892
1893 /*
1894   get the pid of a ctdb daemon
1895  */
1896 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1897 {
1898         int ret;
1899         int32_t res;
1900
1901         ret = ctdb_control(ctdb, destnode, 0, 
1902                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1903                            NULL, NULL, &res, &timeout, NULL);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1906                 return -1;
1907         }
1908
1909         *pid = res;
1910
1911         return 0;
1912 }
1913
1914
1915 /*
1916   async freeze send control
1917  */
1918 struct ctdb_client_control_state *
1919 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1920 {
1921         return ctdb_control_send(ctdb, destnode, priority, 
1922                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1923                            mem_ctx, &timeout, NULL);
1924 }
1925
1926 /* 
1927    async freeze recv control
1928 */
1929 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1930 {
1931         int ret;
1932         int32_t res;
1933
1934         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1935         if ( (ret != 0) || (res != 0) ){
1936                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1937                 return -1;
1938         }
1939
1940         return 0;
1941 }
1942
1943 /*
1944   freeze databases of a certain priority
1945  */
1946 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1947 {
1948         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1949         struct ctdb_client_control_state *state;
1950         int ret;
1951
1952         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1953         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1954         talloc_free(tmp_ctx);
1955
1956         return ret;
1957 }
1958
1959 /* Freeze all databases */
1960 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1961 {
1962         int i;
1963
1964         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1965                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1966                         return -1;
1967                 }
1968         }
1969         return 0;
1970 }
1971
1972 /*
1973   thaw databases of a certain priority
1974  */
1975 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1976 {
1977         int ret;
1978         int32_t res;
1979
1980         ret = ctdb_control(ctdb, destnode, priority, 
1981                            CTDB_CONTROL_THAW, 0, tdb_null, 
1982                            NULL, NULL, &res, &timeout, NULL);
1983         if (ret != 0 || res != 0) {
1984                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1985                 return -1;
1986         }
1987
1988         return 0;
1989 }
1990
1991 /* thaw all databases */
1992 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1993 {
1994         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1995 }
1996
1997 /*
1998   get pnn of a node, or -1
1999  */
2000 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2001 {
2002         int ret;
2003         int32_t res;
2004
2005         ret = ctdb_control(ctdb, destnode, 0, 
2006                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2007                            NULL, NULL, &res, &timeout, NULL);
2008         if (ret != 0) {
2009                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2010                 return -1;
2011         }
2012
2013         return res;
2014 }
2015
2016 /*
2017   get the monitoring mode of a remote node
2018  */
2019 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2020 {
2021         int ret;
2022         int32_t res;
2023
2024         ret = ctdb_control(ctdb, destnode, 0, 
2025                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2026                            NULL, NULL, &res, &timeout, NULL);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2029                 return -1;
2030         }
2031
2032         *monmode = res;
2033
2034         return 0;
2035 }
2036
2037
2038 /*
2039  set the monitoring mode of a remote node to active
2040  */
2041 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2042 {
2043         int ret;
2044         
2045
2046         ret = ctdb_control(ctdb, destnode, 0, 
2047                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2048                            NULL, NULL,NULL, &timeout, NULL);
2049         if (ret != 0) {
2050                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2051                 return -1;
2052         }
2053
2054         
2055
2056         return 0;
2057 }
2058
2059 /*
2060   set the monitoring mode of a remote node to disable
2061  */
2062 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2063 {
2064         int ret;
2065         
2066
2067         ret = ctdb_control(ctdb, destnode, 0, 
2068                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2069                            NULL, NULL, NULL, &timeout, NULL);
2070         if (ret != 0) {
2071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2072                 return -1;
2073         }
2074
2075         
2076
2077         return 0;
2078 }
2079
2080
2081
2082 /* 
2083   sent to a node to make it take over an ip address
2084 */
2085 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2086                           uint32_t destnode, struct ctdb_public_ip *ip)
2087 {
2088         TDB_DATA data;
2089         struct ctdb_public_ipv4 ipv4;
2090         int ret;
2091         int32_t res;
2092
2093         if (ip->addr.sa.sa_family == AF_INET) {
2094                 ipv4.pnn = ip->pnn;
2095                 ipv4.sin = ip->addr.ip;
2096
2097                 data.dsize = sizeof(ipv4);
2098                 data.dptr  = (uint8_t *)&ipv4;
2099
2100                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2101                            NULL, &res, &timeout, NULL);
2102         } else {
2103                 data.dsize = sizeof(*ip);
2104                 data.dptr  = (uint8_t *)ip;
2105
2106                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2107                            NULL, &res, &timeout, NULL);
2108         }
2109
2110         if (ret != 0 || res != 0) {
2111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2112                 return -1;
2113         }
2114
2115         return 0;       
2116 }
2117
2118
2119 /* 
2120   sent to a node to make it release an ip address
2121 */
2122 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2123                          uint32_t destnode, struct ctdb_public_ip *ip)
2124 {
2125         TDB_DATA data;
2126         struct ctdb_public_ipv4 ipv4;
2127         int ret;
2128         int32_t res;
2129
2130         if (ip->addr.sa.sa_family == AF_INET) {
2131                 ipv4.pnn = ip->pnn;
2132                 ipv4.sin = ip->addr.ip;
2133
2134                 data.dsize = sizeof(ipv4);
2135                 data.dptr  = (uint8_t *)&ipv4;
2136
2137                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2138                                    NULL, &res, &timeout, NULL);
2139         } else {
2140                 data.dsize = sizeof(*ip);
2141                 data.dptr  = (uint8_t *)ip;
2142
2143                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2144                                    NULL, &res, &timeout, NULL);
2145         }
2146
2147         if (ret != 0 || res != 0) {
2148                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2149                 return -1;
2150         }
2151
2152         return 0;       
2153 }
2154
2155
2156 /*
2157   get a tunable
2158  */
2159 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2160                           struct timeval timeout, 
2161                           uint32_t destnode,
2162                           const char *name, uint32_t *value)
2163 {
2164         struct ctdb_control_get_tunable *t;
2165         TDB_DATA data, outdata;
2166         int32_t res;
2167         int ret;
2168
2169         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2170         data.dptr  = talloc_size(ctdb, data.dsize);
2171         CTDB_NO_MEMORY(ctdb, data.dptr);
2172
2173         t = (struct ctdb_control_get_tunable *)data.dptr;
2174         t->length = strlen(name)+1;
2175         memcpy(t->name, name, t->length);
2176
2177         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2178                            &outdata, &res, &timeout, NULL);
2179         talloc_free(data.dptr);
2180         if (ret != 0 || res != 0) {
2181                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2182                 return -1;
2183         }
2184
2185         if (outdata.dsize != sizeof(uint32_t)) {
2186                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2187                 talloc_free(outdata.dptr);
2188                 return -1;
2189         }
2190         
2191         *value = *(uint32_t *)outdata.dptr;
2192         talloc_free(outdata.dptr);
2193
2194         return 0;
2195 }
2196
2197 /*
2198   set a tunable
2199  */
2200 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2201                           struct timeval timeout, 
2202                           uint32_t destnode,
2203                           const char *name, uint32_t value)
2204 {
2205         struct ctdb_control_set_tunable *t;
2206         TDB_DATA data;
2207         int32_t res;
2208         int ret;
2209
2210         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2211         data.dptr  = talloc_size(ctdb, data.dsize);
2212         CTDB_NO_MEMORY(ctdb, data.dptr);
2213
2214         t = (struct ctdb_control_set_tunable *)data.dptr;
2215         t->length = strlen(name)+1;
2216         memcpy(t->name, name, t->length);
2217         t->value = value;
2218
2219         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2220                            NULL, &res, &timeout, NULL);
2221         talloc_free(data.dptr);
2222         if (ret != 0 || res != 0) {
2223                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2224                 return -1;
2225         }
2226
2227         return 0;
2228 }
2229
2230 /*
2231   list tunables
2232  */
2233 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2234                             struct timeval timeout, 
2235                             uint32_t destnode,
2236                             TALLOC_CTX *mem_ctx,
2237                             const char ***list, uint32_t *count)
2238 {
2239         TDB_DATA outdata;
2240         int32_t res;
2241         int ret;
2242         struct ctdb_control_list_tunable *t;
2243         char *p, *s, *ptr;
2244
2245         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2246                            mem_ctx, &outdata, &res, &timeout, NULL);
2247         if (ret != 0 || res != 0) {
2248                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2249                 return -1;
2250         }
2251
2252         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2253         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2254             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2255                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2256                 talloc_free(outdata.dptr);
2257                 return -1;              
2258         }
2259         
2260         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2261         CTDB_NO_MEMORY(ctdb, p);
2262
2263         talloc_free(outdata.dptr);
2264         
2265         (*list) = NULL;
2266         (*count) = 0;
2267
2268         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2269                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2270                 CTDB_NO_MEMORY(ctdb, *list);
2271                 (*list)[*count] = talloc_strdup(*list, s);
2272                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2273                 (*count)++;
2274         }
2275
2276         talloc_free(p);
2277
2278         return 0;
2279 }
2280
2281
2282 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2283                         struct timeval timeout, uint32_t destnode, 
2284                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2285 {
2286         int ret;
2287         TDB_DATA outdata;
2288         int32_t res;
2289
2290         ret = ctdb_control(ctdb, destnode, 0, 
2291                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2292                            mem_ctx, &outdata, &res, &timeout, NULL);
2293         if (ret == 0 && res == -1) {
2294                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2295                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2296         }
2297         if (ret != 0 || res != 0) {
2298           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2299                 return -1;
2300         }
2301
2302         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2303         talloc_free(outdata.dptr);
2304                     
2305         return 0;
2306 }
2307
2308 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2309                         struct timeval timeout, uint32_t destnode, 
2310                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2311 {
2312         int ret, i, len;
2313         TDB_DATA outdata;
2314         int32_t res;
2315         struct ctdb_all_public_ipsv4 *ipsv4;
2316
2317         ret = ctdb_control(ctdb, destnode, 0, 
2318                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2319                            mem_ctx, &outdata, &res, &timeout, NULL);
2320         if (ret != 0 || res != 0) {
2321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2322                 return -1;
2323         }
2324
2325         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2326         len = offsetof(struct ctdb_all_public_ips, ips) +
2327                 ipsv4->num*sizeof(struct ctdb_public_ip);
2328         *ips = talloc_zero_size(mem_ctx, len);
2329         CTDB_NO_MEMORY(ctdb, *ips);
2330         (*ips)->num = ipsv4->num;
2331         for (i=0; i<ipsv4->num; i++) {
2332                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2333                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2334         }
2335
2336         talloc_free(outdata.dptr);
2337                     
2338         return 0;
2339 }
2340
2341 /*
2342   set/clear the permanent disabled bit on a remote node
2343  */
2344 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2345                        uint32_t set, uint32_t clear)
2346 {
2347         int ret;
2348         TDB_DATA data;
2349         struct ctdb_node_map *nodemap=NULL;
2350         struct ctdb_node_flag_change c;
2351         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2352         uint32_t recmaster;
2353         uint32_t *nodes;
2354
2355
2356         /* find the recovery master */
2357         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2358         if (ret != 0) {
2359                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2360                 talloc_free(tmp_ctx);
2361                 return ret;
2362         }
2363
2364
2365         /* read the node flags from the recmaster */
2366         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2367         if (ret != 0) {
2368                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2369                 talloc_free(tmp_ctx);
2370                 return -1;
2371         }
2372         if (destnode >= nodemap->num) {
2373                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2374                 talloc_free(tmp_ctx);
2375                 return -1;
2376         }
2377
2378         c.pnn       = destnode;
2379         c.old_flags = nodemap->nodes[destnode].flags;
2380         c.new_flags = c.old_flags;
2381         c.new_flags |= set;
2382         c.new_flags &= ~clear;
2383
2384         data.dsize = sizeof(c);
2385         data.dptr = (unsigned char *)&c;
2386
2387         /* send the flags update to all connected nodes */
2388         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2389
2390         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2391                                         nodes, 0,
2392                                         timeout, false, data,
2393                                         NULL, NULL,
2394                                         NULL) != 0) {
2395                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2396
2397                 talloc_free(tmp_ctx);
2398                 return -1;
2399         }
2400
2401         talloc_free(tmp_ctx);
2402         return 0;
2403 }
2404
2405
2406 /*
2407   get all tunables
2408  */
2409 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2410                                struct timeval timeout, 
2411                                uint32_t destnode,
2412                                struct ctdb_tunable *tunables)
2413 {
2414         TDB_DATA outdata;
2415         int ret;
2416         int32_t res;
2417
2418         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2419                            &outdata, &res, &timeout, NULL);
2420         if (ret != 0 || res != 0) {
2421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2422                 return -1;
2423         }
2424
2425         if (outdata.dsize != sizeof(*tunables)) {
2426                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2427                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2428                 return -1;              
2429         }
2430
2431         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2432         talloc_free(outdata.dptr);
2433         return 0;
2434 }
2435
2436 /*
2437   add a public address to a node
2438  */
2439 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2440                       struct timeval timeout, 
2441                       uint32_t destnode,
2442                       struct ctdb_control_ip_iface *pub)
2443 {
2444         TDB_DATA data;
2445         int32_t res;
2446         int ret;
2447
2448         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2449         data.dptr  = (unsigned char *)pub;
2450
2451         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2452                            NULL, &res, &timeout, NULL);
2453         if (ret != 0 || res != 0) {
2454                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2455                 return -1;
2456         }
2457
2458         return 0;
2459 }
2460
2461 /*
2462   delete a public address from a node
2463  */
2464 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2465                       struct timeval timeout, 
2466                       uint32_t destnode,
2467                       struct ctdb_control_ip_iface *pub)
2468 {
2469         TDB_DATA data;
2470         int32_t res;
2471         int ret;
2472
2473         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2474         data.dptr  = (unsigned char *)pub;
2475
2476         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2477                            NULL, &res, &timeout, NULL);
2478         if (ret != 0 || res != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2480                 return -1;
2481         }
2482
2483         return 0;
2484 }
2485
2486 /*
2487   kill a tcp connection
2488  */
2489 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2490                       struct timeval timeout, 
2491                       uint32_t destnode,
2492                       struct ctdb_control_killtcp *killtcp)
2493 {
2494         TDB_DATA data;
2495         int32_t res;
2496         int ret;
2497
2498         data.dsize = sizeof(struct ctdb_control_killtcp);
2499         data.dptr  = (unsigned char *)killtcp;
2500
2501         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2502                            NULL, &res, &timeout, NULL);
2503         if (ret != 0 || res != 0) {
2504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2505                 return -1;
2506         }
2507
2508         return 0;
2509 }
2510
2511 /*
2512   send a gratious arp
2513  */
2514 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2515                       struct timeval timeout, 
2516                       uint32_t destnode,
2517                       ctdb_sock_addr *addr,
2518                       const char *ifname)
2519 {
2520         TDB_DATA data;
2521         int32_t res;
2522         int ret, len;
2523         struct ctdb_control_gratious_arp *gratious_arp;
2524         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2525
2526
2527         len = strlen(ifname)+1;
2528         gratious_arp = talloc_size(tmp_ctx, 
2529                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2530         CTDB_NO_MEMORY(ctdb, gratious_arp);
2531
2532         gratious_arp->addr = *addr;
2533         gratious_arp->len = len;
2534         memcpy(&gratious_arp->iface[0], ifname, len);
2535
2536
2537         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2538         data.dptr  = (unsigned char *)gratious_arp;
2539
2540         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2541                            NULL, &res, &timeout, NULL);
2542         if (ret != 0 || res != 0) {
2543                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2544                 talloc_free(tmp_ctx);
2545                 return -1;
2546         }
2547
2548         talloc_free(tmp_ctx);
2549         return 0;
2550 }
2551
2552 /*
2553   get a list of all tcp tickles that a node knows about for a particular vnn
2554  */
2555 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2556                               struct timeval timeout, uint32_t destnode, 
2557                               TALLOC_CTX *mem_ctx, 
2558                               ctdb_sock_addr *addr,
2559                               struct ctdb_control_tcp_tickle_list **list)
2560 {
2561         int ret;
2562         TDB_DATA data, outdata;
2563         int32_t status;
2564
2565         data.dptr = (uint8_t*)addr;
2566         data.dsize = sizeof(ctdb_sock_addr);
2567
2568         ret = ctdb_control(ctdb, destnode, 0, 
2569                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2570                            mem_ctx, &outdata, &status, NULL, NULL);
2571         if (ret != 0 || status != 0) {
2572                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2573                 return -1;
2574         }
2575
2576         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2577
2578         return status;
2579 }
2580
2581 /*
2582   register a server id
2583  */
2584 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2585                       struct timeval timeout, 
2586                       struct ctdb_server_id *id)
2587 {
2588         TDB_DATA data;
2589         int32_t res;
2590         int ret;
2591
2592         data.dsize = sizeof(struct ctdb_server_id);
2593         data.dptr  = (unsigned char *)id;
2594
2595         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2596                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2597                         0, data, NULL,
2598                         NULL, &res, &timeout, NULL);
2599         if (ret != 0 || res != 0) {
2600                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2601                 return -1;
2602         }
2603
2604         return 0;
2605 }
2606
2607 /*
2608   unregister a server id
2609  */
2610 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2611                       struct timeval timeout, 
2612                       struct ctdb_server_id *id)
2613 {
2614         TDB_DATA data;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = sizeof(struct ctdb_server_id);
2619         data.dptr  = (unsigned char *)id;
2620
2621         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2622                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2623                         0, data, NULL,
2624                         NULL, &res, &timeout, NULL);
2625         if (ret != 0 || res != 0) {
2626                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2627                 return -1;
2628         }
2629
2630         return 0;
2631 }
2632
2633
2634 /*
2635   check if a server id exists
2636
2637   if a server id does exist, return *status == 1, otherwise *status == 0
2638  */
2639 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2640                       struct timeval timeout, 
2641                       uint32_t destnode,
2642                       struct ctdb_server_id *id,
2643                       uint32_t *status)
2644 {
2645         TDB_DATA data;
2646         int32_t res;
2647         int ret;
2648
2649         data.dsize = sizeof(struct ctdb_server_id);
2650         data.dptr  = (unsigned char *)id;
2651
2652         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2653                         0, data, NULL,
2654                         NULL, &res, &timeout, NULL);
2655         if (ret != 0) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2657                 return -1;
2658         }
2659
2660         if (res) {
2661                 *status = 1;
2662         } else {
2663                 *status = 0;
2664         }
2665
2666         return 0;
2667 }
2668
2669 /*
2670    get the list of server ids that are registered on a node
2671 */
2672 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2673                 TALLOC_CTX *mem_ctx,
2674                 struct timeval timeout, uint32_t destnode, 
2675                 struct ctdb_server_id_list **svid_list)
2676 {
2677         int ret;
2678         TDB_DATA outdata;
2679         int32_t res;
2680
2681         ret = ctdb_control(ctdb, destnode, 0, 
2682                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2683                            mem_ctx, &outdata, &res, &timeout, NULL);
2684         if (ret != 0 || res != 0) {
2685                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2686                 return -1;
2687         }
2688
2689         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2690                     
2691         return 0;
2692 }
2693
2694 /*
2695   initialise the ctdb daemon for client applications
2696
2697   NOTE: In current code the daemon does not fork. This is for testing purposes only
2698   and to simplify the code.
2699 */
2700 struct ctdb_context *ctdb_init(struct event_context *ev)
2701 {
2702         int ret;
2703         struct ctdb_context *ctdb;
2704
2705         ctdb = talloc_zero(ev, struct ctdb_context);
2706         if (ctdb == NULL) {
2707                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2708                 return NULL;
2709         }
2710         ctdb->ev  = ev;
2711         ctdb->idr = idr_init(ctdb);
2712         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2713
2714         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2715         if (ret != 0) {
2716                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2717                 talloc_free(ctdb);
2718                 return NULL;
2719         }
2720
2721         return ctdb;
2722 }
2723
2724
2725 /*
2726   set some ctdb flags
2727 */
2728 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2729 {
2730         ctdb->flags |= flags;
2731 }
2732
2733 /*
2734   setup the local socket name
2735 */
2736 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2737 {
2738         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2739         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2740
2741         return 0;
2742 }
2743
2744 /*
2745   return the pnn of this node
2746 */
2747 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2748 {
2749         return ctdb->pnn;
2750 }
2751
2752
2753 /*
2754   get the uptime of a remote node
2755  */
2756 struct ctdb_client_control_state *
2757 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2758 {
2759         return ctdb_control_send(ctdb, destnode, 0, 
2760                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2761                            mem_ctx, &timeout, NULL);
2762 }
2763
2764 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2765 {
2766         int ret;
2767         int32_t res;
2768         TDB_DATA outdata;
2769
2770         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2771         if (ret != 0 || res != 0) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2773                 return -1;
2774         }
2775
2776         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2777
2778         return 0;
2779 }
2780
2781 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2782 {
2783         struct ctdb_client_control_state *state;
2784
2785         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2786         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2787 }
2788
2789 /*
2790   send a control to execute the "recovered" event script on a node
2791  */
2792 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2793 {
2794         int ret;
2795         int32_t status;
2796
2797         ret = ctdb_control(ctdb, destnode, 0, 
2798                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2799                            NULL, NULL, &status, &timeout, NULL);
2800         if (ret != 0 || status != 0) {
2801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2802                 return -1;
2803         }
2804
2805         return 0;
2806 }
2807
2808 /* 
2809   callback for the async helpers used when sending the same control
2810   to multiple nodes in parallell.
2811 */
2812 static void async_callback(struct ctdb_client_control_state *state)
2813 {
2814         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2815         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2816         int ret;
2817         TDB_DATA outdata;
2818         int32_t res;
2819         uint32_t destnode = state->c->hdr.destnode;
2820
2821         /* one more node has responded with recmode data */
2822         data->count--;
2823
2824         /* if we failed to push the db, then return an error and let
2825            the main loop try again.
2826         */
2827         if (state->state != CTDB_CONTROL_DONE) {
2828                 if ( !data->dont_log_errors) {
2829                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2830                 }
2831                 data->fail_count++;
2832                 if (data->fail_callback) {
2833                         data->fail_callback(ctdb, destnode, res, outdata,
2834                                         data->callback_data);
2835                 }
2836                 return;
2837         }
2838         
2839         state->async.fn = NULL;
2840
2841         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2842         if ((ret != 0) || (res != 0)) {
2843                 if ( !data->dont_log_errors) {
2844                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2845                 }
2846                 data->fail_count++;
2847                 if (data->fail_callback) {
2848                         data->fail_callback(ctdb, destnode, res, outdata,
2849                                         data->callback_data);
2850                 }
2851         }
2852         if ((ret == 0) && (data->callback != NULL)) {
2853                 data->callback(ctdb, destnode, res, outdata,
2854                                         data->callback_data);
2855         }
2856 }
2857
2858
2859 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2860 {
2861         /* set up the callback functions */
2862         state->async.fn = async_callback;
2863         state->async.private_data = data;
2864         
2865         /* one more control to wait for to complete */
2866         data->count++;
2867 }
2868
2869
2870 /* wait for up to the maximum number of seconds allowed
2871    or until all nodes we expect a response from has replied
2872 */
2873 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2874 {
2875         while (data->count > 0) {
2876                 event_loop_once(ctdb->ev);
2877         }
2878         if (data->fail_count != 0) {
2879                 if (!data->dont_log_errors) {
2880                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2881                                  data->fail_count));
2882                 }
2883                 return -1;
2884         }
2885         return 0;
2886 }
2887
2888
2889 /* 
2890    perform a simple control on the listed nodes
2891    The control cannot return data
2892  */
2893 int ctdb_client_async_control(struct ctdb_context *ctdb,
2894                                 enum ctdb_controls opcode,
2895                                 uint32_t *nodes,
2896                                 uint64_t srvid,
2897                                 struct timeval timeout,
2898                                 bool dont_log_errors,
2899                                 TDB_DATA data,
2900                                 client_async_callback client_callback,
2901                                 client_async_callback fail_callback,
2902                                 void *callback_data)
2903 {
2904         struct client_async_data *async_data;
2905         struct ctdb_client_control_state *state;
2906         int j, num_nodes;
2907
2908         async_data = talloc_zero(ctdb, struct client_async_data);
2909         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2910         async_data->dont_log_errors = dont_log_errors;
2911         async_data->callback = client_callback;
2912         async_data->fail_callback = fail_callback;
2913         async_data->callback_data = callback_data;
2914         async_data->opcode        = opcode;
2915
2916         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2917
2918         /* loop over all nodes and send an async control to each of them */
2919         for (j=0; j<num_nodes; j++) {
2920                 uint32_t pnn = nodes[j];
2921
2922                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2923                                           0, data, async_data, &timeout, NULL);
2924                 if (state == NULL) {
2925                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2926                         talloc_free(async_data);
2927                         return -1;
2928                 }
2929                 
2930                 ctdb_client_async_add(async_data, state);
2931         }
2932
2933         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2934                 talloc_free(async_data);
2935                 return -1;
2936         }
2937
2938         talloc_free(async_data);
2939         return 0;
2940 }
2941
2942 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2943                                 struct ctdb_vnn_map *vnn_map,
2944                                 TALLOC_CTX *mem_ctx,
2945                                 bool include_self)
2946 {
2947         int i, j, num_nodes;
2948         uint32_t *nodes;
2949
2950         for (i=num_nodes=0;i<vnn_map->size;i++) {
2951                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2952                         continue;
2953                 }
2954                 num_nodes++;
2955         } 
2956
2957         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2958         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2959
2960         for (i=j=0;i<vnn_map->size;i++) {
2961                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2962                         continue;
2963                 }
2964                 nodes[j++] = vnn_map->map[i];
2965         } 
2966
2967         return nodes;
2968 }
2969
2970 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2971                                 struct ctdb_node_map *node_map,
2972                                 TALLOC_CTX *mem_ctx,
2973                                 bool include_self)
2974 {
2975         int i, j, num_nodes;
2976         uint32_t *nodes;
2977
2978         for (i=num_nodes=0;i<node_map->num;i++) {
2979                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2980                         continue;
2981                 }
2982                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2983                         continue;
2984                 }
2985                 num_nodes++;
2986         } 
2987
2988         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2989         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2990
2991         for (i=j=0;i<node_map->num;i++) {
2992                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2993                         continue;
2994                 }
2995                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2996                         continue;
2997                 }
2998                 nodes[j++] = node_map->nodes[i].pnn;
2999         } 
3000
3001         return nodes;
3002 }
3003
3004 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3005                                 struct ctdb_node_map *node_map,
3006                                 TALLOC_CTX *mem_ctx,
3007                                 uint32_t pnn)
3008 {
3009         int i, j, num_nodes;
3010         uint32_t *nodes;
3011
3012         for (i=num_nodes=0;i<node_map->num;i++) {
3013                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3014                         continue;
3015                 }
3016                 if (node_map->nodes[i].pnn == pnn) {
3017                         continue;
3018                 }
3019                 num_nodes++;
3020         } 
3021
3022         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3023         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3024
3025         for (i=j=0;i<node_map->num;i++) {
3026                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3027                         continue;
3028                 }
3029                 if (node_map->nodes[i].pnn == pnn) {
3030                         continue;
3031                 }
3032                 nodes[j++] = node_map->nodes[i].pnn;
3033         } 
3034
3035         return nodes;
3036 }
3037
3038 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3039                                 struct ctdb_node_map *node_map,
3040                                 TALLOC_CTX *mem_ctx,
3041                                 bool include_self)
3042 {
3043         int i, j, num_nodes;
3044         uint32_t *nodes;
3045
3046         for (i=num_nodes=0;i<node_map->num;i++) {
3047                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3048                         continue;
3049                 }
3050                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3051                         continue;
3052                 }
3053                 num_nodes++;
3054         } 
3055
3056         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3057         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3058
3059         for (i=j=0;i<node_map->num;i++) {
3060                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3061                         continue;
3062                 }
3063                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3064                         continue;
3065                 }
3066                 nodes[j++] = node_map->nodes[i].pnn;
3067         } 
3068
3069         return nodes;
3070 }
3071
3072 /* 
3073   this is used to test if a pnn lock exists and if it exists will return
3074   the number of connections that pnn has reported or -1 if that recovery
3075   daemon is not running.
3076 */
3077 int
3078 ctdb_read_pnn_lock(int fd, int32_t pnn)
3079 {
3080         struct flock lock;
3081         char c;
3082
3083         lock.l_type = F_WRLCK;
3084         lock.l_whence = SEEK_SET;
3085         lock.l_start = pnn;
3086         lock.l_len = 1;
3087         lock.l_pid = 0;
3088
3089         if (fcntl(fd, F_GETLK, &lock) != 0) {
3090                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3091                 return -1;
3092         }
3093
3094         if (lock.l_type == F_UNLCK) {
3095                 return -1;
3096         }
3097
3098         if (pread(fd, &c, 1, pnn) == -1) {
3099                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3100                 return -1;
3101         }
3102
3103         return c;
3104 }
3105
3106 /*
3107   get capabilities of a remote node
3108  */
3109 struct ctdb_client_control_state *
3110 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3111 {
3112         return ctdb_control_send(ctdb, destnode, 0, 
3113                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3114                            mem_ctx, &timeout, NULL);
3115 }
3116
3117 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3118 {
3119         int ret;
3120         int32_t res;
3121         TDB_DATA outdata;
3122
3123         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3124         if ( (ret != 0) || (res != 0) ) {
3125                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3126                 return -1;
3127         }
3128
3129         if (capabilities) {
3130                 *capabilities = *((uint32_t *)outdata.dptr);
3131         }
3132
3133         return 0;
3134 }
3135
3136 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3137 {
3138         struct ctdb_client_control_state *state;
3139         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3140         int ret;
3141
3142         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3143         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3144         talloc_free(tmp_ctx);
3145         return ret;
3146 }
3147
3148 /**
3149  * check whether a transaction is active on a given db on a given node
3150  */
3151 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3152                                      uint32_t destnode,
3153                                      uint32_t db_id)
3154 {
3155         int32_t status;
3156         int ret;
3157         TDB_DATA indata;
3158
3159         indata.dptr = (uint8_t *)&db_id;
3160         indata.dsize = sizeof(db_id);
3161
3162         ret = ctdb_control(ctdb, destnode, 0,
3163                            CTDB_CONTROL_TRANS2_ACTIVE,
3164                            0, indata, NULL, NULL, &status,
3165                            NULL, NULL);
3166
3167         if (ret != 0) {
3168                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3169                 return -1;
3170         }
3171
3172         return status;
3173 }
3174
3175
3176 struct ctdb_transaction_handle {
3177         struct ctdb_db_context *ctdb_db;
3178         bool in_replay;
3179         /*
3180          * we store the reads and writes done under a transaction:
3181          * - one list stores both reads and writes (m_all),
3182          * - the other just writes (m_write)
3183          */
3184         struct ctdb_marshall_buffer *m_all;
3185         struct ctdb_marshall_buffer *m_write;
3186 };
3187
3188 /* start a transaction on a database */
3189 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3190 {
3191         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3192         return 0;
3193 }
3194
3195 /* start a transaction on a database */
3196 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3197 {
3198         struct ctdb_record_handle *rh;
3199         TDB_DATA key;
3200         TDB_DATA data;
3201         struct ctdb_ltdb_header header;
3202         TALLOC_CTX *tmp_ctx;
3203         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3204         int ret;
3205         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3206         pid_t pid;
3207         int32_t status;
3208
3209         key.dptr = discard_const(keyname);
3210         key.dsize = strlen(keyname);
3211
3212         if (!ctdb_db->persistent) {
3213                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3214                 return -1;
3215         }
3216
3217 again:
3218         tmp_ctx = talloc_new(h);
3219
3220         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3221         if (rh == NULL) {
3222                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3223                 talloc_free(tmp_ctx);
3224                 return -1;
3225         }
3226
3227         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3228                                               CTDB_CURRENT_NODE,
3229                                               ctdb_db->db_id);
3230         if (status == 1) {
3231                 unsigned long int usec = (1000 + random()) % 100000;
3232                 DEBUG(DEBUG_NOTICE, (__location__ " transaction is active "
3233                                      "on db_id[0x%08x]. waiting for %lu "
3234                                      "microseconds\n",
3235                                      ctdb_db->db_id, usec));
3236                 talloc_free(tmp_ctx);
3237                 usleep(usec);
3238                 goto again;
3239         }
3240
3241         /*
3242          * store the pid in the database:
3243          * it is not enough that the node is dmaster...
3244          */
3245         pid = getpid();
3246         data.dptr = (unsigned char *)&pid;
3247         data.dsize = sizeof(pid_t);
3248         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3249         if (ret != 0) {
3250                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3251                                   "transaction record\n"));
3252                 talloc_free(tmp_ctx);
3253                 return -1;
3254         }
3255
3256         talloc_free(rh);
3257
3258         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3259         if (ret != 0) {
3260                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3261                 talloc_free(tmp_ctx);
3262                 return -1;
3263         }
3264
3265         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3266         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3267                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3268                 talloc_free(tmp_ctx);
3269                 goto again;
3270         }
3271
3272         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3273                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3274                 talloc_free(tmp_ctx);
3275                 goto again;
3276         }
3277
3278         talloc_free(tmp_ctx);
3279
3280         return 0;
3281 }
3282
3283
3284 /* start a transaction on a database */
3285 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3286                                                        TALLOC_CTX *mem_ctx)
3287 {
3288         struct ctdb_transaction_handle *h;
3289         int ret;
3290
3291         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3292         if (h == NULL) {
3293                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3294                 return NULL;
3295         }
3296
3297         h->ctdb_db = ctdb_db;
3298
3299         ret = ctdb_transaction_fetch_start(h);
3300         if (ret != 0) {
3301                 talloc_free(h);
3302                 return NULL;
3303         }
3304
3305         talloc_set_destructor(h, ctdb_transaction_destructor);
3306
3307         return h;
3308 }
3309
3310
3311
3312 /*
3313   fetch a record inside a transaction
3314  */
3315 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3316                            TALLOC_CTX *mem_ctx, 
3317                            TDB_DATA key, TDB_DATA *data)
3318 {
3319         struct ctdb_ltdb_header header;
3320         int ret;
3321
3322         ZERO_STRUCT(header);
3323
3324         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3325         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3326                 /* record doesn't exist yet */
3327                 *data = tdb_null;
3328                 ret = 0;
3329         }
3330         
3331         if (ret != 0) {
3332                 return ret;
3333         }
3334
3335         if (!h->in_replay) {
3336                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3337                 if (h->m_all == NULL) {
3338                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3339                         return -1;
3340                 }
3341         }
3342
3343         return 0;
3344 }
3345
3346 /*
3347   stores a record inside a transaction
3348  */
3349 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3350                            TDB_DATA key, TDB_DATA data)
3351 {
3352         TALLOC_CTX *tmp_ctx = talloc_new(h);
3353         struct ctdb_ltdb_header header;
3354         TDB_DATA olddata;
3355         int ret;
3356
3357         ZERO_STRUCT(header);
3358
3359         /* we need the header so we can update the RSN */
3360         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3361         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3362                 /* the record doesn't exist - create one with us as dmaster.
3363                    This is only safe because we are in a transaction and this
3364                    is a persistent database */
3365                 ZERO_STRUCT(header);
3366         } else if (ret != 0) {
3367                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3368                 talloc_free(tmp_ctx);
3369                 return ret;
3370         }
3371
3372         if (data.dsize == olddata.dsize &&
3373             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3374                 /* save writing the same data */
3375                 talloc_free(tmp_ctx);
3376                 return 0;
3377         }
3378
3379         header.dmaster = h->ctdb_db->ctdb->pnn;
3380         header.rsn++;
3381
3382         if (!h->in_replay) {
3383                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3384                 if (h->m_all == NULL) {
3385                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3386                         talloc_free(tmp_ctx);
3387                         return -1;
3388                 }
3389         }               
3390
3391         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3392         if (h->m_write == NULL) {
3393                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3394                 talloc_free(tmp_ctx);
3395                 return -1;
3396         }
3397         
3398         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3399
3400         talloc_free(tmp_ctx);
3401         
3402         return ret;
3403 }
3404
3405 /*
3406   replay a transaction
3407  */
3408 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3409 {
3410         int ret, i;
3411         struct ctdb_rec_data *rec = NULL;
3412
3413         h->in_replay = true;
3414         talloc_free(h->m_write);
3415         h->m_write = NULL;
3416
3417         ret = ctdb_transaction_fetch_start(h);
3418         if (ret != 0) {
3419                 return ret;
3420         }
3421
3422         for (i=0;i<h->m_all->count;i++) {
3423                 TDB_DATA key, data;
3424
3425                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3426                 if (rec == NULL) {
3427                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3428                         goto failed;
3429                 }
3430
3431                 if (rec->reqid == 0) {
3432                         /* its a store */
3433                         if (ctdb_transaction_store(h, key, data) != 0) {
3434                                 goto failed;
3435                         }
3436                 } else {
3437                         TDB_DATA data2;
3438                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3439
3440                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3441                                 talloc_free(tmp_ctx);
3442                                 goto failed;
3443                         }
3444                         if (data2.dsize != data.dsize ||
3445                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3446                                 /* the record has changed on us - we have to give up */
3447                                 talloc_free(tmp_ctx);
3448                                 goto failed;
3449                         }
3450                         talloc_free(tmp_ctx);
3451                 }
3452         }
3453         
3454         return 0;
3455
3456 failed:
3457         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3458         return -1;
3459 }
3460
3461
3462 /*
3463   commit a transaction
3464  */
3465 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3466 {
3467         int ret, retries=0;
3468         int32_t status;
3469         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3470         struct timeval timeout;
3471         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3472
3473         talloc_set_destructor(h, NULL);
3474
3475         /* our commit strategy is quite complex.
3476
3477            - we first try to commit the changes to all other nodes
3478
3479            - if that works, then we commit locally and we are done
3480
3481            - if a commit on another node fails, then we need to cancel
3482              the transaction, then restart the transaction (thus
3483              opening a window of time for a pending recovery to
3484              complete), then replay the transaction, checking all the
3485              reads and writes (checking that reads give the same data,
3486              and writes succeed). Then we retry the transaction to the
3487              other nodes
3488         */
3489
3490 again:
3491         if (h->m_write == NULL) {
3492                 /* no changes were made */
3493                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3494                 talloc_free(h);
3495                 return 0;
3496         }
3497
3498         /* tell ctdbd to commit to the other nodes */
3499         timeout = timeval_current_ofs(1, 0);
3500         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3501                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3502                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3503                            &timeout, NULL);
3504         if (ret != 0 || status != 0) {
3505                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3506                 DEBUG(DEBUG_WARNING, (__location__ " transaction commit%s failed"
3507                                       ", retrying after 1 second...\n",
3508                                       (retries==0)?"":"retry "));
3509                 sleep(1);
3510
3511                 if (ret != 0) {
3512                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3513                 } else {
3514                         /* work out what error code we will give if we 
3515                            have to fail the operation */
3516                         switch ((enum ctdb_trans2_commit_error)status) {
3517                         case CTDB_TRANS2_COMMIT_SUCCESS:
3518                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3519                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3520                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3521                                 break;
3522                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3523                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3524                                 break;
3525                         }
3526                 }
3527
3528                 if (++retries == 10) {
3529                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3530                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3531                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3532                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3533                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3534                         talloc_free(h);
3535                         return -1;
3536                 }               
3537
3538                 if (ctdb_replay_transaction(h) != 0) {
3539                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3540                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3541                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3542                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3543                         talloc_free(h);
3544                         return -1;
3545                 }
3546                 goto again;
3547         } else {
3548                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3549         }
3550
3551         /* do the real commit locally */
3552         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3553         if (ret != 0) {
3554                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3555                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3556                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3557                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3558                 talloc_free(h);
3559                 return ret;
3560         }
3561
3562         /* tell ctdbd that we are finished with our local commit */
3563         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3564                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3565                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3566         talloc_free(h);
3567         return 0;
3568 }
3569
3570 /*
3571   recovery daemon ping to main daemon
3572  */
3573 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3574 {
3575         int ret;
3576         int32_t res;
3577
3578         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3579                            ctdb, NULL, &res, NULL, NULL);
3580         if (ret != 0 || res != 0) {
3581                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3582                 return -1;
3583         }
3584
3585         return 0;
3586 }
3587
3588 /* when forking the main daemon and the child process needs to connect back
3589  * to the daemon as a client process, this function can be used to change
3590  * the ctdb context from daemon into client mode
3591  */
3592 int switch_from_server_to_client(struct ctdb_context *ctdb)
3593 {
3594         int ret;
3595
3596         /* shutdown the transport */
3597         if (ctdb->methods) {
3598                 ctdb->methods->shutdown(ctdb);
3599         }
3600
3601         /* get a new event context */
3602         talloc_free(ctdb->ev);
3603         ctdb->ev = event_context_init(ctdb);
3604
3605         close(ctdb->daemon.sd);
3606         ctdb->daemon.sd = -1;
3607
3608         /* the client does not need to be realtime */
3609         if (ctdb->do_setsched) {
3610                 ctdb_restore_scheduler(ctdb);
3611         }
3612
3613         /* initialise ctdb */
3614         ret = ctdb_socket_connect(ctdb);
3615         if (ret != 0) {
3616                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3617                 return -1;
3618         }
3619
3620          return 0;
3621 }
3622
3623 /*
3624   tell the main daemon we are starting a new monitor event script
3625  */
3626 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3627 {
3628         int ret;
3629         int32_t res;
3630
3631         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3632                            ctdb, NULL, &res, NULL, NULL);
3633         if (ret != 0 || res != 0) {
3634                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3635                 return -1;
3636         }
3637
3638         return 0;
3639 }
3640
3641 /*
3642   tell the main daemon we are starting a new monitor event script
3643  */
3644 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3645 {
3646         int ret;
3647         int32_t res;
3648
3649         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3650                            ctdb, NULL, &res, NULL, NULL);
3651         if (ret != 0 || res != 0) {
3652                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3653                 return -1;
3654         }
3655
3656         return 0;
3657 }
3658
3659 /*
3660   tell the main daemon we are starting to run an eventscript
3661  */
3662 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3663 {
3664         int ret;
3665         int32_t res;
3666         TDB_DATA data;
3667
3668         data.dptr = discard_const(name);
3669         data.dsize = strlen(name)+1;
3670
3671         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3672                            ctdb, NULL, &res, NULL, NULL);
3673         if (ret != 0 || res != 0) {
3674                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3675                 return -1;
3676         }
3677
3678         return 0;
3679 }
3680
3681 /*
3682   tell the main daemon the status of the script we ran
3683  */
3684 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3685 {
3686         int ret;
3687         int32_t res;
3688         TDB_DATA data;
3689
3690         data.dptr = (uint8_t *)&result;
3691         data.dsize = sizeof(result);
3692
3693         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3694                            ctdb, NULL, &res, NULL, NULL);
3695         if (ret != 0 || res != 0) {
3696                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3697                 return -1;
3698         }
3699
3700         return 0;
3701 }
3702
3703 /*
3704   tell the main daemon a script was disabled
3705  */
3706 int ctdb_ctrl_event_script_disabled(struct ctdb_context *ctdb, const char *name)
3707 {
3708         int ret;
3709         int32_t res;
3710         TDB_DATA data;
3711
3712         data.dptr = discard_const(name);
3713         data.dsize = strlen(name)+1;
3714
3715         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_DISABLED, 0, data, 
3716                            ctdb, NULL, &res, NULL, NULL);
3717         if (ret != 0 || res != 0) {
3718                 DEBUG(DEBUG_ERR,("Failed to send event_script_disabeld\n"));
3719                 return -1;
3720         }
3721
3722         return 0;
3723 }
3724
3725 /*
3726   get the status of running the monitor eventscripts
3727  */
3728 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3729                 struct timeval timeout, uint32_t destnode, 
3730                 TALLOC_CTX *mem_ctx,
3731                 struct ctdb_monitoring_wire **script_status)
3732 {
3733         int ret;
3734         TDB_DATA outdata;
3735         int32_t res;
3736
3737         ret = ctdb_control(ctdb, destnode, 0, 
3738                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3739                            mem_ctx, &outdata, &res, &timeout, NULL);
3740         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3741                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3742                 return -1;
3743         }
3744
3745         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3746         talloc_free(outdata.dptr);
3747                     
3748         return 0;
3749 }
3750
3751 /*
3752   tell the main daemon how long it took to lock the reclock file
3753  */
3754 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3755 {
3756         int ret;
3757         int32_t res;
3758         TDB_DATA data;
3759
3760         data.dptr = (uint8_t *)&latency;
3761         data.dsize = sizeof(latency);
3762
3763         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3764                            ctdb, NULL, &res, NULL, NULL);
3765         if (ret != 0 || res != 0) {
3766                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3767                 return -1;
3768         }
3769
3770         return 0;
3771 }
3772
3773 /*
3774   get the name of the reclock file
3775  */
3776 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3777                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3778                          const char **name)
3779 {
3780         int ret;
3781         int32_t res;
3782         TDB_DATA data;
3783
3784         ret = ctdb_control(ctdb, destnode, 0, 
3785                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3786                            mem_ctx, &data, &res, &timeout, NULL);
3787         if (ret != 0 || res != 0) {
3788                 return -1;
3789         }
3790
3791         if (data.dsize == 0) {
3792                 *name = NULL;
3793         } else {
3794                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3795         }
3796         talloc_free(data.dptr);
3797
3798         return 0;
3799 }
3800
3801 /*
3802   set the reclock filename for a node
3803  */
3804 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3805 {
3806         int ret;
3807         TDB_DATA data;
3808         int32_t res;
3809
3810         if (reclock == NULL) {
3811                 data.dsize = 0;
3812                 data.dptr  = NULL;
3813         } else {
3814                 data.dsize = strlen(reclock) + 1;
3815                 data.dptr  = discard_const(reclock);
3816         }
3817
3818         ret = ctdb_control(ctdb, destnode, 0, 
3819                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3820                            NULL, NULL, &res, &timeout, NULL);
3821         if (ret != 0 || res != 0) {
3822                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3823                 return -1;
3824         }
3825
3826         return 0;
3827 }
3828
3829 /*
3830   stop a node
3831  */
3832 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3833 {
3834         int ret;
3835         int32_t res;
3836
3837         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3838                            ctdb, NULL, &res, &timeout, NULL);
3839         if (ret != 0 || res != 0) {
3840                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3841                 return -1;
3842         }
3843
3844         return 0;
3845 }
3846
3847 /*
3848   continue a node
3849  */
3850 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3851 {
3852         int ret;
3853
3854         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3855                            ctdb, NULL, NULL, &timeout, NULL);
3856         if (ret != 0) {
3857                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3858                 return -1;
3859         }
3860
3861         return 0;
3862 }
3863
3864 /*
3865   set the natgw state for a node
3866  */
3867 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3868 {
3869         int ret;
3870         TDB_DATA data;
3871         int32_t res;
3872
3873         data.dsize = sizeof(natgwstate);
3874         data.dptr  = (uint8_t *)&natgwstate;
3875
3876         ret = ctdb_control(ctdb, destnode, 0, 
3877                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3878                            NULL, NULL, &res, &timeout, NULL);
3879         if (ret != 0 || res != 0) {
3880                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3881                 return -1;
3882         }
3883
3884         return 0;
3885 }
3886
3887 /*
3888   set the lmaster role for a node
3889  */
3890 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3891 {
3892         int ret;
3893         TDB_DATA data;
3894         int32_t res;
3895
3896         data.dsize = sizeof(lmasterrole);
3897         data.dptr  = (uint8_t *)&lmasterrole;
3898
3899         ret = ctdb_control(ctdb, destnode, 0, 
3900                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3901                            NULL, NULL, &res, &timeout, NULL);
3902         if (ret != 0 || res != 0) {
3903                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3904                 return -1;
3905         }
3906
3907         return 0;
3908 }
3909
3910 /*
3911   set the recmaster role for a node
3912  */
3913 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3914 {
3915         int ret;
3916         TDB_DATA data;
3917         int32_t res;
3918
3919         data.dsize = sizeof(recmasterrole);
3920         data.dptr  = (uint8_t *)&recmasterrole;
3921
3922         ret = ctdb_control(ctdb, destnode, 0, 
3923                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3924                            NULL, NULL, &res, &timeout, NULL);
3925         if (ret != 0 || res != 0) {
3926                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3927                 return -1;
3928         }
3929
3930         return 0;
3931 }
3932
3933 /* enable an eventscript
3934  */
3935 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3936 {
3937         int ret;
3938         TDB_DATA data;
3939         int32_t res;
3940
3941         data.dsize = strlen(script) + 1;
3942         data.dptr  = discard_const(script);
3943
3944         ret = ctdb_control(ctdb, destnode, 0, 
3945                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3946                            NULL, NULL, &res, &timeout, NULL);
3947         if (ret != 0 || res != 0) {
3948                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3949                 return -1;
3950         }
3951
3952         return 0;
3953 }
3954
3955 /* disable an eventscript
3956  */
3957 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3958 {
3959         int ret;
3960         TDB_DATA data;
3961         int32_t res;
3962
3963         data.dsize = strlen(script) + 1;
3964         data.dptr  = discard_const(script);
3965
3966         ret = ctdb_control(ctdb, destnode, 0, 
3967                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3968                            NULL, NULL, &res, &timeout, NULL);
3969         if (ret != 0 || res != 0) {
3970                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3971                 return -1;
3972         }
3973
3974         return 0;
3975 }
3976
3977
3978 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3979 {
3980         int ret;
3981         TDB_DATA data;
3982         int32_t res;
3983
3984         data.dsize = sizeof(*bantime);
3985         data.dptr  = (uint8_t *)bantime;
3986
3987         ret = ctdb_control(ctdb, destnode, 0, 
3988                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3989                            NULL, NULL, &res, &timeout, NULL);
3990         if (ret != 0 || res != 0) {
3991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3992                 return -1;
3993         }
3994
3995         return 0;
3996 }
3997
3998
3999 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4000 {
4001         int ret;
4002         TDB_DATA outdata;
4003         int32_t res;
4004         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4005
4006         ret = ctdb_control(ctdb, destnode, 0, 
4007                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4008                            tmp_ctx, &outdata, &res, &timeout, NULL);
4009         if (ret != 0 || res != 0) {
4010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4011                 talloc_free(tmp_ctx);
4012                 return -1;
4013         }
4014
4015         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4016         talloc_free(tmp_ctx);
4017
4018         return 0;
4019 }
4020
4021
4022 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4023 {
4024         int ret;
4025         int32_t res;
4026         TDB_DATA data;
4027         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4028
4029         data.dptr = (uint8_t*)db_prio;
4030         data.dsize = sizeof(*db_prio);
4031
4032         ret = ctdb_control(ctdb, destnode, 0, 
4033                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4034                            tmp_ctx, NULL, &res, &timeout, NULL);
4035         if (ret != 0 || res != 0) {
4036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4037                 talloc_free(tmp_ctx);
4038                 return -1;
4039         }
4040
4041         talloc_free(tmp_ctx);
4042
4043         return 0;
4044 }
4045
4046 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4047 {
4048         int ret;
4049         int32_t res;
4050         TDB_DATA data;
4051         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4052
4053         data.dptr = (uint8_t*)&db_id;
4054         data.dsize = sizeof(db_id);
4055
4056         ret = ctdb_control(ctdb, destnode, 0, 
4057                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4058                            tmp_ctx, NULL, &res, &timeout, NULL);
4059         if (ret != 0 || res < 0) {
4060                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4061                 talloc_free(tmp_ctx);
4062                 return -1;
4063         }
4064
4065         if (priority) {
4066                 *priority = res;
4067         }
4068
4069         talloc_free(tmp_ctx);
4070
4071         return 0;
4072 }