28f4358fcb721e5fe3bd28092ca9cf3c7083f750
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/time.h"
35 #include "lib/util/debug.h"
36 #include "lib/util/samba_util.h"
37
38 #include "ctdb_logging.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
41
42 #include "common/reqid.h"
43 #include "common/system.h"
44 #include "common/common.h"
45
46 /*
47   allocate a packet for use in client<->daemon communication
48  */
49 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
50                                             TALLOC_CTX *mem_ctx, 
51                                             enum ctdb_operation operation, 
52                                             size_t length, size_t slength,
53                                             const char *type)
54 {
55         int size;
56         struct ctdb_req_header *hdr;
57
58         length = MAX(length, slength);
59         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
60
61         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
62         if (hdr == NULL) {
63                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
64                          operation, (unsigned)length));
65                 return NULL;
66         }
67         talloc_set_name_const(hdr, type);
68         hdr->length       = length;
69         hdr->operation    = operation;
70         hdr->ctdb_magic   = CTDB_MAGIC;
71         hdr->ctdb_version = CTDB_PROTOCOL;
72         hdr->srcnode      = ctdb->pnn;
73         if (ctdb->vnn_map) {
74                 hdr->generation = ctdb->vnn_map->generation;
75         }
76
77         return hdr;
78 }
79
80 /*
81   local version of ctdb_call
82 */
83 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
84                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
85                     TDB_DATA *data, bool updatetdb)
86 {
87         struct ctdb_call_info *c;
88         struct ctdb_registered_call *fn;
89         struct ctdb_context *ctdb = ctdb_db->ctdb;
90         
91         c = talloc(ctdb, struct ctdb_call_info);
92         CTDB_NO_MEMORY(ctdb, c);
93
94         c->key = call->key;
95         c->call_data = &call->call_data;
96         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
97         c->record_data.dsize = data->dsize;
98         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
99         c->new_data = NULL;
100         c->reply_data = NULL;
101         c->status = 0;
102         c->header = header;
103
104         for (fn=ctdb_db->calls;fn;fn=fn->next) {
105                 if (fn->id == call->call_id) break;
106         }
107         if (fn == NULL) {
108                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
109                 talloc_free(c);
110                 return -1;
111         }
112
113         if (fn->fn(c) != 0) {
114                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
115                 talloc_free(c);
116                 return -1;
117         }
118
119         /* we need to force the record to be written out if this was a remote access */
120         if (c->new_data == NULL) {
121                 c->new_data = &c->record_data;
122         }
123
124         if (c->new_data && updatetdb) {
125                 /* XXX check that we always have the lock here? */
126                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
127                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
128                         talloc_free(c);
129                         return -1;
130                 }
131         }
132
133         if (c->reply_data) {
134                 call->reply_data = *c->reply_data;
135
136                 talloc_steal(call, call->reply_data.dptr);
137                 talloc_set_name_const(call->reply_data.dptr, __location__);
138         } else {
139                 call->reply_data.dptr = NULL;
140                 call->reply_data.dsize = 0;
141         }
142         call->status = c->status;
143
144         talloc_free(c);
145
146         return 0;
147 }
148
149
150 /*
151   queue a packet for sending from client to daemon
152 */
153 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
156 }
157
158
159 /*
160   called when a CTDB_REPLY_CALL packet comes in in the client
161
162   This packet comes in response to a CTDB_REQ_CALL request packet. It
163   contains any reply data from the call
164 */
165 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
166 {
167         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
168         struct ctdb_client_call_state *state;
169
170         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
171         if (state == NULL) {
172                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
173                 return;
174         }
175
176         if (hdr->reqid != state->reqid) {
177                 /* we found a record  but it was the wrong one */
178                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
179                 return;
180         }
181
182         state->call->reply_data.dptr = c->data;
183         state->call->reply_data.dsize = c->datalen;
184         state->call->status = c->status;
185
186         talloc_steal(state, c);
187
188         state->state = CTDB_CALL_DONE;
189
190         if (state->async.fn) {
191                 state->async.fn(state);
192         }
193 }
194
195 void ctdb_request_message(struct ctdb_context *ctdb,
196                           struct ctdb_req_header *hdr)
197 {
198         struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
199         TDB_DATA data;
200
201         data.dsize = c->datalen;
202         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
203         if (data.dptr == NULL) {
204                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
205                 return;
206         }
207
208         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
209 }
210
211 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
212
213 /*
214   this is called in the client, when data comes in from the daemon
215  */
216 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
217 {
218         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
219         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
220         TALLOC_CTX *tmp_ctx;
221
222         /* place the packet as a child of a tmp_ctx. We then use
223            talloc_free() below to free it. If any of the calls want
224            to keep it, then they will steal it somewhere else, and the
225            talloc_free() will be a no-op */
226         tmp_ctx = talloc_new(ctdb);
227         talloc_steal(tmp_ctx, hdr);
228
229         if (cnt == 0) {
230                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
231                 exit(1);
232         }
233
234         if (cnt < sizeof(*hdr)) {
235                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
236                 goto done;
237         }
238         if (cnt != hdr->length) {
239                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
240                                (unsigned)hdr->length, (unsigned)cnt);
241                 goto done;
242         }
243
244         if (hdr->ctdb_magic != CTDB_MAGIC) {
245                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
246                 goto done;
247         }
248
249         if (hdr->ctdb_version != CTDB_PROTOCOL) {
250                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
251                 goto done;
252         }
253
254         switch (hdr->operation) {
255         case CTDB_REPLY_CALL:
256                 ctdb_client_reply_call(ctdb, hdr);
257                 break;
258
259         case CTDB_REQ_MESSAGE:
260                 ctdb_request_message(ctdb, hdr);
261                 break;
262
263         case CTDB_REPLY_CONTROL:
264                 ctdb_client_reply_control(ctdb, hdr);
265                 break;
266
267         default:
268                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
269         }
270
271 done:
272         talloc_free(tmp_ctx);
273 }
274
275 /*
276   connect to a unix domain socket
277 */
278 int ctdb_socket_connect(struct ctdb_context *ctdb)
279 {
280         struct sockaddr_un addr;
281
282         memset(&addr, 0, sizeof(addr));
283         addr.sun_family = AF_UNIX;
284         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
285
286         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
287         if (ctdb->daemon.sd == -1) {
288                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
289                 return -1;
290         }
291
292         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
293                 close(ctdb->daemon.sd);
294                 ctdb->daemon.sd = -1;
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 return -1;
297         }
298
299         set_nonblocking(ctdb->daemon.sd);
300         set_close_on_exec(ctdb->daemon.sd);
301         
302         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
303                                               CTDB_DS_ALIGNMENT, 
304                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
305         return 0;
306 }
307
308
309 struct ctdb_record_handle {
310         struct ctdb_db_context *ctdb_db;
311         TDB_DATA key;
312         TDB_DATA *data;
313         struct ctdb_ltdb_header header;
314 };
315
316
317 /*
318   make a recv call to the local ctdb daemon - called from client context
319
320   This is called when the program wants to wait for a ctdb_call to complete and get the 
321   results. This call will block unless the call has already completed.
322 */
323 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
324 {
325         if (state == NULL) {
326                 return -1;
327         }
328
329         while (state->state < CTDB_CALL_DONE) {
330                 tevent_loop_once(state->ctdb_db->ctdb->ev);
331         }
332         if (state->state != CTDB_CALL_DONE) {
333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
334                 talloc_free(state);
335                 return -1;
336         }
337
338         if (state->call->reply_data.dsize) {
339                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
340                                                       state->call->reply_data.dptr,
341                                                       state->call->reply_data.dsize);
342                 call->reply_data.dsize = state->call->reply_data.dsize;
343         } else {
344                 call->reply_data.dptr = NULL;
345                 call->reply_data.dsize = 0;
346         }
347         call->status = state->call->status;
348         talloc_free(state);
349
350         return call->status;
351 }
352
353
354
355
356 /*
357   destroy a ctdb_call in client
358 */
359 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
360 {
361         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
362         return 0;
363 }
364
365 /*
366   construct an event driven local ctdb_call
367
368   this is used so that locally processed ctdb_call requests are processed
369   in an event driven manner
370 */
371 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
372                                                                   struct ctdb_call *call,
373                                                                   struct ctdb_ltdb_header *header,
374                                                                   TDB_DATA *data)
375 {
376         struct ctdb_client_call_state *state;
377         struct ctdb_context *ctdb = ctdb_db->ctdb;
378         int ret;
379
380         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
381         CTDB_NO_MEMORY_NULL(ctdb, state);
382         state->call = talloc_zero(state, struct ctdb_call);
383         CTDB_NO_MEMORY_NULL(ctdb, state->call);
384
385         talloc_steal(state, data->dptr);
386
387         state->state   = CTDB_CALL_DONE;
388         *(state->call) = *call;
389         state->ctdb_db = ctdb_db;
390
391         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
392         if (ret != 0) {
393                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
394         }
395
396         return state;
397 }
398
399 /*
400   make a ctdb call to the local daemon - async send. Called from client context.
401
402   This constructs a ctdb_call request and queues it for processing. 
403   This call never blocks.
404 */
405 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
406                                               struct ctdb_call *call)
407 {
408         struct ctdb_client_call_state *state;
409         struct ctdb_context *ctdb = ctdb_db->ctdb;
410         struct ctdb_ltdb_header header;
411         TDB_DATA data;
412         int ret;
413         size_t len;
414         struct ctdb_req_call *c;
415
416         /* if the domain socket is not yet open, open it */
417         if (ctdb->daemon.sd==-1) {
418                 ctdb_socket_connect(ctdb);
419         }
420
421         ret = ctdb_ltdb_lock(ctdb_db, call->key);
422         if (ret != 0) {
423                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
424                 return NULL;
425         }
426
427         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
428
429         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
430                 ret = -1;
431         }
432
433         if (ret == 0 && header.dmaster == ctdb->pnn) {
434                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
435                 talloc_free(data.dptr);
436                 ctdb_ltdb_unlock(ctdb_db, call->key);
437                 return state;
438         }
439
440         ctdb_ltdb_unlock(ctdb_db, call->key);
441         talloc_free(data.dptr);
442
443         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
444         if (state == NULL) {
445                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
446                 return NULL;
447         }
448         state->call = talloc_zero(state, struct ctdb_call);
449         if (state->call == NULL) {
450                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
451                 return NULL;
452         }
453
454         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
455         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
456         if (c == NULL) {
457                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
458                 return NULL;
459         }
460
461         state->reqid     = reqid_new(ctdb->idr, state);
462         state->ctdb_db = ctdb_db;
463         talloc_set_destructor(state, ctdb_client_call_destructor);
464
465         c->hdr.reqid     = state->reqid;
466         c->flags         = call->flags;
467         c->db_id         = ctdb_db->db_id;
468         c->callid        = call->call_id;
469         c->hopcount      = 0;
470         c->keylen        = call->key.dsize;
471         c->calldatalen   = call->call_data.dsize;
472         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
473         memcpy(&c->data[call->key.dsize], 
474                call->call_data.dptr, call->call_data.dsize);
475         *(state->call)              = *call;
476         state->call->call_data.dptr = &c->data[call->key.dsize];
477         state->call->key.dptr       = &c->data[0];
478
479         state->state  = CTDB_CALL_WAIT;
480
481
482         ctdb_client_queue_pkt(ctdb, &c->hdr);
483
484         return state;
485 }
486
487
488 /*
489   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
490 */
491 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
492 {
493         struct ctdb_client_call_state *state;
494
495         state = ctdb_call_send(ctdb_db, call);
496         return ctdb_call_recv(state, call);
497 }
498
499
500 /*
501   tell the daemon what messaging srvid we will use, and register the message
502   handler function in the client
503 */
504 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
505                                     srvid_handler_fn handler,
506                                     void *private_data)
507 {
508         int res;
509         int32_t status;
510
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
512                            CTDB_CONTROL_REGISTER_SRVID, 0,
513                            tdb_null, NULL, NULL, &status, NULL, NULL);
514         if (res != 0 || status != 0) {
515                 DEBUG(DEBUG_ERR,
516                       ("Failed to register srvid %llu\n",
517                        (unsigned long long)srvid));
518                 return -1;
519         }
520
521         /* also need to register the handler with our own ctdb structure */
522         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
523 }
524
525 /*
526   tell the daemon we no longer want a srvid
527 */
528 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
529                                        uint64_t srvid, void *private_data)
530 {
531         int res;
532         int32_t status;
533
534         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
535                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
536                            tdb_null, NULL, NULL, &status, NULL, NULL);
537         if (res != 0 || status != 0) {
538                 DEBUG(DEBUG_ERR,
539                       ("Failed to deregister srvid %llu\n",
540                        (unsigned long long)srvid));
541                 return -1;
542         }
543
544         /* also need to register the handler with our own ctdb structure */
545         srvid_deregister(ctdb->srv, srvid, private_data);
546         return 0;
547 }
548
549 /*
550  * check server ids
551  */
552 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
553                                        uint8_t *result)
554 {
555         TDB_DATA indata, outdata;
556         int res;
557         int32_t status;
558         int i;
559
560         indata.dptr = (uint8_t *)ids;
561         indata.dsize = num * sizeof(*ids);
562
563         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
564                            indata, ctdb, &outdata, &status, NULL, NULL);
565         if (res != 0 || status != 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
567                 return -1;
568         }
569
570         if (outdata.dsize != num*sizeof(uint8_t)) {
571                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
572                                   (long unsigned int)num*sizeof(uint8_t),
573                                   outdata.dsize));
574                 talloc_free(outdata.dptr);
575                 return -1;
576         }
577
578         for (i=0; i<num; i++) {
579                 result[i] = outdata.dptr[i];
580         }
581
582         talloc_free(outdata.dptr);
583         return 0;
584 }
585
586 /*
587   send a message - from client context
588  */
589 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
590                       uint64_t srvid, TDB_DATA data)
591 {
592         struct ctdb_req_message *r;
593         int len, res;
594
595         len = offsetof(struct ctdb_req_message, data) + data.dsize;
596         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
597                                len, struct ctdb_req_message);
598         CTDB_NO_MEMORY(ctdb, r);
599
600         r->hdr.destnode  = pnn;
601         r->srvid         = srvid;
602         r->datalen       = data.dsize;
603         memcpy(&r->data[0], data.dptr, data.dsize);
604         
605         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
606         talloc_free(r);
607         return res;
608 }
609
610
611 /*
612   cancel a ctdb_fetch_lock operation, releasing the lock
613  */
614 static int fetch_lock_destructor(struct ctdb_record_handle *h)
615 {
616         ctdb_ltdb_unlock(h->ctdb_db, h->key);
617         return 0;
618 }
619
620 /*
621   force the migration of a record to this node
622  */
623 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
624 {
625         struct ctdb_call call;
626         ZERO_STRUCT(call);
627         call.call_id = CTDB_NULL_FUNC;
628         call.key = key;
629         call.flags = CTDB_IMMEDIATE_MIGRATION;
630         return ctdb_call(ctdb_db, &call);
631 }
632
633 /*
634   try to fetch a readonly copy of a record
635  */
636 static int
637 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
638 {
639         int ret;
640
641         struct ctdb_call call;
642         ZERO_STRUCT(call);
643
644         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
645         call.call_data.dptr = NULL;
646         call.call_data.dsize = 0;
647         call.key = key;
648         call.flags = CTDB_WANT_READONLY;
649         ret = ctdb_call(ctdb_db, &call);
650
651         if (ret != 0) {
652                 return -1;
653         }
654         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
655                 return -1;
656         }
657
658         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
659         if (*hdr == NULL) {
660                 talloc_free(call.reply_data.dptr);
661                 return -1;
662         }
663
664         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
665         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
666         if (data->dptr == NULL) {
667                 talloc_free(call.reply_data.dptr);
668                 talloc_free(hdr);
669                 return -1;
670         }
671
672         return 0;
673 }
674
675 /*
676   get a lock on a record, and return the records data. Blocks until it gets the lock
677  */
678 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
679                                            TDB_DATA key, TDB_DATA *data)
680 {
681         int ret;
682         struct ctdb_record_handle *h;
683
684         /*
685           procedure is as follows:
686
687           1) get the chain lock. 
688           2) check if we are dmaster
689           3) if we are the dmaster then return handle 
690           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
691              reply from ctdbd
692           5) when we get the reply, goto (1)
693          */
694
695         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
696         if (h == NULL) {
697                 return NULL;
698         }
699
700         h->ctdb_db = ctdb_db;
701         h->key     = key;
702         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
703         if (h->key.dptr == NULL) {
704                 talloc_free(h);
705                 return NULL;
706         }
707         h->data    = data;
708
709         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
710                  (const char *)key.dptr));
711
712 again:
713         /* step 1 - get the chain lock */
714         ret = ctdb_ltdb_lock(ctdb_db, key);
715         if (ret != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
717                 talloc_free(h);
718                 return NULL;
719         }
720
721         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
722
723         talloc_set_destructor(h, fetch_lock_destructor);
724
725         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
726
727         /* when torturing, ensure we test the remote path */
728         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
729             random() % 5 == 0) {
730                 h->header.dmaster = (uint32_t)-1;
731         }
732
733
734         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
735
736         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
737                 ctdb_ltdb_unlock(ctdb_db, key);
738                 ret = ctdb_client_force_migration(ctdb_db, key);
739                 if (ret != 0) {
740                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
741                         talloc_free(h);
742                         return NULL;
743                 }
744                 goto again;
745         }
746
747         /* if this is a request for read/write and we have delegations
748            we have to revoke all delegations first
749         */
750         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
751             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
752                 ctdb_ltdb_unlock(ctdb_db, key);
753                 ret = ctdb_client_force_migration(ctdb_db, key);
754                 if (ret != 0) {
755                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
756                         talloc_free(h);
757                         return NULL;
758                 }
759                 goto again;
760         }
761
762         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
763         return h;
764 }
765
766 /*
767   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
768  */
769 struct ctdb_record_handle *
770 ctdb_fetch_readonly_lock(
771         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
772         TDB_DATA key, TDB_DATA *data,
773         int read_only)
774 {
775         int ret;
776         struct ctdb_record_handle *h;
777         struct ctdb_ltdb_header *roheader = NULL;
778
779         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
780         if (h == NULL) {
781                 return NULL;
782         }
783
784         h->ctdb_db = ctdb_db;
785         h->key     = key;
786         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
787         if (h->key.dptr == NULL) {
788                 talloc_free(h);
789                 return NULL;
790         }
791         h->data    = data;
792
793         data->dptr = NULL;
794         data->dsize = 0;
795
796
797 again:
798         talloc_free(roheader);
799         roheader = NULL;
800
801         talloc_free(data->dptr);
802         data->dptr = NULL;
803         data->dsize = 0;
804
805         /* Lock the record/chain */
806         ret = ctdb_ltdb_lock(ctdb_db, key);
807         if (ret != 0) {
808                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
809                 talloc_free(h);
810                 return NULL;
811         }
812
813         talloc_set_destructor(h, fetch_lock_destructor);
814
815         /* Check if record exists yet in the TDB */
816         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
817         if (ret != 0) {
818                 ctdb_ltdb_unlock(ctdb_db, key);
819                 ret = ctdb_client_force_migration(ctdb_db, key);
820                 if (ret != 0) {
821                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
822                         talloc_free(h);
823                         return NULL;
824                 }
825                 goto again;
826         }
827
828         /* if this is a request for read/write and we have delegations
829            we have to revoke all delegations first
830         */
831         if ((read_only == 0) 
832         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
833         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
834                 ctdb_ltdb_unlock(ctdb_db, key);
835                 ret = ctdb_client_force_migration(ctdb_db, key);
836                 if (ret != 0) {
837                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                         talloc_free(h);
839                         return NULL;
840                 }
841                 goto again;
842         }
843
844         /* if we are dmaster, just return the handle */
845         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
846                 return h;
847         }
848
849         if (read_only != 0) {
850                 TDB_DATA rodata = {NULL, 0};
851
852                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
853                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
854                         return h;
855                 }
856
857                 ctdb_ltdb_unlock(ctdb_db, key);
858                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
859                 if (ret != 0) {
860                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
861                         ret = ctdb_client_force_migration(ctdb_db, key);
862                         if (ret != 0) {
863                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
864                                 talloc_free(h);
865                                 return NULL;
866                         }
867
868                         goto again;
869                 }
870
871                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
872                         ret = ctdb_client_force_migration(ctdb_db, key);
873                         if (ret != 0) {
874                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
875                                 talloc_free(h);
876                                 return NULL;
877                         }
878
879                         goto again;
880                 }
881
882                 ret = ctdb_ltdb_lock(ctdb_db, key);
883                 if (ret != 0) {
884                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
885                         talloc_free(h);
886                         return NULL;
887                 }
888
889                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
890                 if (ret != 0) {
891                         ctdb_ltdb_unlock(ctdb_db, key);
892
893                         ret = ctdb_client_force_migration(ctdb_db, key);
894                         if (ret != 0) {
895                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
896                                 talloc_free(h);
897                                 return NULL;
898                         }
899
900                         goto again;
901                 }
902
903                 return h;
904         }
905
906         /* we are not dmaster and this was not a request for a readonly lock
907          * so unlock the record, migrate it and try again
908          */
909         ctdb_ltdb_unlock(ctdb_db, key);
910         ret = ctdb_client_force_migration(ctdb_db, key);
911         if (ret != 0) {
912                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
913                 talloc_free(h);
914                 return NULL;
915         }
916         goto again;
917 }
918
919 /*
920   store some data to the record that was locked with ctdb_fetch_lock()
921 */
922 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
923 {
924         if (h->ctdb_db->persistent) {
925                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
926                 return -1;
927         }
928
929         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
930 }
931
932 /*
933   non-locking fetch of a record
934  */
935 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
936                TDB_DATA key, TDB_DATA *data)
937 {
938         struct ctdb_call call;
939         int ret;
940
941         call.call_id = CTDB_FETCH_FUNC;
942         call.call_data.dptr = NULL;
943         call.call_data.dsize = 0;
944         call.key = key;
945
946         ret = ctdb_call(ctdb_db, &call);
947
948         if (ret == 0) {
949                 *data = call.reply_data;
950                 talloc_steal(mem_ctx, data->dptr);
951         }
952
953         return ret;
954 }
955
956
957
958 /*
959    called when a control completes or timesout to invoke the callback
960    function the user provided
961 */
962 static void invoke_control_callback(struct tevent_context *ev,
963                                     struct tevent_timer *te,
964                                     struct timeval t, void *private_data)
965 {
966         struct ctdb_client_control_state *state;
967         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
968         int ret;
969
970         state = talloc_get_type(private_data, struct ctdb_client_control_state);
971         talloc_steal(tmp_ctx, state);
972
973         ret = ctdb_control_recv(state->ctdb, state, state,
974                         NULL, 
975                         NULL, 
976                         NULL);
977         if (ret != 0) {
978                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
979         }
980
981         talloc_free(tmp_ctx);
982 }
983
984 /*
985   called when a CTDB_REPLY_CONTROL packet comes in in the client
986
987   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
988   contains any reply data from the control
989 */
990 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
991                                       struct ctdb_req_header *hdr)
992 {
993         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
994         struct ctdb_client_control_state *state;
995
996         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
997         if (state == NULL) {
998                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
999                 return;
1000         }
1001
1002         if (hdr->reqid != state->reqid) {
1003                 /* we found a record  but it was the wrong one */
1004                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
1005                 return;
1006         }
1007
1008         state->outdata.dptr = c->data;
1009         state->outdata.dsize = c->datalen;
1010         state->status = c->status;
1011         if (c->errorlen) {
1012                 state->errormsg = talloc_strndup(state, 
1013                                                  (char *)&c->data[c->datalen], 
1014                                                  c->errorlen);
1015         }
1016
1017         /* state->outdata now uses resources from c so we dont want c
1018            to just dissappear from under us while state is still alive
1019         */
1020         talloc_steal(state, c);
1021
1022         state->state = CTDB_CONTROL_DONE;
1023
1024         /* if we had a callback registered for this control, pull the response
1025            and call the callback.
1026         */
1027         if (state->async.fn) {
1028                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1029                                  invoke_control_callback, state);
1030         }
1031 }
1032
1033
1034 /*
1035   destroy a ctdb_control in client
1036 */
1037 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1038 {
1039         reqid_remove(state->ctdb->idr, state->reqid);
1040         return 0;
1041 }
1042
1043
1044 /* time out handler for ctdb_control */
1045 static void control_timeout_func(struct tevent_context *ev,
1046                                  struct tevent_timer *te,
1047                                  struct timeval t, void *private_data)
1048 {
1049         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1050
1051         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1052                          "dstnode:%u\n", state->reqid, state->c->opcode,
1053                          state->c->hdr.destnode));
1054
1055         state->state = CTDB_CONTROL_TIMEOUT;
1056
1057         /* if we had a callback registered for this control, pull the response
1058            and call the callback.
1059         */
1060         if (state->async.fn) {
1061                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
1062                                  invoke_control_callback, state);
1063         }
1064 }
1065
1066 /* async version of send control request */
1067 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1068                 uint32_t destnode, uint64_t srvid, 
1069                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1070                 TALLOC_CTX *mem_ctx,
1071                 struct timeval *timeout,
1072                 char **errormsg)
1073 {
1074         struct ctdb_client_control_state *state;
1075         size_t len;
1076         struct ctdb_req_control *c;
1077         int ret;
1078
1079         if (errormsg) {
1080                 *errormsg = NULL;
1081         }
1082
1083         /* if the domain socket is not yet open, open it */
1084         if (ctdb->daemon.sd==-1) {
1085                 ctdb_socket_connect(ctdb);
1086         }
1087
1088         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1089         CTDB_NO_MEMORY_NULL(ctdb, state);
1090
1091         state->ctdb       = ctdb;
1092         state->reqid      = reqid_new(ctdb->idr, state);
1093         state->state      = CTDB_CONTROL_WAIT;
1094         state->errormsg   = NULL;
1095
1096         talloc_set_destructor(state, ctdb_client_control_destructor);
1097
1098         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1099         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1100                                len, struct ctdb_req_control);
1101         state->c            = c;        
1102         CTDB_NO_MEMORY_NULL(ctdb, c);
1103         c->hdr.reqid        = state->reqid;
1104         c->hdr.destnode     = destnode;
1105         c->opcode           = opcode;
1106         c->client_id        = 0;
1107         c->flags            = flags;
1108         c->srvid            = srvid;
1109         c->datalen          = data.dsize;
1110         if (data.dsize) {
1111                 memcpy(&c->data[0], data.dptr, data.dsize);
1112         }
1113
1114         /* timeout */
1115         if (timeout && !timeval_is_zero(timeout)) {
1116                 tevent_add_timer(ctdb->ev, state, *timeout,
1117                                  control_timeout_func, state);
1118         }
1119
1120         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1121         if (ret != 0) {
1122                 talloc_free(state);
1123                 return NULL;
1124         }
1125
1126         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1127                 talloc_free(state);
1128                 return NULL;
1129         }
1130
1131         return state;
1132 }
1133
1134
1135 /* async version of receive control reply */
1136 int ctdb_control_recv(struct ctdb_context *ctdb, 
1137                 struct ctdb_client_control_state *state, 
1138                 TALLOC_CTX *mem_ctx,
1139                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1140 {
1141         TALLOC_CTX *tmp_ctx;
1142
1143         if (status != NULL) {
1144                 *status = -1;
1145         }
1146         if (errormsg != NULL) {
1147                 *errormsg = NULL;
1148         }
1149
1150         if (state == NULL) {
1151                 return -1;
1152         }
1153
1154         /* prevent double free of state */
1155         tmp_ctx = talloc_new(ctdb);
1156         talloc_steal(tmp_ctx, state);
1157
1158         /* loop one event at a time until we either timeout or the control
1159            completes.
1160         */
1161         while (state->state == CTDB_CONTROL_WAIT) {
1162                 tevent_loop_once(ctdb->ev);
1163         }
1164
1165         if (state->state != CTDB_CONTROL_DONE) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1167                 if (state->async.fn) {
1168                         state->async.fn(state);
1169                 }
1170                 talloc_free(tmp_ctx);
1171                 return -1;
1172         }
1173
1174         if (state->errormsg) {
1175                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1176                 if (errormsg) {
1177                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1178                 }
1179                 if (state->async.fn) {
1180                         state->async.fn(state);
1181                 }
1182                 talloc_free(tmp_ctx);
1183                 return (status == 0 ? -1 : state->status);
1184         }
1185
1186         if (outdata) {
1187                 *outdata = state->outdata;
1188                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1189         }
1190
1191         if (status) {
1192                 *status = state->status;
1193         }
1194
1195         if (state->async.fn) {
1196                 state->async.fn(state);
1197         }
1198
1199         talloc_free(tmp_ctx);
1200         return 0;
1201 }
1202
1203
1204
1205 /*
1206   send a ctdb control message
1207   timeout specifies how long we should wait for a reply.
1208   if timeout is NULL we wait indefinitely
1209  */
1210 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1211                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1212                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1213                  struct timeval *timeout,
1214                  char **errormsg)
1215 {
1216         struct ctdb_client_control_state *state;
1217
1218         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1219                         flags, data, mem_ctx,
1220                         timeout, errormsg);
1221
1222         /* FIXME: Error conditions in ctdb_control_send return NULL without
1223          * setting errormsg.  So, there is no way to distinguish between sucess
1224          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1225         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1226                 if (status != NULL) {
1227                         *status = 0;
1228                 }
1229                 return 0;
1230         }
1231
1232         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1233                         errormsg);
1234 }
1235
1236
1237
1238
1239 /*
1240   a process exists call. Returns 0 if process exists, -1 otherwise
1241  */
1242 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1243 {
1244         int ret;
1245         TDB_DATA data;
1246         int32_t status;
1247
1248         data.dptr = (uint8_t*)&pid;
1249         data.dsize = sizeof(pid);
1250
1251         ret = ctdb_control(ctdb, destnode, 0, 
1252                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1253                            NULL, NULL, &status, NULL, NULL);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1256                 return -1;
1257         }
1258
1259         return status;
1260 }
1261
1262 /*
1263   get remote statistics
1264  */
1265 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1266 {
1267         int ret;
1268         TDB_DATA data;
1269         int32_t res;
1270
1271         ret = ctdb_control(ctdb, destnode, 0, 
1272                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1273                            ctdb, &data, &res, NULL, NULL);
1274         if (ret != 0 || res != 0) {
1275                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1276                 return -1;
1277         }
1278
1279         if (data.dsize != sizeof(struct ctdb_statistics)) {
1280                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1281                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1282                       return -1;
1283         }
1284
1285         *status = *(struct ctdb_statistics *)data.dptr;
1286         talloc_free(data.dptr);
1287                         
1288         return 0;
1289 }
1290
1291 /*
1292  * get db statistics
1293  */
1294 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1295                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1296 {
1297         int ret;
1298         TDB_DATA indata, outdata;
1299         int32_t res;
1300         struct ctdb_db_statistics *wire, *s;
1301         char *ptr;
1302         int i;
1303
1304         indata.dptr = (uint8_t *)&dbid;
1305         indata.dsize = sizeof(dbid);
1306
1307         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1308                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1309         if (ret != 0 || res != 0) {
1310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1311                 return -1;
1312         }
1313
1314         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1315                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1316                                  outdata.dsize,
1317                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1318                 return -1;
1319         }
1320
1321         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1322         if (s == NULL) {
1323                 talloc_free(outdata.dptr);
1324                 CTDB_NO_MEMORY(ctdb, s);
1325         }
1326
1327         wire = (struct ctdb_db_statistics *)outdata.dptr;
1328         memcpy(s, wire, offsetof(struct ctdb_db_statistics, hot_keys_wire));
1329         ptr = &wire->hot_keys_wire[0];
1330         for (i=0; i<wire->num_hot_keys; i++) {
1331                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1332                 if (s->hot_keys[i].key.dptr == NULL) {
1333                         talloc_free(outdata.dptr);
1334                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1335                 }
1336
1337                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1338                 ptr += wire->hot_keys[i].key.dsize;
1339         }
1340
1341         talloc_free(outdata.dptr);
1342         *dbstat = s;
1343         return 0;
1344 }
1345
1346 /*
1347   shutdown a remote ctdb node
1348  */
1349 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1350 {
1351         struct ctdb_client_control_state *state;
1352
1353         state = ctdb_control_send(ctdb, destnode, 0, 
1354                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1355                            NULL, &timeout, NULL);
1356         if (state == NULL) {
1357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1358                 return -1;
1359         }
1360
1361         return 0;
1362 }
1363
1364 /*
1365   get vnn map from a remote node
1366  */
1367 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1368 {
1369         int ret;
1370         TDB_DATA outdata;
1371         int32_t res;
1372         struct ctdb_vnn_map_wire *map;
1373
1374         ret = ctdb_control(ctdb, destnode, 0, 
1375                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1376                            mem_ctx, &outdata, &res, &timeout, NULL);
1377         if (ret != 0 || res != 0) {
1378                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1379                 return -1;
1380         }
1381         
1382         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1383         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1384             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1385                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1386                 return -1;
1387         }
1388
1389         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1390         CTDB_NO_MEMORY(ctdb, *vnnmap);
1391         (*vnnmap)->generation = map->generation;
1392         (*vnnmap)->size       = map->size;
1393         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1394
1395         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1396         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1397         talloc_free(outdata.dptr);
1398                     
1399         return 0;
1400 }
1401
1402
1403 /*
1404   get the recovery mode of a remote node
1405  */
1406 struct ctdb_client_control_state *
1407 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1408 {
1409         return ctdb_control_send(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1411                            mem_ctx, &timeout, NULL);
1412 }
1413
1414 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1415 {
1416         int ret;
1417         int32_t res;
1418
1419         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1422                 return -1;
1423         }
1424
1425         if (recmode) {
1426                 *recmode = (uint32_t)res;
1427         }
1428
1429         return 0;
1430 }
1431
1432 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1433 {
1434         struct ctdb_client_control_state *state;
1435
1436         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1437         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1438 }
1439
1440
1441
1442
1443 /*
1444   set the recovery mode of a remote node
1445  */
1446 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1447 {
1448         int ret;
1449         TDB_DATA data;
1450         int32_t res;
1451
1452         data.dsize = sizeof(uint32_t);
1453         data.dptr = (unsigned char *)&recmode;
1454
1455         ret = ctdb_control(ctdb, destnode, 0, 
1456                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1457                            NULL, NULL, &res, &timeout, NULL);
1458         if (ret != 0 || res != 0) {
1459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1460                 return -1;
1461         }
1462
1463         return 0;
1464 }
1465
1466
1467
1468 /*
1469   get the recovery master of a remote node
1470  */
1471 struct ctdb_client_control_state *
1472 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1473                         struct timeval timeout, uint32_t destnode)
1474 {
1475         return ctdb_control_send(ctdb, destnode, 0, 
1476                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1477                            mem_ctx, &timeout, NULL);
1478 }
1479
1480 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1481 {
1482         int ret;
1483         int32_t res;
1484
1485         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1486         if (ret != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1488                 return -1;
1489         }
1490
1491         if (recmaster) {
1492                 *recmaster = (uint32_t)res;
1493         }
1494
1495         return 0;
1496 }
1497
1498 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1499 {
1500         struct ctdb_client_control_state *state;
1501
1502         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1503         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1504 }
1505
1506
1507 /*
1508   set the recovery master of a remote node
1509  */
1510 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1511 {
1512         int ret;
1513         TDB_DATA data;
1514         int32_t res;
1515
1516         ZERO_STRUCT(data);
1517         data.dsize = sizeof(uint32_t);
1518         data.dptr = (unsigned char *)&recmaster;
1519
1520         ret = ctdb_control(ctdb, destnode, 0, 
1521                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1522                            NULL, NULL, &res, &timeout, NULL);
1523         if (ret != 0 || res != 0) {
1524                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1525                 return -1;
1526         }
1527
1528         return 0;
1529 }
1530
1531
1532 /*
1533   get a list of databases off a remote node
1534  */
1535 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1536                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1537 {
1538         int ret;
1539         TDB_DATA outdata;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1551         talloc_free(outdata.dptr);
1552                     
1553         return 0;
1554 }
1555
1556 /*
1557   get a list of nodes (vnn and flags ) from a remote node
1558  */
1559 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1560                 struct timeval timeout, uint32_t destnode, 
1561                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1562 {
1563         int ret;
1564         TDB_DATA outdata;
1565         int32_t res;
1566
1567         ret = ctdb_control(ctdb, destnode, 0,
1568                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1569                            mem_ctx, &outdata, &res, &timeout, NULL);
1570         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1572                 return -1;
1573         }
1574
1575         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1576         talloc_free(outdata.dptr);
1577         return 0;
1578 }
1579
1580 /*
1581   load nodes file on a remote node and return as a node map
1582  */
1583 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1584                            struct timeval timeout, uint32_t destnode,
1585                            TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1586 {
1587         int ret;
1588         TDB_DATA outdata;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0,
1592                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1593                            mem_ctx, &outdata, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1596                 return -1;
1597         }
1598
1599         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1600         talloc_free(outdata.dptr);
1601
1602         return 0;
1603 }
1604
1605 /*
1606   drop the transport, reload the nodes file and restart the transport
1607  */
1608 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1609                     struct timeval timeout, uint32_t destnode)
1610 {
1611         int ret;
1612         int32_t res;
1613
1614         ret = ctdb_control(ctdb, destnode, 0, 
1615                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1616                            NULL, NULL, &res, &timeout, NULL);
1617         if (ret != 0 || res != 0) {
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1619                 return -1;
1620         }
1621
1622         return 0;
1623 }
1624
1625
1626 /*
1627   set vnn map on a node
1628  */
1629 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1630                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1631 {
1632         int ret;
1633         TDB_DATA data;
1634         int32_t res;
1635         struct ctdb_vnn_map_wire *map;
1636         size_t len;
1637
1638         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1639         map = talloc_size(mem_ctx, len);
1640         CTDB_NO_MEMORY(ctdb, map);
1641
1642         map->generation = vnnmap->generation;
1643         map->size = vnnmap->size;
1644         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1645         
1646         data.dsize = len;
1647         data.dptr  = (uint8_t *)map;
1648
1649         ret = ctdb_control(ctdb, destnode, 0, 
1650                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1651                            NULL, NULL, &res, &timeout, NULL);
1652         if (ret != 0 || res != 0) {
1653                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1654                 return -1;
1655         }
1656
1657         talloc_free(map);
1658
1659         return 0;
1660 }
1661
1662
1663 /*
1664   async send for pull database
1665  */
1666 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1667         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1668         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1669 {
1670         TDB_DATA indata;
1671         struct ctdb_control_pulldb *pull;
1672         struct ctdb_client_control_state *state;
1673
1674         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1675         CTDB_NO_MEMORY_NULL(ctdb, pull);
1676
1677         pull->db_id   = dbid;
1678         pull->lmaster = lmaster;
1679
1680         indata.dsize = sizeof(struct ctdb_control_pulldb);
1681         indata.dptr  = (unsigned char *)pull;
1682
1683         state = ctdb_control_send(ctdb, destnode, 0, 
1684                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1685                                   mem_ctx, &timeout, NULL);
1686         talloc_free(pull);
1687
1688         return state;
1689 }
1690
1691 /*
1692   async recv for pull database
1693  */
1694 int ctdb_ctrl_pulldb_recv(
1695         struct ctdb_context *ctdb, 
1696         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1697         TDB_DATA *outdata)
1698 {
1699         int ret;
1700         int32_t res;
1701
1702         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1703         if ( (ret != 0) || (res != 0) ){
1704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1705                 return -1;
1706         }
1707
1708         return 0;
1709 }
1710
1711 /*
1712   pull all keys and records for a specific database on a node
1713  */
1714 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1715                 uint32_t dbid, uint32_t lmaster, 
1716                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1717                 TDB_DATA *outdata)
1718 {
1719         struct ctdb_client_control_state *state;
1720
1721         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1722                                       timeout);
1723         
1724         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1725 }
1726
1727
1728 /*
1729   change dmaster for all keys in the database to the new value
1730  */
1731 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1732                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1733 {
1734         int ret;
1735         TDB_DATA indata;
1736         int32_t res;
1737
1738         indata.dsize = 2*sizeof(uint32_t);
1739         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1740
1741         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1742         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, 
1745                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1746                            NULL, NULL, &res, &timeout, NULL);
1747         if (ret != 0 || res != 0) {
1748                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1749                 return -1;
1750         }
1751
1752         return 0;
1753 }
1754
1755 /*
1756   ping a node, return number of clients connected
1757  */
1758 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1759 {
1760         int ret;
1761         int32_t res;
1762
1763         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1764                            tdb_null, NULL, NULL, &res, NULL, NULL);
1765         if (ret != 0) {
1766                 return -1;
1767         }
1768         return res;
1769 }
1770
1771 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1772                            struct timeval timeout, 
1773                            uint32_t destnode,
1774                            uint32_t *runstate)
1775 {
1776         TDB_DATA outdata;
1777         int32_t res;
1778         int ret;
1779
1780         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1781                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1782         if (ret != 0 || res != 0) {
1783                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1784                 return ret != 0 ? ret : res;
1785         }
1786
1787         if (outdata.dsize != sizeof(uint32_t)) {
1788                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1789                 talloc_free(outdata.dptr);
1790                 return -1;
1791         }
1792
1793         if (runstate != NULL) {
1794                 *runstate = *(uint32_t *)outdata.dptr;
1795         }
1796         talloc_free(outdata.dptr);
1797
1798         return 0;
1799 }
1800
1801 /*
1802   find the real path to a ltdb 
1803  */
1804 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1805                    const char **path)
1806 {
1807         int ret;
1808         int32_t res;
1809         TDB_DATA data;
1810
1811         data.dptr = (uint8_t *)&dbid;
1812         data.dsize = sizeof(dbid);
1813
1814         ret = ctdb_control(ctdb, destnode, 0, 
1815                            CTDB_CONTROL_GETDBPATH, 0, data, 
1816                            mem_ctx, &data, &res, &timeout, NULL);
1817         if (ret != 0 || res != 0) {
1818                 return -1;
1819         }
1820
1821         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1822         if ((*path) == NULL) {
1823                 return -1;
1824         }
1825
1826         talloc_free(data.dptr);
1827
1828         return 0;
1829 }
1830
1831 /*
1832   find the name of a db 
1833  */
1834 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1835                    const char **name)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&dbid;
1842         data.dsize = sizeof(dbid);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, 
1845                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1846                            mem_ctx, &data, &res, &timeout, NULL);
1847         if (ret != 0 || res != 0) {
1848                 return -1;
1849         }
1850
1851         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1852         if ((*name) == NULL) {
1853                 return -1;
1854         }
1855
1856         talloc_free(data.dptr);
1857
1858         return 0;
1859 }
1860
1861 /*
1862   get the health status of a db
1863  */
1864 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1865                           struct timeval timeout,
1866                           uint32_t destnode,
1867                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1868                           const char **reason)
1869 {
1870         int ret;
1871         int32_t res;
1872         TDB_DATA data;
1873
1874         data.dptr = (uint8_t *)&dbid;
1875         data.dsize = sizeof(dbid);
1876
1877         ret = ctdb_control(ctdb, destnode, 0,
1878                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1879                            mem_ctx, &data, &res, &timeout, NULL);
1880         if (ret != 0 || res != 0) {
1881                 return -1;
1882         }
1883
1884         if (data.dsize == 0) {
1885                 (*reason) = NULL;
1886                 return 0;
1887         }
1888
1889         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1890         if ((*reason) == NULL) {
1891                 return -1;
1892         }
1893
1894         talloc_free(data.dptr);
1895
1896         return 0;
1897 }
1898
1899 /*
1900  * get db sequence number
1901  */
1902 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1903                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1904 {
1905         int ret;
1906         int32_t res;
1907         TDB_DATA data, outdata;
1908
1909         data.dptr = (uint8_t *)&dbid;
1910         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1911
1912         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1913                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1914         if (ret != 0 || res != 0) {
1915                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1916                 return -1;
1917         }
1918
1919         if (outdata.dsize != sizeof(uint64_t)) {
1920                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1921                 talloc_free(outdata.dptr);
1922                 return -1;
1923         }
1924
1925         if (seqnum != NULL) {
1926                 *seqnum = *(uint64_t *)outdata.dptr;
1927         }
1928         talloc_free(outdata.dptr);
1929
1930         return 0;
1931 }
1932
1933 /*
1934   create a database
1935  */
1936 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1937                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1938 {
1939         int ret;
1940         int32_t res;
1941         TDB_DATA data;
1942         uint64_t tdb_flags = 0;
1943
1944         data.dptr = discard_const(name);
1945         data.dsize = strlen(name)+1;
1946
1947         /* Make sure that volatile databases use jenkins hash */
1948         if (!persistent) {
1949                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1950         }
1951
1952 #ifdef TDB_MUTEX_LOCKING
1953         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1954                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1955         }
1956 #endif
1957
1958         ret = ctdb_control(ctdb, destnode, tdb_flags,
1959                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1960                            0, data, 
1961                            mem_ctx, &data, &res, &timeout, NULL);
1962
1963         if (ret != 0 || res != 0) {
1964                 return -1;
1965         }
1966
1967         return 0;
1968 }
1969
1970 /*
1971   get debug level on a node
1972  */
1973 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1974 {
1975         int ret;
1976         int32_t res;
1977         TDB_DATA data;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1980                            ctdb, &data, &res, NULL, NULL);
1981         if (ret != 0 || res != 0) {
1982                 return -1;
1983         }
1984         if (data.dsize != sizeof(int32_t)) {
1985                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1986                          (unsigned)data.dsize));
1987                 return -1;
1988         }
1989         *level = *(int32_t *)data.dptr;
1990         talloc_free(data.dptr);
1991         return 0;
1992 }
1993
1994 /*
1995   set debug level on a node
1996  */
1997 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1998 {
1999         int ret;
2000         int32_t res;
2001         TDB_DATA data;
2002
2003         data.dptr = (uint8_t *)&level;
2004         data.dsize = sizeof(level);
2005
2006         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
2007                            NULL, NULL, &res, NULL, NULL);
2008         if (ret != 0 || res != 0) {
2009                 return -1;
2010         }
2011         return 0;
2012 }
2013
2014
2015 /*
2016   get a list of connected nodes
2017  */
2018 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
2019                                 struct timeval timeout,
2020                                 TALLOC_CTX *mem_ctx,
2021                                 uint32_t *num_nodes)
2022 {
2023         struct ctdb_node_map *map=NULL;
2024         int ret, i;
2025         uint32_t *nodes;
2026
2027         *num_nodes = 0;
2028
2029         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2030         if (ret != 0) {
2031                 return NULL;
2032         }
2033
2034         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2035         if (nodes == NULL) {
2036                 return NULL;
2037         }
2038
2039         for (i=0;i<map->num;i++) {
2040                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2041                         nodes[*num_nodes] = map->nodes[i].pnn;
2042                         (*num_nodes)++;
2043                 }
2044         }
2045
2046         return nodes;
2047 }
2048
2049
2050 /*
2051   reset remote status
2052  */
2053 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2054 {
2055         int ret;
2056         int32_t res;
2057
2058         ret = ctdb_control(ctdb, destnode, 0, 
2059                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2060                            NULL, NULL, &res, NULL, NULL);
2061         if (ret != 0 || res != 0) {
2062                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2063                 return -1;
2064         }
2065         return 0;
2066 }
2067
2068 /*
2069   attach to a specific database - client call
2070 */
2071 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2072                                     struct timeval timeout,
2073                                     const char *name,
2074                                     bool persistent,
2075                                     uint32_t tdb_flags)
2076 {
2077         struct ctdb_db_context *ctdb_db;
2078         TDB_DATA data;
2079         int ret;
2080         int32_t res;
2081 #ifdef TDB_MUTEX_LOCKING
2082         uint32_t mutex_enabled = 0;
2083 #endif
2084
2085         ctdb_db = ctdb_db_handle(ctdb, name);
2086         if (ctdb_db) {
2087                 return ctdb_db;
2088         }
2089
2090         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2091         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2092
2093         ctdb_db->ctdb = ctdb;
2094         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2095         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2096
2097         data.dptr = discard_const(name);
2098         data.dsize = strlen(name)+1;
2099
2100         /* CTDB has switched to using jenkins hash for volatile databases.
2101          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2102          * always set it.
2103          */
2104         if (!persistent) {
2105                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2106         }
2107
2108 #ifdef TDB_MUTEX_LOCKING
2109         if (!persistent) {
2110                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2111                                             CTDB_CURRENT_NODE,
2112                                             "TDBMutexEnabled",
2113                                             &mutex_enabled);
2114                 if (ret != 0) {
2115                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2116                 }
2117
2118                 if (mutex_enabled == 1) {
2119                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2120                 }
2121         }
2122 #endif
2123
2124         /* tell ctdb daemon to attach */
2125         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2126                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2127                            0, data, ctdb_db, &data, &res, NULL, NULL);
2128         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2129                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2130                 talloc_free(ctdb_db);
2131                 return NULL;
2132         }
2133         
2134         ctdb_db->db_id = *(uint32_t *)data.dptr;
2135         talloc_free(data.dptr);
2136
2137         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2140                 talloc_free(ctdb_db);
2141                 return NULL;
2142         }
2143
2144         if (persistent) {
2145                 tdb_flags = TDB_DEFAULT;
2146         } else {
2147                 tdb_flags = TDB_NOSYNC;
2148 #ifdef TDB_MUTEX_LOCKING
2149                 if (mutex_enabled) {
2150                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2151                 }
2152 #endif
2153         }
2154         if (ctdb->valgrinding) {
2155                 tdb_flags |= TDB_NOMMAP;
2156         }
2157         tdb_flags |= TDB_DISALLOW_NESTING;
2158
2159         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2160                                       O_RDWR, 0);
2161         if (ctdb_db->ltdb == NULL) {
2162                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2163                 talloc_free(ctdb_db);
2164                 return NULL;
2165         }
2166
2167         ctdb_db->persistent = persistent;
2168
2169         DLIST_ADD(ctdb->db_list, ctdb_db);
2170
2171         /* add well known functions */
2172         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2173         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2174         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2175
2176         return ctdb_db;
2177 }
2178
2179 /*
2180  * detach from a specific database - client call
2181  */
2182 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2183 {
2184         int ret;
2185         int32_t status;
2186         TDB_DATA data;
2187
2188         data.dsize = sizeof(db_id);
2189         data.dptr = (uint8_t *)&db_id;
2190
2191         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2192                            0, data, NULL, NULL, &status, NULL, NULL);
2193         if (ret != 0 || status != 0) {
2194                 return -1;
2195         }
2196         return 0;
2197 }
2198
2199 /*
2200   setup a call for a database
2201  */
2202 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2203 {
2204         struct ctdb_registered_call *call;
2205
2206         /* register locally */
2207         call = talloc(ctdb_db, struct ctdb_registered_call);
2208         call->fn = fn;
2209         call->id = id;
2210
2211         DLIST_ADD(ctdb_db->calls, call);
2212         return 0;
2213 }
2214
2215
2216 struct traverse_state {
2217         bool done;
2218         uint32_t count;
2219         ctdb_traverse_func fn;
2220         void *private_data;
2221         bool listemptyrecords;
2222 };
2223
2224 /*
2225   called on each key during a ctdb_traverse
2226  */
2227 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2228 {
2229         struct traverse_state *state = (struct traverse_state *)p;
2230         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2231         TDB_DATA key;
2232
2233         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2234                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2235                                   (unsigned)data.dsize));
2236                 state->done = true;
2237                 return;
2238         }
2239
2240         key.dsize = d->keylen;
2241         key.dptr  = &d->data[0];
2242         data.dsize = d->datalen;
2243         data.dptr = &d->data[d->keylen];
2244
2245         if (key.dsize == 0 && data.dsize == 0) {
2246                 /* end of traverse */
2247                 state->done = true;
2248                 return;
2249         }
2250
2251         if (!state->listemptyrecords &&
2252             data.dsize == sizeof(struct ctdb_ltdb_header))
2253         {
2254                 /* empty records are deleted records in ctdb */
2255                 return;
2256         }
2257
2258         if (state->fn(key, data, state->private_data) != 0) {
2259                 state->done = true;
2260         }
2261
2262         state->count++;
2263 }
2264
2265 /**
2266  * start a cluster wide traverse, calling the supplied fn on each record
2267  * return the number of records traversed, or -1 on error
2268  *
2269  * Extendet variant with a flag to signal whether empty records should
2270  * be listed.
2271  */
2272 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2273                              ctdb_traverse_func fn,
2274                              bool withemptyrecords,
2275                              void *private_data)
2276 {
2277         TDB_DATA data;
2278         struct ctdb_traverse_start_ext t;
2279         int32_t status;
2280         int ret;
2281         uint64_t srvid = (getpid() | 0xFLL<<60);
2282         struct traverse_state state;
2283
2284         state.done = false;
2285         state.count = 0;
2286         state.private_data = private_data;
2287         state.fn = fn;
2288         state.listemptyrecords = withemptyrecords;
2289
2290         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2291         if (ret != 0) {
2292                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2293                 return -1;
2294         }
2295
2296         t.db_id = ctdb_db->db_id;
2297         t.srvid = srvid;
2298         t.reqid = 0;
2299         t.withemptyrecords = withemptyrecords;
2300
2301         data.dptr = (uint8_t *)&t;
2302         data.dsize = sizeof(t);
2303
2304         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2305                            data, NULL, NULL, &status, NULL, NULL);
2306         if (ret != 0 || status != 0) {
2307                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2308                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2309                 return -1;
2310         }
2311
2312         while (!state.done) {
2313                 tevent_loop_once(ctdb_db->ctdb->ev);
2314         }
2315
2316         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2319                 return -1;
2320         }
2321
2322         return state.count;
2323 }
2324
2325 /**
2326  * start a cluster wide traverse, calling the supplied fn on each record
2327  * return the number of records traversed, or -1 on error
2328  *
2329  * Standard version which does not list the empty records:
2330  * These are considered deleted.
2331  */
2332 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2333 {
2334         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2335 }
2336
2337 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2338 /*
2339   called on each key during a catdb
2340  */
2341 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2342 {
2343         int i;
2344         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2345         FILE *f = c->f;
2346         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2347
2348         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2349         for (i=0;i<key.dsize;i++) {
2350                 if (ISASCII(key.dptr[i])) {
2351                         fprintf(f, "%c", key.dptr[i]);
2352                 } else {
2353                         fprintf(f, "\\%02X", key.dptr[i]);
2354                 }
2355         }
2356         fprintf(f, "\"\n");
2357
2358         fprintf(f, "dmaster: %u\n", h->dmaster);
2359         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2360
2361         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2362                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2363         }
2364
2365         if (c->printhash) {
2366                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2367         }
2368
2369         if (c->printrecordflags) {
2370                 fprintf(f, "flags: 0x%08x", h->flags);
2371                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2372                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2373                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2374                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2375                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2376                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2377                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2378                 fprintf(f, "\n");
2379         }
2380
2381         if (c->printdatasize) {
2382                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2383         } else {
2384                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2385                 for (i=sizeof(*h);i<data.dsize;i++) {
2386                         if (ISASCII(data.dptr[i])) {
2387                                 fprintf(f, "%c", data.dptr[i]);
2388                         } else {
2389                                 fprintf(f, "\\%02X", data.dptr[i]);
2390                         }
2391                 }
2392                 fprintf(f, "\"\n");
2393         }
2394
2395         fprintf(f, "\n");
2396
2397         return 0;
2398 }
2399
2400 /*
2401   convenience function to list all keys to stdout
2402  */
2403 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2404                  struct ctdb_dump_db_context *ctx)
2405 {
2406         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2407                                  ctx->printemptyrecords, ctx);
2408 }
2409
2410 /*
2411   get the pid of a ctdb daemon
2412  */
2413 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2414 {
2415         int ret;
2416         int32_t res;
2417
2418         ret = ctdb_control(ctdb, destnode, 0, 
2419                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2420                            NULL, NULL, &res, &timeout, NULL);
2421         if (ret != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2423                 return -1;
2424         }
2425
2426         *pid = res;
2427
2428         return 0;
2429 }
2430
2431
2432 /*
2433   async freeze send control
2434  */
2435 struct ctdb_client_control_state *
2436 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2437 {
2438         return ctdb_control_send(ctdb, destnode, priority, 
2439                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2440                            mem_ctx, &timeout, NULL);
2441 }
2442
2443 /* 
2444    async freeze recv control
2445 */
2446 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2447 {
2448         int ret;
2449         int32_t res;
2450
2451         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2452         if ( (ret != 0) || (res != 0) ){
2453                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2454                 return -1;
2455         }
2456
2457         return 0;
2458 }
2459
2460 /*
2461   freeze databases of a certain priority
2462  */
2463 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2464 {
2465         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2466         struct ctdb_client_control_state *state;
2467         int ret;
2468
2469         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2470         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2471         talloc_free(tmp_ctx);
2472
2473         return ret;
2474 }
2475
2476 /* Freeze all databases */
2477 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2478 {
2479         int i;
2480
2481         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2482                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2483                         return -1;
2484                 }
2485         }
2486         return 0;
2487 }
2488
2489 /*
2490   thaw databases of a certain priority
2491  */
2492 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2493 {
2494         int ret;
2495         int32_t res;
2496
2497         ret = ctdb_control(ctdb, destnode, priority, 
2498                            CTDB_CONTROL_THAW, 0, tdb_null, 
2499                            NULL, NULL, &res, &timeout, NULL);
2500         if (ret != 0 || res != 0) {
2501                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2502                 return -1;
2503         }
2504
2505         return 0;
2506 }
2507
2508 /* thaw all databases */
2509 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2510 {
2511         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2512 }
2513
2514 /*
2515   get pnn of a node, or -1
2516  */
2517 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2518 {
2519         int ret;
2520         int32_t res;
2521
2522         ret = ctdb_control(ctdb, destnode, 0, 
2523                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2524                            NULL, NULL, &res, &timeout, NULL);
2525         if (ret != 0) {
2526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2527                 return -1;
2528         }
2529
2530         return res;
2531 }
2532
2533 /*
2534   get the monitoring mode of a remote node
2535  */
2536 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2537 {
2538         int ret;
2539         int32_t res;
2540
2541         ret = ctdb_control(ctdb, destnode, 0, 
2542                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2543                            NULL, NULL, &res, &timeout, NULL);
2544         if (ret != 0) {
2545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2546                 return -1;
2547         }
2548
2549         *monmode = res;
2550
2551         return 0;
2552 }
2553
2554
2555 /*
2556  set the monitoring mode of a remote node to active
2557  */
2558 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2559 {
2560         int ret;
2561         
2562
2563         ret = ctdb_control(ctdb, destnode, 0, 
2564                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2565                            NULL, NULL,NULL, &timeout, NULL);
2566         if (ret != 0) {
2567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2568                 return -1;
2569         }
2570
2571         
2572
2573         return 0;
2574 }
2575
2576 /*
2577   set the monitoring mode of a remote node to disable
2578  */
2579 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2580 {
2581         int ret;
2582         
2583
2584         ret = ctdb_control(ctdb, destnode, 0, 
2585                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2586                            NULL, NULL, NULL, &timeout, NULL);
2587         if (ret != 0) {
2588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2589                 return -1;
2590         }
2591
2592         
2593
2594         return 0;
2595 }
2596
2597
2598
2599 /*
2600   sent to a node to make it take over an ip address
2601 */
2602 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2603                           uint32_t destnode, struct ctdb_public_ip *ip)
2604 {
2605         TDB_DATA data;
2606         int ret;
2607         int32_t res;
2608
2609         data.dsize = sizeof(*ip);
2610         data.dptr  = (uint8_t *)ip;
2611
2612         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2613                            data, NULL, NULL, &res, &timeout, NULL);
2614         if (ret != 0 || res != 0) {
2615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2616                 return -1;
2617         }
2618
2619         return 0;
2620 }
2621
2622
2623 /*
2624   sent to a node to make it release an ip address
2625 */
2626 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2627                          uint32_t destnode, struct ctdb_public_ip *ip)
2628 {
2629         TDB_DATA data;
2630         int ret;
2631         int32_t res;
2632
2633         data.dsize = sizeof(*ip);
2634         data.dptr  = (uint8_t *)ip;
2635
2636         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2637                            data, NULL, NULL, &res, &timeout, NULL);
2638         if (ret != 0 || res != 0) {
2639                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2640                 return -1;
2641         }
2642
2643         return 0;
2644 }
2645
2646
2647 /*
2648   get a tunable
2649  */
2650 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2651                           struct timeval timeout, 
2652                           uint32_t destnode,
2653                           const char *name, uint32_t *value)
2654 {
2655         struct ctdb_control_get_tunable *t;
2656         TDB_DATA data, outdata;
2657         int32_t res;
2658         int ret;
2659
2660         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2661         data.dptr  = talloc_size(ctdb, data.dsize);
2662         CTDB_NO_MEMORY(ctdb, data.dptr);
2663
2664         t = (struct ctdb_control_get_tunable *)data.dptr;
2665         t->length = strlen(name)+1;
2666         memcpy(t->name, name, t->length);
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2669                            &outdata, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2673                 return ret != 0 ? ret : res;
2674         }
2675
2676         if (outdata.dsize != sizeof(uint32_t)) {
2677                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2678                 talloc_free(outdata.dptr);
2679                 return -1;
2680         }
2681         
2682         *value = *(uint32_t *)outdata.dptr;
2683         talloc_free(outdata.dptr);
2684
2685         return 0;
2686 }
2687
2688 /*
2689   set a tunable
2690  */
2691 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2692                           struct timeval timeout, 
2693                           uint32_t destnode,
2694                           const char *name, uint32_t value)
2695 {
2696         struct ctdb_control_set_tunable *t;
2697         TDB_DATA data;
2698         int32_t res;
2699         int ret;
2700
2701         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2702         data.dptr  = talloc_size(ctdb, data.dsize);
2703         CTDB_NO_MEMORY(ctdb, data.dptr);
2704
2705         t = (struct ctdb_control_set_tunable *)data.dptr;
2706         t->length = strlen(name)+1;
2707         memcpy(t->name, name, t->length);
2708         t->value = value;
2709
2710         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2711                            NULL, &res, &timeout, NULL);
2712         talloc_free(data.dptr);
2713         if ((ret != 0) || (res == -1)) {
2714                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2715                 return -1;
2716         }
2717
2718         return res;
2719 }
2720
2721 /*
2722   list tunables
2723  */
2724 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2725                             struct timeval timeout, 
2726                             uint32_t destnode,
2727                             TALLOC_CTX *mem_ctx,
2728                             const char ***list, uint32_t *count)
2729 {
2730         TDB_DATA outdata;
2731         int32_t res;
2732         int ret;
2733         struct ctdb_control_list_tunable *t;
2734         char *p, *s, *ptr;
2735
2736         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2737                            mem_ctx, &outdata, &res, &timeout, NULL);
2738         if (ret != 0 || res != 0) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2740                 return -1;
2741         }
2742
2743         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2744         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2745             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2746                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2747                 talloc_free(outdata.dptr);
2748                 return -1;              
2749         }
2750         
2751         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2752         CTDB_NO_MEMORY(ctdb, p);
2753
2754         talloc_free(outdata.dptr);
2755         
2756         (*list) = NULL;
2757         (*count) = 0;
2758
2759         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2760                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2761                 CTDB_NO_MEMORY(ctdb, *list);
2762                 (*list)[*count] = talloc_strdup(*list, s);
2763                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2764                 (*count)++;
2765         }
2766
2767         talloc_free(p);
2768
2769         return 0;
2770 }
2771
2772
2773 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2774                                    struct timeval timeout, uint32_t destnode,
2775                                    TALLOC_CTX *mem_ctx,
2776                                    uint32_t flags,
2777                                    struct ctdb_all_public_ips **ips)
2778 {
2779         int ret;
2780         TDB_DATA outdata;
2781         int32_t res;
2782
2783         ret = ctdb_control(ctdb, destnode, 0,
2784                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2785                            mem_ctx, &outdata, &res, &timeout, NULL);
2786         if (ret != 0 || res != 0) {
2787                 DEBUG(DEBUG_ERR,(__location__
2788                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2789                                  ret, res));
2790                 return -1;
2791         }
2792
2793         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2794         talloc_free(outdata.dptr);
2795
2796         return 0;
2797 }
2798
2799 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2800                              struct timeval timeout, uint32_t destnode,
2801                              TALLOC_CTX *mem_ctx,
2802                              struct ctdb_all_public_ips **ips)
2803 {
2804         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2805                                               destnode, mem_ctx,
2806                                               0, ips);
2807 }
2808
2809 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2810                                  struct timeval timeout, uint32_t destnode,
2811                                  TALLOC_CTX *mem_ctx,
2812                                  const ctdb_sock_addr *addr,
2813                                  struct ctdb_control_public_ip_info **_info)
2814 {
2815         int ret;
2816         TDB_DATA indata;
2817         TDB_DATA outdata;
2818         int32_t res;
2819         struct ctdb_control_public_ip_info *info;
2820         uint32_t len;
2821         uint32_t i;
2822
2823         indata.dptr = discard_const_p(uint8_t, addr);
2824         indata.dsize = sizeof(*addr);
2825
2826         ret = ctdb_control(ctdb, destnode, 0,
2827                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2828                            mem_ctx, &outdata, &res, &timeout, NULL);
2829         if (ret != 0 || res != 0) {
2830                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2831                                 "failed ret:%d res:%d\n",
2832                                 ret, res));
2833                 return -1;
2834         }
2835
2836         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2837         if (len > outdata.dsize) {
2838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2839                                 "returned invalid data with size %u > %u\n",
2840                                 (unsigned int)outdata.dsize,
2841                                 (unsigned int)len));
2842                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2843                 return -1;
2844         }
2845
2846         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2847         len += info->num*sizeof(struct ctdb_control_iface_info);
2848
2849         if (len > outdata.dsize) {
2850                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2851                                 "returned invalid data with size %u > %u\n",
2852                                 (unsigned int)outdata.dsize,
2853                                 (unsigned int)len));
2854                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2855                 return -1;
2856         }
2857
2858         /* make sure we null terminate the returned strings */
2859         for (i=0; i < info->num; i++) {
2860                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2861         }
2862
2863         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2864                                                                 outdata.dptr,
2865                                                                 outdata.dsize);
2866         talloc_free(outdata.dptr);
2867         if (*_info == NULL) {
2868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2869                                 "talloc_memdup size %u failed\n",
2870                                 (unsigned int)outdata.dsize));
2871                 return -1;
2872         }
2873
2874         return 0;
2875 }
2876
2877 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2878                          struct timeval timeout, uint32_t destnode,
2879                          TALLOC_CTX *mem_ctx,
2880                          struct ctdb_control_get_ifaces **_ifaces)
2881 {
2882         int ret;
2883         TDB_DATA outdata;
2884         int32_t res;
2885         struct ctdb_control_get_ifaces *ifaces;
2886         uint32_t len;
2887         uint32_t i;
2888
2889         ret = ctdb_control(ctdb, destnode, 0,
2890                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2891                            mem_ctx, &outdata, &res, &timeout, NULL);
2892         if (ret != 0 || res != 0) {
2893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2894                                 "failed ret:%d res:%d\n",
2895                                 ret, res));
2896                 return -1;
2897         }
2898
2899         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2900         if (len > outdata.dsize) {
2901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2902                                 "returned invalid data with size %u > %u\n",
2903                                 (unsigned int)outdata.dsize,
2904                                 (unsigned int)len));
2905                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2906                 return -1;
2907         }
2908
2909         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2910         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2911
2912         if (len > outdata.dsize) {
2913                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2914                                 "returned invalid data with size %u > %u\n",
2915                                 (unsigned int)outdata.dsize,
2916                                 (unsigned int)len));
2917                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2918                 return -1;
2919         }
2920
2921         /* make sure we null terminate the returned strings */
2922         for (i=0; i < ifaces->num; i++) {
2923                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2924         }
2925
2926         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2927                                                                   outdata.dptr,
2928                                                                   outdata.dsize);
2929         talloc_free(outdata.dptr);
2930         if (*_ifaces == NULL) {
2931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2932                                 "talloc_memdup size %u failed\n",
2933                                 (unsigned int)outdata.dsize));
2934                 return -1;
2935         }
2936
2937         return 0;
2938 }
2939
2940 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2941                              struct timeval timeout, uint32_t destnode,
2942                              TALLOC_CTX *mem_ctx,
2943                              const struct ctdb_control_iface_info *info)
2944 {
2945         int ret;
2946         TDB_DATA indata;
2947         int32_t res;
2948
2949         indata.dptr = discard_const_p(uint8_t, info);
2950         indata.dsize = sizeof(*info);
2951
2952         ret = ctdb_control(ctdb, destnode, 0,
2953                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2954                            mem_ctx, NULL, &res, &timeout, NULL);
2955         if (ret != 0 || res != 0) {
2956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2957                                 "failed ret:%d res:%d\n",
2958                                 ret, res));
2959                 return -1;
2960         }
2961
2962         return 0;
2963 }
2964
2965 /*
2966   set/clear the permanent disabled bit on a remote node
2967  */
2968 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2969                        uint32_t set, uint32_t clear)
2970 {
2971         int ret;
2972         TDB_DATA data;
2973         struct ctdb_node_map *nodemap=NULL;
2974         struct ctdb_node_flag_change c;
2975         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2976         uint32_t recmaster;
2977         uint32_t *nodes;
2978
2979
2980         /* find the recovery master */
2981         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2984                 talloc_free(tmp_ctx);
2985                 return ret;
2986         }
2987
2988
2989         /* read the node flags from the recmaster */
2990         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2991         if (ret != 0) {
2992                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2993                 talloc_free(tmp_ctx);
2994                 return -1;
2995         }
2996         if (destnode >= nodemap->num) {
2997                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2998                 talloc_free(tmp_ctx);
2999                 return -1;
3000         }
3001
3002         c.pnn       = destnode;
3003         c.old_flags = nodemap->nodes[destnode].flags;
3004         c.new_flags = c.old_flags;
3005         c.new_flags |= set;
3006         c.new_flags &= ~clear;
3007
3008         data.dsize = sizeof(c);
3009         data.dptr = (unsigned char *)&c;
3010
3011         /* send the flags update to all connected nodes */
3012         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3013
3014         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3015                                         nodes, 0,
3016                                         timeout, false, data,
3017                                         NULL, NULL,
3018                                         NULL) != 0) {
3019                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3020
3021                 talloc_free(tmp_ctx);
3022                 return -1;
3023         }
3024
3025         talloc_free(tmp_ctx);
3026         return 0;
3027 }
3028
3029
3030 /*
3031   get all tunables
3032  */
3033 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3034                                struct timeval timeout, 
3035                                uint32_t destnode,
3036                                struct ctdb_tunable *tunables)
3037 {
3038         TDB_DATA outdata;
3039         int ret;
3040         int32_t res;
3041
3042         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3043                            &outdata, &res, &timeout, NULL);
3044         if (ret != 0 || res != 0) {
3045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3046                 return -1;
3047         }
3048
3049         if (outdata.dsize != sizeof(*tunables)) {
3050                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3051                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3052                 return -1;              
3053         }
3054
3055         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3056         talloc_free(outdata.dptr);
3057         return 0;
3058 }
3059
3060 /*
3061   add a public address to a node
3062  */
3063 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3064                       struct timeval timeout, 
3065                       uint32_t destnode,
3066                       struct ctdb_control_ip_iface *pub)
3067 {
3068         TDB_DATA data;
3069         int32_t res;
3070         int ret;
3071
3072         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3073         data.dptr  = (unsigned char *)pub;
3074
3075         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3076                            NULL, &res, &timeout, NULL);
3077         if (ret != 0 || res != 0) {
3078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3079                 return -1;
3080         }
3081
3082         return 0;
3083 }
3084
3085 /*
3086   delete a public address from a node
3087  */
3088 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3089                       struct timeval timeout, 
3090                       uint32_t destnode,
3091                       struct ctdb_control_ip_iface *pub)
3092 {
3093         TDB_DATA data;
3094         int32_t res;
3095         int ret;
3096
3097         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3098         data.dptr  = (unsigned char *)pub;
3099
3100         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3101                            NULL, &res, &timeout, NULL);
3102         if (ret != 0 || res != 0) {
3103                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3104                 return -1;
3105         }
3106
3107         return 0;
3108 }
3109
3110 /*
3111   kill a tcp connection
3112  */
3113 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3114                       struct timeval timeout, 
3115                       uint32_t destnode,
3116                       struct ctdb_tcp_connection *killtcp)
3117 {
3118         TDB_DATA data;
3119         int32_t res;
3120         int ret;
3121
3122         data.dsize = sizeof(struct ctdb_tcp_connection);
3123         data.dptr  = (unsigned char *)killtcp;
3124
3125         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3126                            NULL, &res, &timeout, NULL);
3127         if (ret != 0 || res != 0) {
3128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3129                 return -1;
3130         }
3131
3132         return 0;
3133 }
3134
3135 /*
3136   send a gratious arp
3137  */
3138 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3139                       struct timeval timeout, 
3140                       uint32_t destnode,
3141                       ctdb_sock_addr *addr,
3142                       const char *ifname)
3143 {
3144         TDB_DATA data;
3145         int32_t res;
3146         int ret, len;
3147         struct ctdb_control_gratious_arp *gratious_arp;
3148         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3149
3150
3151         len = strlen(ifname)+1;
3152         gratious_arp = talloc_size(tmp_ctx, 
3153                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3154         CTDB_NO_MEMORY(ctdb, gratious_arp);
3155
3156         gratious_arp->addr = *addr;
3157         gratious_arp->len = len;
3158         memcpy(&gratious_arp->iface[0], ifname, len);
3159
3160
3161         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3162         data.dptr  = (unsigned char *)gratious_arp;
3163
3164         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3165                            NULL, &res, &timeout, NULL);
3166         if (ret != 0 || res != 0) {
3167                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3168                 talloc_free(tmp_ctx);
3169                 return -1;
3170         }
3171
3172         talloc_free(tmp_ctx);
3173         return 0;
3174 }
3175
3176 /*
3177   get a list of all tcp tickles that a node knows about for a particular vnn
3178  */
3179 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3180                               struct timeval timeout, uint32_t destnode, 
3181                               TALLOC_CTX *mem_ctx, 
3182                               ctdb_sock_addr *addr,
3183                               struct ctdb_control_tcp_tickle_list **list)
3184 {
3185         int ret;
3186         TDB_DATA data, outdata;
3187         int32_t status;
3188
3189         data.dptr = (uint8_t*)addr;
3190         data.dsize = sizeof(ctdb_sock_addr);
3191
3192         ret = ctdb_control(ctdb, destnode, 0, 
3193                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3194                            mem_ctx, &outdata, &status, NULL, NULL);
3195         if (ret != 0 || status != 0) {
3196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3197                 return -1;
3198         }
3199
3200         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3201
3202         return status;
3203 }
3204
3205 /*
3206   register a server id
3207  */
3208 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3209                       struct timeval timeout, 
3210                       struct ctdb_server_id *id)
3211 {
3212         TDB_DATA data;
3213         int32_t res;
3214         int ret;
3215
3216         data.dsize = sizeof(struct ctdb_server_id);
3217         data.dptr  = (unsigned char *)id;
3218
3219         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3220                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3221                         0, data, NULL,
3222                         NULL, &res, &timeout, NULL);
3223         if (ret != 0 || res != 0) {
3224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3225                 return -1;
3226         }
3227
3228         return 0;
3229 }
3230
3231 /*
3232   unregister a server id
3233  */
3234 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3235                       struct timeval timeout, 
3236                       struct ctdb_server_id *id)
3237 {
3238         TDB_DATA data;
3239         int32_t res;
3240         int ret;
3241
3242         data.dsize = sizeof(struct ctdb_server_id);
3243         data.dptr  = (unsigned char *)id;
3244
3245         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3246                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3247                         0, data, NULL,
3248                         NULL, &res, &timeout, NULL);
3249         if (ret != 0 || res != 0) {
3250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3251                 return -1;
3252         }
3253
3254         return 0;
3255 }
3256
3257
3258 /*
3259   check if a server id exists
3260
3261   if a server id does exist, return *status == 1, otherwise *status == 0
3262  */
3263 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3264                       struct timeval timeout, 
3265                       uint32_t destnode,
3266                       struct ctdb_server_id *id,
3267                       uint32_t *status)
3268 {
3269         TDB_DATA data;
3270         int32_t res;
3271         int ret;
3272
3273         data.dsize = sizeof(struct ctdb_server_id);
3274         data.dptr  = (unsigned char *)id;
3275
3276         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3277                         0, data, NULL,
3278                         NULL, &res, &timeout, NULL);
3279         if (ret != 0) {
3280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3281                 return -1;
3282         }
3283
3284         if (res) {
3285                 *status = 1;
3286         } else {
3287                 *status = 0;
3288         }
3289
3290         return 0;
3291 }
3292
3293 /*
3294    get the list of server ids that are registered on a node
3295 */
3296 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3297                 TALLOC_CTX *mem_ctx,
3298                 struct timeval timeout, uint32_t destnode, 
3299                 struct ctdb_server_id_list **svid_list)
3300 {
3301         int ret;
3302         TDB_DATA outdata;
3303         int32_t res;
3304
3305         ret = ctdb_control(ctdb, destnode, 0, 
3306                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3307                            mem_ctx, &outdata, &res, &timeout, NULL);
3308         if (ret != 0 || res != 0) {
3309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3310                 return -1;
3311         }
3312
3313         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3314                     
3315         return 0;
3316 }
3317
3318 /*
3319   initialise the ctdb daemon for client applications
3320
3321   NOTE: In current code the daemon does not fork. This is for testing purposes only
3322   and to simplify the code.
3323 */
3324 struct ctdb_context *ctdb_init(struct tevent_context *ev)
3325 {
3326         int ret;
3327         struct ctdb_context *ctdb;
3328
3329         ctdb = talloc_zero(ev, struct ctdb_context);
3330         if (ctdb == NULL) {
3331                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3332                 return NULL;
3333         }
3334         ctdb->ev  = ev;
3335         /* Wrap early to exercise code. */
3336         ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3337         if (ret != 0) {
3338                 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3339                 talloc_free(ctdb);
3340                 return NULL;
3341         }
3342
3343         ret = srvid_init(ctdb, &ctdb->srv);
3344         if (ret != 0) {
3345                 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3346                 talloc_free(ctdb);
3347                 return NULL;
3348         }
3349
3350         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3351         if (ret != 0) {
3352                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3353                 talloc_free(ctdb);
3354                 return NULL;
3355         }
3356
3357         ctdb->statistics.statistics_start_time = timeval_current();
3358
3359         return ctdb;
3360 }
3361
3362
3363 /*
3364   set some ctdb flags
3365 */
3366 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3367 {
3368         ctdb->flags |= flags;
3369 }
3370
3371 /*
3372   setup the local socket name
3373 */
3374 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3375 {
3376         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3377         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3378
3379         return 0;
3380 }
3381
3382 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3383 {
3384         return ctdb->daemon.name;
3385 }
3386
3387 /*
3388   return the pnn of this node
3389 */
3390 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3391 {
3392         return ctdb->pnn;
3393 }
3394
3395
3396 /*
3397   get the uptime of a remote node
3398  */
3399 struct ctdb_client_control_state *
3400 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3401 {
3402         return ctdb_control_send(ctdb, destnode, 0, 
3403                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3404                            mem_ctx, &timeout, NULL);
3405 }
3406
3407 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3408 {
3409         int ret;
3410         int32_t res;
3411         TDB_DATA outdata;
3412
3413         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3414         if (ret != 0 || res != 0) {
3415                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3416                 return -1;
3417         }
3418
3419         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3420
3421         return 0;
3422 }
3423
3424 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3425 {
3426         struct ctdb_client_control_state *state;
3427
3428         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3429         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3430 }
3431
3432 /*
3433   send a control to execute the "recovered" event script on a node
3434  */
3435 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3436 {
3437         int ret;
3438         int32_t status;
3439
3440         ret = ctdb_control(ctdb, destnode, 0, 
3441                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3442                            NULL, NULL, &status, &timeout, NULL);
3443         if (ret != 0 || status != 0) {
3444                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3445                 return -1;
3446         }
3447
3448         return 0;
3449 }
3450
3451 /* 
3452   callback for the async helpers used when sending the same control
3453   to multiple nodes in parallell.
3454 */
3455 static void async_callback(struct ctdb_client_control_state *state)
3456 {
3457         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3458         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3459         int ret;
3460         TDB_DATA outdata;
3461         int32_t res = -1;
3462         uint32_t destnode = state->c->hdr.destnode;
3463
3464         outdata.dsize = 0;
3465         outdata.dptr = NULL;
3466
3467         /* one more node has responded with recmode data */
3468         data->count--;
3469
3470         /* if we failed to push the db, then return an error and let
3471            the main loop try again.
3472         */
3473         if (state->state != CTDB_CONTROL_DONE) {
3474                 if ( !data->dont_log_errors) {
3475                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3476                 }
3477                 data->fail_count++;
3478                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3479                         res = -ETIME;
3480                 } else {
3481                         res = -1;
3482                 }
3483                 if (data->fail_callback) {
3484                         data->fail_callback(ctdb, destnode, res, outdata,
3485                                         data->callback_data);
3486                 }
3487                 return;
3488         }
3489         
3490         state->async.fn = NULL;
3491
3492         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3493         if ((ret != 0) || (res != 0)) {
3494                 if ( !data->dont_log_errors) {
3495                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3496                 }
3497                 data->fail_count++;
3498                 if (data->fail_callback) {
3499                         data->fail_callback(ctdb, destnode, res, outdata,
3500                                         data->callback_data);
3501                 }
3502         }
3503         if ((ret == 0) && (data->callback != NULL)) {
3504                 data->callback(ctdb, destnode, res, outdata,
3505                                         data->callback_data);
3506         }
3507 }
3508
3509
3510 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3511 {
3512         /* set up the callback functions */
3513         state->async.fn = async_callback;
3514         state->async.private_data = data;
3515         
3516         /* one more control to wait for to complete */
3517         data->count++;
3518 }
3519
3520
3521 /* wait for up to the maximum number of seconds allowed
3522    or until all nodes we expect a response from has replied
3523 */
3524 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3525 {
3526         while (data->count > 0) {
3527                 tevent_loop_once(ctdb->ev);
3528         }
3529         if (data->fail_count != 0) {
3530                 if (!data->dont_log_errors) {
3531                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3532                                  data->fail_count));
3533                 }
3534                 return -1;
3535         }
3536         return 0;
3537 }
3538
3539
3540 /* 
3541    perform a simple control on the listed nodes
3542    The control cannot return data
3543  */
3544 int ctdb_client_async_control(struct ctdb_context *ctdb,
3545                                 enum ctdb_controls opcode,
3546                                 uint32_t *nodes,
3547                                 uint64_t srvid,
3548                                 struct timeval timeout,
3549                                 bool dont_log_errors,
3550                                 TDB_DATA data,
3551                                 client_async_callback client_callback,
3552                                 client_async_callback fail_callback,
3553                                 void *callback_data)
3554 {
3555         struct client_async_data *async_data;
3556         struct ctdb_client_control_state *state;
3557         int j, num_nodes;
3558
3559         async_data = talloc_zero(ctdb, struct client_async_data);
3560         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3561         async_data->dont_log_errors = dont_log_errors;
3562         async_data->callback = client_callback;
3563         async_data->fail_callback = fail_callback;
3564         async_data->callback_data = callback_data;
3565         async_data->opcode        = opcode;
3566
3567         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3568
3569         /* loop over all nodes and send an async control to each of them */
3570         for (j=0; j<num_nodes; j++) {
3571                 uint32_t pnn = nodes[j];
3572
3573                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3574                                           0, data, async_data, &timeout, NULL);
3575                 if (state == NULL) {
3576                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3577                         talloc_free(async_data);
3578                         return -1;
3579                 }
3580                 
3581                 ctdb_client_async_add(async_data, state);
3582         }
3583
3584         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3585                 talloc_free(async_data);
3586                 return -1;
3587         }
3588
3589         talloc_free(async_data);
3590         return 0;
3591 }
3592
3593 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3594                                 struct ctdb_vnn_map *vnn_map,
3595                                 TALLOC_CTX *mem_ctx,
3596                                 bool include_self)
3597 {
3598         int i, j, num_nodes;
3599         uint32_t *nodes;
3600
3601         for (i=num_nodes=0;i<vnn_map->size;i++) {
3602                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3603                         continue;
3604                 }
3605                 num_nodes++;
3606         } 
3607
3608         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3609         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3610
3611         for (i=j=0;i<vnn_map->size;i++) {
3612                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3613                         continue;
3614                 }
3615                 nodes[j++] = vnn_map->map[i];
3616         } 
3617
3618         return nodes;
3619 }
3620
3621 /* Get list of nodes not including those with flags specified by mask.
3622  * If exclude_pnn is not -1 then exclude that pnn from the list.
3623  */
3624 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3625                         struct ctdb_node_map *node_map,
3626                         TALLOC_CTX *mem_ctx,
3627                         uint32_t mask,
3628                         int exclude_pnn)
3629 {
3630         int i, j, num_nodes;
3631         uint32_t *nodes;
3632
3633         for (i=num_nodes=0;i<node_map->num;i++) {
3634                 if (node_map->nodes[i].flags & mask) {
3635                         continue;
3636                 }
3637                 if (node_map->nodes[i].pnn == exclude_pnn) {
3638                         continue;
3639                 }
3640                 num_nodes++;
3641         } 
3642
3643         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3644         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3645
3646         for (i=j=0;i<node_map->num;i++) {
3647                 if (node_map->nodes[i].flags & mask) {
3648                         continue;
3649                 }
3650                 if (node_map->nodes[i].pnn == exclude_pnn) {
3651                         continue;
3652                 }
3653                 nodes[j++] = node_map->nodes[i].pnn;
3654         } 
3655
3656         return nodes;
3657 }
3658
3659 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3660                                 struct ctdb_node_map *node_map,
3661                                 TALLOC_CTX *mem_ctx,
3662                                 bool include_self)
3663 {
3664         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3665                              include_self ? -1 : ctdb->pnn);
3666 }
3667
3668 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3669                                 struct ctdb_node_map *node_map,
3670                                 TALLOC_CTX *mem_ctx,
3671                                 bool include_self)
3672 {
3673         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3674                              include_self ? -1 : ctdb->pnn);
3675 }
3676
3677 /* 
3678   this is used to test if a pnn lock exists and if it exists will return
3679   the number of connections that pnn has reported or -1 if that recovery
3680   daemon is not running.
3681 */
3682 int
3683 ctdb_read_pnn_lock(int fd, int32_t pnn)
3684 {
3685         struct flock lock;
3686         char c;
3687
3688         lock.l_type = F_WRLCK;
3689         lock.l_whence = SEEK_SET;
3690         lock.l_start = pnn;
3691         lock.l_len = 1;
3692         lock.l_pid = 0;
3693
3694         if (fcntl(fd, F_GETLK, &lock) != 0) {
3695                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3696                 return -1;
3697         }
3698
3699         if (lock.l_type == F_UNLCK) {
3700                 return -1;
3701         }
3702
3703         if (pread(fd, &c, 1, pnn) == -1) {
3704                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3705                 return -1;
3706         }
3707
3708         return c;
3709 }
3710
3711 /*
3712   get capabilities of a remote node
3713  */
3714 struct ctdb_client_control_state *
3715 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3716 {
3717         return ctdb_control_send(ctdb, destnode, 0, 
3718                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3719                            mem_ctx, &timeout, NULL);
3720 }
3721
3722 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3723 {
3724         int ret;
3725         int32_t res;
3726         TDB_DATA outdata;
3727
3728         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3729         if ( (ret != 0) || (res != 0) ) {
3730                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3731                 return -1;
3732         }
3733
3734         if (capabilities) {
3735                 *capabilities = *((uint32_t *)outdata.dptr);
3736         }
3737
3738         return 0;
3739 }
3740
3741 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3742 {
3743         struct ctdb_client_control_state *state;
3744         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3745         int ret;
3746
3747         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3748         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3749         talloc_free(tmp_ctx);
3750         return ret;
3751 }
3752
3753 static void get_capabilities_callback(struct ctdb_context *ctdb,
3754                                       uint32_t node_pnn, int32_t res,
3755                                       TDB_DATA outdata, void *callback_data)
3756 {
3757         struct ctdb_node_capabilities *caps =
3758                 talloc_get_type(callback_data,
3759                                 struct ctdb_node_capabilities);
3760
3761         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3762                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
3763                 return;
3764         }
3765
3766         if (node_pnn >= talloc_array_length(caps)) {
3767                 DEBUG(DEBUG_ERR,
3768                       (__location__ " unexpected PNN %u\n", node_pnn));
3769                 return;
3770         }
3771
3772         caps[node_pnn].retrieved = true;
3773         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3774 }
3775
3776 struct ctdb_node_capabilities *
3777 ctdb_get_capabilities(struct ctdb_context *ctdb,
3778                       TALLOC_CTX *mem_ctx,
3779                       struct timeval timeout,
3780                       struct ctdb_node_map *nodemap)
3781 {
3782         uint32_t *nodes;
3783         uint32_t i, res;
3784         struct ctdb_node_capabilities *ret;
3785
3786         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
3787
3788         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3789                            nodemap->num);
3790         CTDB_NO_MEMORY_NULL(ctdb, ret);
3791         /* Prepopulate the expected PNNs */
3792         for (i = 0; i < talloc_array_length(ret); i++) {
3793                 ret[i].retrieved = false;
3794         }
3795
3796         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3797                                         nodes, 0, timeout,
3798                                         false, tdb_null,
3799                                         get_capabilities_callback, NULL,
3800                                         ret);
3801         if (res != 0) {
3802                 DEBUG(DEBUG_ERR,
3803                       (__location__ " Failed to read node capabilities.\n"));
3804                 TALLOC_FREE(ret);
3805         }
3806
3807         return ret;
3808 }
3809
3810 uint32_t *
3811 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3812                            uint32_t pnn)
3813 {
3814         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3815                 return &caps[pnn].capabilities;
3816         }
3817
3818         return NULL;
3819 }
3820
3821 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
3822                                 uint32_t pnn,
3823                                 uint32_t capabilities_required)
3824 {
3825         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
3826         return (capp != NULL) &&
3827                 ((*capp & capabilities_required) == capabilities_required);
3828 }
3829
3830
3831 struct server_id {
3832         uint64_t pid;
3833         uint32_t task_id;
3834         uint32_t vnn;
3835         uint64_t unique_id;
3836 };
3837
3838 static struct server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3839 {
3840         struct server_id id;
3841
3842         id.pid = getpid();
3843         id.task_id = reqid;
3844         id.vnn = ctdb_get_pnn(ctdb);
3845         id.unique_id = id.vnn;
3846         id.unique_id = (id.unique_id << 32) | reqid;
3847
3848         return id;
3849 }
3850
3851 /* This is basically a copy from Samba's server_id.*.  However, a
3852  * dependency chain stops us from using Samba's version, so use a
3853  * renamed copy until a better solution is found. */
3854 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3855 {
3856         if (id1->pid != id2->pid) {
3857                 return false;
3858         }
3859
3860         if (id1->task_id != id2->task_id) {
3861                 return false;
3862         }
3863
3864         if (id1->vnn != id2->vnn) {
3865                 return false;
3866         }
3867
3868         if (id1->unique_id != id2->unique_id) {
3869                 return false;
3870         }
3871
3872         return true;
3873 }
3874
3875 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3876 {
3877         struct ctdb_server_id sid;
3878         int ret;
3879         uint32_t result = 0;
3880
3881         sid.type = SERVER_TYPE_SAMBA;
3882         sid.pnn = id->vnn;
3883         sid.server_id = id->pid;
3884
3885         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3886                                         id->vnn, &sid, &result);
3887         if (ret != 0) {
3888                 /* If control times out, assume server_id exists. */
3889                 return true;
3890         }
3891
3892         if (result) {
3893                 return true;
3894         }
3895
3896         return false;
3897 }
3898
3899
3900 enum g_lock_type {
3901         G_LOCK_READ = 0,
3902         G_LOCK_WRITE = 1,
3903 };
3904
3905 struct g_lock_rec {
3906         enum g_lock_type type;
3907         struct server_id id;
3908 };
3909
3910 struct g_lock_recs {
3911         unsigned int num;
3912         struct g_lock_rec *lock;
3913 };
3914
3915 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3916                          struct g_lock_recs **locks)
3917 {
3918         struct g_lock_recs *recs;
3919
3920         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3921         if (recs == NULL) {
3922                 return false;
3923         }
3924
3925         if (data.dsize == 0) {
3926                 goto done;
3927         }
3928
3929         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3930                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3931                                   (unsigned long)data.dsize));
3932                 talloc_free(recs);
3933                 return false;
3934         }
3935
3936         recs->num = data.dsize / sizeof(struct g_lock_rec);
3937         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3938         if (recs->lock == NULL) {
3939                 talloc_free(recs);
3940                 return false;
3941         }
3942
3943 done:
3944         if (locks != NULL) {
3945                 *locks = recs;
3946         }
3947
3948         return true;
3949 }
3950
3951
3952 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3953                         struct ctdb_db_context *ctdb_db,
3954                         const char *keyname, uint32_t reqid)
3955 {
3956         TDB_DATA key, data;
3957         struct ctdb_record_handle *h;
3958         struct g_lock_recs *locks;
3959         struct server_id id;
3960         struct timeval t_start;
3961         int i;
3962
3963         key.dptr = (uint8_t *)discard_const(keyname);
3964         key.dsize = strlen(keyname) + 1;
3965
3966         t_start = timeval_current();
3967
3968 again:
3969         /* Keep trying for an hour. */
3970         if (timeval_elapsed(&t_start) > 3600) {
3971                 return false;
3972         }
3973
3974         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3975         if (h == NULL) {
3976                 return false;
3977         }
3978
3979         if (!g_lock_parse(h, data, &locks)) {
3980                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3981                 talloc_free(data.dptr);
3982                 talloc_free(h);
3983                 return false;
3984         }
3985
3986         talloc_free(data.dptr);
3987
3988         id = server_id_fetch(ctdb_db->ctdb, reqid);
3989
3990         i = 0;
3991         while (i < locks->num) {
3992                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3993                         /* Internal error */
3994                         talloc_free(h);
3995                         return false;
3996                 }
3997
3998                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3999                         if (i < locks->num-1) {
4000                                 locks->lock[i] = locks->lock[locks->num-1];
4001                         }
4002                         locks->num--;
4003                         continue;
4004                 }
4005
4006                 /* This entry is locked. */
4007                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
4008                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4009                                    (unsigned long long)id.pid,
4010                                    id.task_id, id.vnn,
4011                                    (unsigned long long)id.unique_id));
4012                 talloc_free(h);
4013                 goto again;
4014         }
4015
4016         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
4017                                      locks->num+1);
4018         if (locks->lock == NULL) {
4019                 talloc_free(h);
4020                 return false;
4021         }
4022
4023         locks->lock[locks->num].type = G_LOCK_WRITE;
4024         locks->lock[locks->num].id = id;
4025         locks->num++;
4026
4027         data.dptr = (uint8_t *)locks->lock;
4028         data.dsize = locks->num * sizeof(struct g_lock_rec);
4029
4030         if (ctdb_record_store(h, data) != 0) {
4031                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
4032                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4033                                   (unsigned long long)id.pid,
4034                                   id.task_id, id.vnn,
4035                                   (unsigned long long)id.unique_id));
4036                 talloc_free(h);
4037                 return false;
4038         }
4039
4040         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
4041                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4042                            (unsigned long long)id.pid,
4043                            id.task_id, id.vnn,
4044                            (unsigned long long)id.unique_id));
4045
4046         talloc_free(h);
4047         return true;
4048 }
4049
4050 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
4051                           struct ctdb_db_context *ctdb_db,
4052                           const char *keyname, uint32_t reqid)
4053 {
4054         TDB_DATA key, data;
4055         struct ctdb_record_handle *h;
4056         struct g_lock_recs *locks;
4057         struct server_id id;
4058         int i;
4059         bool found = false;
4060
4061         key.dptr = (uint8_t *)discard_const(keyname);
4062         key.dsize = strlen(keyname) + 1;
4063         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4064         if (h == NULL) {
4065                 return false;
4066         }
4067
4068         if (!g_lock_parse(h, data, &locks)) {
4069                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4070                 talloc_free(data.dptr);
4071                 talloc_free(h);
4072                 return false;
4073         }
4074
4075         talloc_free(data.dptr);
4076
4077         id = server_id_fetch(ctdb_db->ctdb, reqid);
4078
4079         for (i=0; i<locks->num; i++) {
4080                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4081                         if (i < locks->num-1) {
4082                                 locks->lock[i] = locks->lock[locks->num-1];
4083                         }
4084                         locks->num--;
4085                         found = true;
4086                         break;
4087                 }
4088         }
4089
4090         if (!found) {
4091                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4092                 talloc_free(h);
4093                 return false;
4094         }
4095
4096         data.dptr = (uint8_t *)locks->lock;
4097         data.dsize = locks->num * sizeof(struct g_lock_rec);
4098
4099         if (ctdb_record_store(h, data) != 0) {
4100                 talloc_free(h);
4101                 return false;
4102         }
4103
4104         talloc_free(h);
4105         return true;
4106 }
4107
4108
4109 struct ctdb_transaction_handle {
4110         struct ctdb_db_context *ctdb_db;
4111         struct ctdb_db_context *g_lock_db;
4112         char *lock_name;
4113         uint32_t reqid;
4114         /*
4115          * we store reads and writes done under a transaction:
4116          * - one list stores both reads and writes (m_all)
4117          * - the other just writes (m_write)
4118          */
4119         struct ctdb_marshall_buffer *m_all;
4120         struct ctdb_marshall_buffer *m_write;
4121 };
4122
4123 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4124 {
4125         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4126         reqid_remove(h->ctdb_db->ctdb->idr, h->reqid);
4127         return 0;
4128 }
4129
4130
4131 /**
4132  * start a transaction on a database
4133  */
4134 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4135                                                        TALLOC_CTX *mem_ctx)
4136 {
4137         struct ctdb_transaction_handle *h;
4138         struct ctdb_server_id id;
4139
4140         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4141         if (h == NULL) {
4142                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4143                 return NULL;
4144         }
4145
4146         h->ctdb_db = ctdb_db;
4147         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4148                                        (unsigned int)ctdb_db->db_id);
4149         if (h->lock_name == NULL) {
4150                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4151                 talloc_free(h);
4152                 return NULL;
4153         }
4154
4155         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4156                                    "g_lock.tdb", false, 0);
4157         if (!h->g_lock_db) {
4158                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4159                 talloc_free(h);
4160                 return NULL;
4161         }
4162
4163         id.type = SERVER_TYPE_SAMBA;
4164         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4165         id.server_id = getpid();
4166
4167         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4168                                          &id) != 0) {
4169                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4170                 talloc_free(h);
4171                 return NULL;
4172         }
4173
4174         h->reqid = reqid_new(h->ctdb_db->ctdb->idr, h);
4175
4176         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4177                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4178                 talloc_free(h);
4179                 return NULL;
4180         }
4181
4182         talloc_set_destructor(h, ctdb_transaction_destructor);
4183         return h;
4184 }
4185
4186 /**
4187  * fetch a record inside a transaction
4188  */
4189 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4190                            TALLOC_CTX *mem_ctx,
4191                            TDB_DATA key, TDB_DATA *data)
4192 {
4193         struct ctdb_ltdb_header header;
4194         int ret;
4195
4196         ZERO_STRUCT(header);
4197
4198         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4199         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4200                 /* record doesn't exist yet */
4201                 *data = tdb_null;
4202                 ret = 0;
4203         }
4204
4205         if (ret != 0) {
4206                 return ret;
4207         }
4208
4209         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4210         if (h->m_all == NULL) {
4211                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4212                 return -1;
4213         }
4214
4215         return 0;
4216 }
4217
4218 /**
4219  * stores a record inside a transaction
4220  */
4221 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4222                            TDB_DATA key, TDB_DATA data)
4223 {
4224         TALLOC_CTX *tmp_ctx = talloc_new(h);
4225         struct ctdb_ltdb_header header;
4226         TDB_DATA olddata;
4227         int ret;
4228
4229         /* we need the header so we can update the RSN */
4230         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4231         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4232                 /* the record doesn't exist - create one with us as dmaster.
4233                    This is only safe because we are in a transaction and this
4234                    is a persistent database */
4235                 ZERO_STRUCT(header);
4236         } else if (ret != 0) {
4237                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4238                 talloc_free(tmp_ctx);
4239                 return ret;
4240         }
4241
4242         if (data.dsize == olddata.dsize &&
4243             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4244             header.rsn != 0) {
4245                 /* save writing the same data */
4246                 talloc_free(tmp_ctx);
4247                 return 0;
4248         }
4249
4250         header.dmaster = h->ctdb_db->ctdb->pnn;
4251         header.rsn++;
4252
4253         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4254         if (h->m_all == NULL) {
4255                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4256                 talloc_free(tmp_ctx);
4257                 return -1;
4258         }
4259
4260         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4261         if (h->m_write == NULL) {
4262                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4263                 talloc_free(tmp_ctx);
4264                 return -1;
4265         }
4266
4267         talloc_free(tmp_ctx);
4268         return 0;
4269 }
4270
4271 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4272 {
4273         const char *keyname = CTDB_DB_SEQNUM_KEY;
4274         TDB_DATA key, data;
4275         struct ctdb_ltdb_header header;
4276         int ret;
4277
4278         key.dptr = (uint8_t *)discard_const(keyname);
4279         key.dsize = strlen(keyname) + 1;
4280
4281         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4282         if (ret != 0) {
4283                 *seqnum = 0;
4284                 return 0;
4285         }
4286
4287         if (data.dsize == 0) {
4288                 *seqnum = 0;
4289                 return 0;
4290         }
4291
4292         if (data.dsize != sizeof(*seqnum)) {
4293                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4294                                   data.dsize));
4295                 talloc_free(data.dptr);
4296                 return -1;
4297         }
4298
4299         *seqnum = *(uint64_t *)data.dptr;
4300         talloc_free(data.dptr);
4301
4302         return 0;
4303 }
4304
4305
4306 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4307                                 uint64_t seqnum)
4308 {
4309         const char *keyname = CTDB_DB_SEQNUM_KEY;
4310         TDB_DATA key, data;
4311
4312         key.dptr = (uint8_t *)discard_const(keyname);
4313         key.dsize = strlen(keyname) + 1;
4314
4315         data.dptr = (uint8_t *)&seqnum;
4316         data.dsize = sizeof(seqnum);
4317
4318         return ctdb_transaction_store(h, key, data);
4319 }
4320
4321
4322 /**
4323  * commit a transaction
4324  */
4325 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4326 {
4327         int ret;
4328         uint64_t old_seqnum, new_seqnum;
4329         int32_t status;
4330         struct timeval timeout;
4331
4332         if (h->m_write == NULL) {
4333                 /* no changes were made */
4334                 talloc_free(h);
4335                 return 0;
4336         }
4337
4338         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4339         if (ret != 0) {
4340                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4341                 ret = -1;
4342                 goto done;
4343         }
4344
4345         new_seqnum = old_seqnum + 1;
4346         ret = ctdb_store_db_seqnum(h, new_seqnum);
4347         if (ret != 0) {
4348                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4349                 ret = -1;
4350                 goto done;
4351         }
4352
4353 again:
4354         timeout = timeval_current_ofs(3,0);
4355         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4356                            h->ctdb_db->db_id,
4357                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4358                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4359                            &status, &timeout, NULL);
4360         if (ret != 0 || status != 0) {
4361                 /*
4362                  * TRANS3_COMMIT control will only fail if recovery has been
4363                  * triggered.  Check if the database has been updated or not.
4364                  */
4365                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4366                 if (ret != 0) {
4367                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4368                         goto done;
4369                 }
4370
4371                 if (new_seqnum == old_seqnum) {
4372                         /* Database not yet updated, try again */
4373                         goto again;
4374                 }
4375
4376                 if (new_seqnum != (old_seqnum + 1)) {
4377                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4378                                           (long long unsigned)new_seqnum,
4379                                           (long long unsigned)old_seqnum));
4380                         ret = -1;
4381                         goto done;
4382                 }
4383         }
4384
4385         ret = 0;
4386
4387 done:
4388         talloc_free(h);
4389         return ret;
4390 }
4391
4392 /**
4393  * cancel a transaction
4394  */
4395 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4396 {
4397         talloc_free(h);
4398         return 0;
4399 }
4400
4401
4402 /*
4403   recovery daemon ping to main daemon
4404  */
4405 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4406 {
4407         int ret;
4408         int32_t res;
4409
4410         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4411                            ctdb, NULL, &res, NULL, NULL);
4412         if (ret != 0 || res != 0) {
4413                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4414                 return -1;
4415         }
4416
4417         return 0;
4418 }
4419
4420 /* When forking the main daemon and the child process needs to connect
4421  * back to the daemon as a client process, this function can be used
4422  * to change the ctdb context from daemon into client mode.  The child
4423  * process must be created using ctdb_fork() and not fork() -
4424  * ctdb_fork() does some necessary housekeeping.
4425  */
4426 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4427 {
4428         int ret;
4429         va_list ap;
4430
4431         /* Add extra information so we can identify this in the logs */
4432         va_start(ap, fmt);
4433         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4434         va_end(ap);
4435
4436         /* get a new event context */
4437         ctdb->ev = tevent_context_init(ctdb);
4438         tevent_loop_allow_nesting(ctdb->ev);
4439
4440         /* Connect to main CTDB daemon */
4441         ret = ctdb_socket_connect(ctdb);
4442         if (ret != 0) {
4443                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4444                 return -1;
4445         }
4446
4447         ctdb->can_send_controls = true;
4448
4449         return 0;
4450 }
4451
4452 /*
4453   get the status of running the monitor eventscripts: NULL means never run.
4454  */
4455 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4456                 struct timeval timeout, uint32_t destnode, 
4457                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4458                 struct ctdb_scripts_wire **scripts)
4459 {
4460         int ret;
4461         TDB_DATA outdata, indata;
4462         int32_t res;
4463         uint32_t uinttype = type;
4464
4465         indata.dptr = (uint8_t *)&uinttype;
4466         indata.dsize = sizeof(uinttype);
4467
4468         ret = ctdb_control(ctdb, destnode, 0, 
4469                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4470                            mem_ctx, &outdata, &res, &timeout, NULL);
4471         if (ret != 0 || res != 0) {
4472                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4473                 return -1;
4474         }
4475
4476         if (outdata.dsize == 0) {
4477                 *scripts = NULL;
4478         } else {
4479                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4480                 talloc_free(outdata.dptr);
4481         }
4482                     
4483         return 0;
4484 }
4485
4486 /*
4487   tell the main daemon how long it took to lock the reclock file
4488  */
4489 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4490 {
4491         int ret;
4492         int32_t res;
4493         TDB_DATA data;
4494
4495         data.dptr = (uint8_t *)&latency;
4496         data.dsize = sizeof(latency);
4497
4498         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4499                            ctdb, NULL, &res, NULL, NULL);
4500         if (ret != 0 || res != 0) {
4501                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4502                 return -1;
4503         }
4504
4505         return 0;
4506 }
4507
4508 /*
4509   get the name of the reclock file
4510  */
4511 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4512                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4513                          const char **name)
4514 {
4515         int ret;
4516         int32_t res;
4517         TDB_DATA data;
4518
4519         ret = ctdb_control(ctdb, destnode, 0, 
4520                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4521                            mem_ctx, &data, &res, &timeout, NULL);
4522         if (ret != 0 || res != 0) {
4523                 return -1;
4524         }
4525
4526         if (data.dsize == 0) {
4527                 *name = NULL;
4528         } else {
4529                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4530         }
4531         talloc_free(data.dptr);
4532
4533         return 0;
4534 }
4535
4536 /*
4537   set the reclock filename for a node
4538  */
4539 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4540 {
4541         int ret;
4542         TDB_DATA data;
4543         int32_t res;
4544
4545         if (reclock == NULL) {
4546                 data.dsize = 0;
4547                 data.dptr  = NULL;
4548         } else {
4549                 data.dsize = strlen(reclock) + 1;
4550                 data.dptr  = discard_const(reclock);
4551         }
4552
4553         ret = ctdb_control(ctdb, destnode, 0, 
4554                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4555                            NULL, NULL, &res, &timeout, NULL);
4556         if (ret != 0 || res != 0) {
4557                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4558                 return -1;
4559         }
4560
4561         return 0;
4562 }
4563
4564 /*
4565   stop a node
4566  */
4567 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4568 {
4569         int ret;
4570         int32_t res;
4571
4572         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4573                            ctdb, NULL, &res, &timeout, NULL);
4574         if (ret != 0 || res != 0) {
4575                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4576                 return -1;
4577         }
4578
4579         return 0;
4580 }
4581
4582 /*
4583   continue a node
4584  */
4585 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4586 {
4587         int ret;
4588
4589         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4590                            ctdb, NULL, NULL, &timeout, NULL);
4591         if (ret != 0) {
4592                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4593                 return -1;
4594         }
4595
4596         return 0;
4597 }
4598
4599 /*
4600   set the natgw state for a node
4601  */
4602 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4603 {
4604         int ret;
4605         TDB_DATA data;
4606         int32_t res;
4607
4608         data.dsize = sizeof(natgwstate);
4609         data.dptr  = (uint8_t *)&natgwstate;
4610
4611         ret = ctdb_control(ctdb, destnode, 0, 
4612                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4613                            NULL, NULL, &res, &timeout, NULL);
4614         if (ret != 0 || res != 0) {
4615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4616                 return -1;
4617         }
4618
4619         return 0;
4620 }
4621
4622 /*
4623   set the lmaster role for a node
4624  */
4625 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4626 {
4627         int ret;
4628         TDB_DATA data;
4629         int32_t res;
4630
4631         data.dsize = sizeof(lmasterrole);
4632         data.dptr  = (uint8_t *)&lmasterrole;
4633
4634         ret = ctdb_control(ctdb, destnode, 0, 
4635                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4636                            NULL, NULL, &res, &timeout, NULL);
4637         if (ret != 0 || res != 0) {
4638                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4639                 return -1;
4640         }
4641
4642         return 0;
4643 }
4644
4645 /*
4646   set the recmaster role for a node
4647  */
4648 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4649 {
4650         int ret;
4651         TDB_DATA data;
4652         int32_t res;
4653
4654         data.dsize = sizeof(recmasterrole);
4655         data.dptr  = (uint8_t *)&recmasterrole;
4656
4657         ret = ctdb_control(ctdb, destnode, 0, 
4658                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4659                            NULL, NULL, &res, &timeout, NULL);
4660         if (ret != 0 || res != 0) {
4661                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4662                 return -1;
4663         }
4664
4665         return 0;
4666 }
4667
4668 /* enable an eventscript
4669  */
4670 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4671 {
4672         int ret;
4673         TDB_DATA data;
4674         int32_t res;
4675
4676         data.dsize = strlen(script) + 1;
4677         data.dptr  = discard_const(script);
4678
4679         ret = ctdb_control(ctdb, destnode, 0, 
4680                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4681                            NULL, NULL, &res, &timeout, NULL);
4682         if (ret != 0 || res != 0) {
4683                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4684                 return -1;
4685         }
4686
4687         return 0;
4688 }
4689
4690 /* disable an eventscript
4691  */
4692 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4693 {
4694         int ret;
4695         TDB_DATA data;
4696         int32_t res;
4697
4698         data.dsize = strlen(script) + 1;
4699         data.dptr  = discard_const(script);
4700
4701         ret = ctdb_control(ctdb, destnode, 0, 
4702                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4703                            NULL, NULL, &res, &timeout, NULL);
4704         if (ret != 0 || res != 0) {
4705                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4706                 return -1;
4707         }
4708
4709         return 0;
4710 }
4711
4712
4713 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4714 {
4715         int ret;
4716         TDB_DATA data;
4717         int32_t res;
4718
4719         data.dsize = sizeof(*bantime);
4720         data.dptr  = (uint8_t *)bantime;
4721
4722         ret = ctdb_control(ctdb, destnode, 0, 
4723                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4724                            NULL, NULL, &res, &timeout, NULL);
4725         if (ret != 0 || res != 0) {
4726                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4727                 return -1;
4728         }
4729
4730         return 0;
4731 }
4732
4733
4734 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4735 {
4736         int ret;
4737         TDB_DATA outdata;
4738         int32_t res;
4739         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4740
4741         ret = ctdb_control(ctdb, destnode, 0, 
4742                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4743                            tmp_ctx, &outdata, &res, &timeout, NULL);
4744         if (ret != 0 || res != 0) {
4745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4746                 talloc_free(tmp_ctx);
4747                 return -1;
4748         }
4749
4750         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4751         talloc_free(tmp_ctx);
4752
4753         return 0;
4754 }
4755
4756
4757 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4758 {
4759         int ret;
4760         int32_t res;
4761         TDB_DATA data;
4762         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4763
4764         data.dptr = (uint8_t*)db_prio;
4765         data.dsize = sizeof(*db_prio);
4766
4767         ret = ctdb_control(ctdb, destnode, 0, 
4768                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4769                            tmp_ctx, NULL, &res, &timeout, NULL);
4770         if (ret != 0 || res != 0) {
4771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4772                 talloc_free(tmp_ctx);
4773                 return -1;
4774         }
4775
4776         talloc_free(tmp_ctx);
4777
4778         return 0;
4779 }
4780
4781 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4782 {
4783         int ret;
4784         int32_t res;
4785         TDB_DATA data;
4786         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4787
4788         data.dptr = (uint8_t*)&db_id;
4789         data.dsize = sizeof(db_id);
4790
4791         ret = ctdb_control(ctdb, destnode, 0, 
4792                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4793                            tmp_ctx, NULL, &res, &timeout, NULL);
4794         if (ret != 0 || res < 0) {
4795                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4796                 talloc_free(tmp_ctx);
4797                 return -1;
4798         }
4799
4800         if (priority) {
4801                 *priority = res;
4802         }
4803
4804         talloc_free(tmp_ctx);
4805
4806         return 0;
4807 }
4808
4809 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4810 {
4811         int ret;
4812         TDB_DATA outdata;
4813         int32_t res;
4814
4815         ret = ctdb_control(ctdb, destnode, 0, 
4816                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4817                            mem_ctx, &outdata, &res, &timeout, NULL);
4818         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4819                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4820                 return -1;
4821         }
4822
4823         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4824         talloc_free(outdata.dptr);
4825                     
4826         return 0;
4827 }
4828
4829 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4830 {
4831         if (h == NULL) {
4832                 return NULL;
4833         }
4834
4835         return &h->header;
4836 }
4837
4838
4839 struct ctdb_client_control_state *
4840 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4841 {
4842         struct ctdb_client_control_state *handle;
4843         struct ctdb_marshall_buffer *m;
4844         struct ctdb_rec_data *rec;
4845         TDB_DATA outdata;
4846
4847         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4848         if (m == NULL) {
4849                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4850                 return NULL;
4851         }
4852
4853         m->db_id = ctdb_db->db_id;
4854
4855         rec = ctdb_marshall_record(m, 0, key, header, data);
4856         if (rec == NULL) {
4857                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4858                 talloc_free(m);
4859                 return NULL;
4860         }
4861         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4862         if (m == NULL) {
4863                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4864                 talloc_free(m);
4865                 return NULL;
4866         }
4867         m->count++;
4868         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4869
4870
4871         outdata.dptr = (uint8_t *)m;
4872         outdata.dsize = talloc_get_size(m);
4873
4874         handle = ctdb_control_send(ctdb, destnode, 0, 
4875                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4876                            mem_ctx, &timeout, NULL);
4877         talloc_free(m);
4878         return handle;
4879 }
4880
4881 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4882 {
4883         int ret;
4884         int32_t res;
4885
4886         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4887         if ( (ret != 0) || (res != 0) ){
4888                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4889                 return -1;
4890         }
4891
4892         return 0;
4893 }
4894
4895 int
4896 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4897 {
4898         struct ctdb_client_control_state *state;
4899
4900         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4901         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4902 }
4903
4904
4905
4906
4907
4908
4909 /*
4910   set a database to be readonly
4911  */
4912 struct ctdb_client_control_state *
4913 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4914 {
4915         TDB_DATA data;
4916
4917         data.dptr = (uint8_t *)&dbid;
4918         data.dsize = sizeof(dbid);
4919
4920         return ctdb_control_send(ctdb, destnode, 0, 
4921                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4922                            ctdb, NULL, NULL);
4923 }
4924
4925 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4926 {
4927         int ret;
4928         int32_t res;
4929
4930         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4931         if (ret != 0 || res != 0) {
4932           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4933                 return -1;
4934         }
4935
4936         return 0;
4937 }
4938
4939 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4940 {
4941         struct ctdb_client_control_state *state;
4942
4943         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4944         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4945 }
4946
4947 /*
4948   set a database to be sticky
4949  */
4950 struct ctdb_client_control_state *
4951 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4952 {
4953         TDB_DATA data;
4954
4955         data.dptr = (uint8_t *)&dbid;
4956         data.dsize = sizeof(dbid);
4957
4958         return ctdb_control_send(ctdb, destnode, 0, 
4959                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4960                            ctdb, NULL, NULL);
4961 }
4962
4963 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4964 {
4965         int ret;
4966         int32_t res;
4967
4968         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4969         if (ret != 0 || res != 0) {
4970                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4971                 return -1;
4972         }
4973
4974         return 0;
4975 }
4976
4977 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4978 {
4979         struct ctdb_client_control_state *state;
4980
4981         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4982         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4983 }