client: Always use jenkins hash when attaching volatile databases
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data && updatetdb) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
211                 exit(1);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
276                 return -1;
277         }
278
279         set_nonblocking(ctdb->daemon.sd);
280         set_close_on_exec(ctdb->daemon.sd);
281         
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return call->status;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
372         if (ret != 0) {
373                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
374         }
375
376         return state;
377 }
378
379 /*
380   make a ctdb call to the local daemon - async send. Called from client context.
381
382   This constructs a ctdb_call request and queues it for processing. 
383   This call never blocks.
384 */
385 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
386                                               struct ctdb_call *call)
387 {
388         struct ctdb_client_call_state *state;
389         struct ctdb_context *ctdb = ctdb_db->ctdb;
390         struct ctdb_ltdb_header header;
391         TDB_DATA data;
392         int ret;
393         size_t len;
394         struct ctdb_req_call *c;
395
396         /* if the domain socket is not yet open, open it */
397         if (ctdb->daemon.sd==-1) {
398                 ctdb_socket_connect(ctdb);
399         }
400
401         ret = ctdb_ltdb_lock(ctdb_db, call->key);
402         if (ret != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
404                 return NULL;
405         }
406
407         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
408
409         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
410                 ret = -1;
411         }
412
413         if (ret == 0 && header.dmaster == ctdb->pnn) {
414                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
415                 talloc_free(data.dptr);
416                 ctdb_ltdb_unlock(ctdb_db, call->key);
417                 return state;
418         }
419
420         ctdb_ltdb_unlock(ctdb_db, call->key);
421         talloc_free(data.dptr);
422
423         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
424         if (state == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
426                 return NULL;
427         }
428         state->call = talloc_zero(state, struct ctdb_call);
429         if (state->call == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
431                 return NULL;
432         }
433
434         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
435         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
436         if (c == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
438                 return NULL;
439         }
440
441         state->reqid     = ctdb_reqid_new(ctdb, state);
442         state->ctdb_db = ctdb_db;
443         talloc_set_destructor(state, ctdb_client_call_destructor);
444
445         c->hdr.reqid     = state->reqid;
446         c->flags         = call->flags;
447         c->db_id         = ctdb_db->db_id;
448         c->callid        = call->call_id;
449         c->hopcount      = 0;
450         c->keylen        = call->key.dsize;
451         c->calldatalen   = call->call_data.dsize;
452         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
453         memcpy(&c->data[call->key.dsize], 
454                call->call_data.dptr, call->call_data.dsize);
455         *(state->call)              = *call;
456         state->call->call_data.dptr = &c->data[call->key.dsize];
457         state->call->key.dptr       = &c->data[0];
458
459         state->state  = CTDB_CALL_WAIT;
460
461
462         ctdb_client_queue_pkt(ctdb, &c->hdr);
463
464         return state;
465 }
466
467
468 /*
469   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
470 */
471 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
472 {
473         struct ctdb_client_call_state *state;
474
475         state = ctdb_call_send(ctdb_db, call);
476         return ctdb_call_recv(state, call);
477 }
478
479
480 /*
481   tell the daemon what messaging srvid we will use, and register the message
482   handler function in the client
483 */
484 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
485                              ctdb_msg_fn_t handler,
486                              void *private_data)
487                                     
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
501 }
502
503 /*
504   tell the daemon we no longer want a srvid
505 */
506 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         ctdb_deregister_message_handler(ctdb, srvid, private_data);
520         return 0;
521 }
522
523
524 /*
525   send a message - from client context
526  */
527 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
528                       uint64_t srvid, TDB_DATA data)
529 {
530         struct ctdb_req_message *r;
531         int len, res;
532
533         len = offsetof(struct ctdb_req_message, data) + data.dsize;
534         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
535                                len, struct ctdb_req_message);
536         CTDB_NO_MEMORY(ctdb, r);
537
538         r->hdr.destnode  = pnn;
539         r->srvid         = srvid;
540         r->datalen       = data.dsize;
541         memcpy(&r->data[0], data.dptr, data.dsize);
542         
543         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
544         talloc_free(r);
545         return res;
546 }
547
548
549 /*
550   cancel a ctdb_fetch_lock operation, releasing the lock
551  */
552 static int fetch_lock_destructor(struct ctdb_record_handle *h)
553 {
554         ctdb_ltdb_unlock(h->ctdb_db, h->key);
555         return 0;
556 }
557
558 /*
559   force the migration of a record to this node
560  */
561 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
562 {
563         struct ctdb_call call;
564         ZERO_STRUCT(call);
565         call.call_id = CTDB_NULL_FUNC;
566         call.key = key;
567         call.flags = CTDB_IMMEDIATE_MIGRATION;
568         return ctdb_call(ctdb_db, &call);
569 }
570
571 /*
572   try to fetch a readonly copy of a record
573  */
574 static int
575 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
576 {
577         int ret;
578
579         struct ctdb_call call;
580         ZERO_STRUCT(call);
581
582         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
583         call.call_data.dptr = NULL;
584         call.call_data.dsize = 0;
585         call.key = key;
586         call.flags = CTDB_WANT_READONLY;
587         ret = ctdb_call(ctdb_db, &call);
588
589         if (ret != 0) {
590                 return -1;
591         }
592         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
593                 return -1;
594         }
595
596         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
597         if (*hdr == NULL) {
598                 talloc_free(call.reply_data.dptr);
599                 return -1;
600         }
601
602         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
603         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
604         if (data->dptr == NULL) {
605                 talloc_free(call.reply_data.dptr);
606                 talloc_free(hdr);
607                 return -1;
608         }
609
610         return 0;
611 }
612
613 /*
614   get a lock on a record, and return the records data. Blocks until it gets the lock
615  */
616 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
617                                            TDB_DATA key, TDB_DATA *data)
618 {
619         int ret;
620         struct ctdb_record_handle *h;
621
622         /*
623           procedure is as follows:
624
625           1) get the chain lock. 
626           2) check if we are dmaster
627           3) if we are the dmaster then return handle 
628           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
629              reply from ctdbd
630           5) when we get the reply, goto (1)
631          */
632
633         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
634         if (h == NULL) {
635                 return NULL;
636         }
637
638         h->ctdb_db = ctdb_db;
639         h->key     = key;
640         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
641         if (h->key.dptr == NULL) {
642                 talloc_free(h);
643                 return NULL;
644         }
645         h->data    = data;
646
647         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
648                  (const char *)key.dptr));
649
650 again:
651         /* step 1 - get the chain lock */
652         ret = ctdb_ltdb_lock(ctdb_db, key);
653         if (ret != 0) {
654                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
655                 talloc_free(h);
656                 return NULL;
657         }
658
659         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
660
661         talloc_set_destructor(h, fetch_lock_destructor);
662
663         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
664
665         /* when torturing, ensure we test the remote path */
666         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
667             random() % 5 == 0) {
668                 h->header.dmaster = (uint32_t)-1;
669         }
670
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
673
674         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
675                 ctdb_ltdb_unlock(ctdb_db, key);
676                 ret = ctdb_client_force_migration(ctdb_db, key);
677                 if (ret != 0) {
678                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
679                         talloc_free(h);
680                         return NULL;
681                 }
682                 goto again;
683         }
684
685         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
686         return h;
687 }
688
689 /*
690   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
691  */
692 struct ctdb_record_handle *
693 ctdb_fetch_readonly_lock(
694         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
695         TDB_DATA key, TDB_DATA *data,
696         int read_only)
697 {
698         int ret;
699         struct ctdb_record_handle *h;
700         struct ctdb_ltdb_header *roheader = NULL;
701
702         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
703         if (h == NULL) {
704                 return NULL;
705         }
706
707         h->ctdb_db = ctdb_db;
708         h->key     = key;
709         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
710         if (h->key.dptr == NULL) {
711                 talloc_free(h);
712                 return NULL;
713         }
714         h->data    = data;
715
716         data->dptr = NULL;
717         data->dsize = 0;
718
719
720 again:
721         talloc_free(roheader);
722         roheader = NULL;
723
724         talloc_free(data->dptr);
725         data->dptr = NULL;
726         data->dsize = 0;
727
728         /* Lock the record/chain */
729         ret = ctdb_ltdb_lock(ctdb_db, key);
730         if (ret != 0) {
731                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
732                 talloc_free(h);
733                 return NULL;
734         }
735
736         talloc_set_destructor(h, fetch_lock_destructor);
737
738         /* Check if record exists yet in the TDB */
739         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
740         if (ret != 0) {
741                 ctdb_ltdb_unlock(ctdb_db, key);
742                 ret = ctdb_client_force_migration(ctdb_db, key);
743                 if (ret != 0) {
744                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
745                         talloc_free(h);
746                         return NULL;
747                 }
748                 goto again;
749         }
750
751         /* if this is a request for read/write and we have delegations
752            we have to revoke all delegations first
753         */
754         if ((read_only == 0) 
755         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
756         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
757                 ctdb_ltdb_unlock(ctdb_db, key);
758                 ret = ctdb_client_force_migration(ctdb_db, key);
759                 if (ret != 0) {
760                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
761                         talloc_free(h);
762                         return NULL;
763                 }
764                 goto again;
765         }
766
767         /* if we are dmaster, just return the handle */
768         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
769                 return h;
770         }
771
772         if (read_only != 0) {
773                 TDB_DATA rodata = {NULL, 0};
774
775                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
776                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
777                         return h;
778                 }
779
780                 ctdb_ltdb_unlock(ctdb_db, key);
781                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
782                 if (ret != 0) {
783                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
784                         ret = ctdb_client_force_migration(ctdb_db, key);
785                         if (ret != 0) {
786                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
787                                 talloc_free(h);
788                                 return NULL;
789                         }
790
791                         goto again;
792                 }
793
794                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
795                         ret = ctdb_client_force_migration(ctdb_db, key);
796                         if (ret != 0) {
797                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
798                                 talloc_free(h);
799                                 return NULL;
800                         }
801
802                         goto again;
803                 }
804
805                 ret = ctdb_ltdb_lock(ctdb_db, key);
806                 if (ret != 0) {
807                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
808                         talloc_free(h);
809                         return NULL;
810                 }
811
812                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
813                 if (ret != 0) {
814                         ctdb_ltdb_unlock(ctdb_db, key);
815
816                         ret = ctdb_client_force_migration(ctdb_db, key);
817                         if (ret != 0) {
818                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
819                                 talloc_free(h);
820                                 return NULL;
821                         }
822
823                         goto again;
824                 }
825
826                 return h;
827         }
828
829         /* we are not dmaster and this was not a request for a readonly lock
830          * so unlock the record, migrate it and try again
831          */
832         ctdb_ltdb_unlock(ctdb_db, key);
833         ret = ctdb_client_force_migration(ctdb_db, key);
834         if (ret != 0) {
835                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
836                 talloc_free(h);
837                 return NULL;
838         }
839         goto again;
840 }
841
842 /*
843   store some data to the record that was locked with ctdb_fetch_lock()
844 */
845 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
846 {
847         if (h->ctdb_db->persistent) {
848                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
849                 return -1;
850         }
851
852         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
853 }
854
855 /*
856   non-locking fetch of a record
857  */
858 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
859                TDB_DATA key, TDB_DATA *data)
860 {
861         struct ctdb_call call;
862         int ret;
863
864         call.call_id = CTDB_FETCH_FUNC;
865         call.call_data.dptr = NULL;
866         call.call_data.dsize = 0;
867         call.key = key;
868
869         ret = ctdb_call(ctdb_db, &call);
870
871         if (ret == 0) {
872                 *data = call.reply_data;
873                 talloc_steal(mem_ctx, data->dptr);
874         }
875
876         return ret;
877 }
878
879
880
881 /*
882    called when a control completes or timesout to invoke the callback
883    function the user provided
884 */
885 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
886         struct timeval t, void *private_data)
887 {
888         struct ctdb_client_control_state *state;
889         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
890         int ret;
891
892         state = talloc_get_type(private_data, struct ctdb_client_control_state);
893         talloc_steal(tmp_ctx, state);
894
895         ret = ctdb_control_recv(state->ctdb, state, state,
896                         NULL, 
897                         NULL, 
898                         NULL);
899         if (ret != 0) {
900                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
901         }
902
903         talloc_free(tmp_ctx);
904 }
905
906 /*
907   called when a CTDB_REPLY_CONTROL packet comes in in the client
908
909   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
910   contains any reply data from the control
911 */
912 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
913                                       struct ctdb_req_header *hdr)
914 {
915         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
916         struct ctdb_client_control_state *state;
917
918         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
919         if (state == NULL) {
920                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
921                 return;
922         }
923
924         if (hdr->reqid != state->reqid) {
925                 /* we found a record  but it was the wrong one */
926                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
927                 return;
928         }
929
930         state->outdata.dptr = c->data;
931         state->outdata.dsize = c->datalen;
932         state->status = c->status;
933         if (c->errorlen) {
934                 state->errormsg = talloc_strndup(state, 
935                                                  (char *)&c->data[c->datalen], 
936                                                  c->errorlen);
937         }
938
939         /* state->outdata now uses resources from c so we dont want c
940            to just dissappear from under us while state is still alive
941         */
942         talloc_steal(state, c);
943
944         state->state = CTDB_CONTROL_DONE;
945
946         /* if we had a callback registered for this control, pull the response
947            and call the callback.
948         */
949         if (state->async.fn) {
950                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
951         }
952 }
953
954
955 /*
956   destroy a ctdb_control in client
957 */
958 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
959 {
960         ctdb_reqid_remove(state->ctdb, state->reqid);
961         return 0;
962 }
963
964
965 /* time out handler for ctdb_control */
966 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
967         struct timeval t, void *private_data)
968 {
969         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
970
971         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
972                          "dstnode:%u\n", state->reqid, state->c->opcode,
973                          state->c->hdr.destnode));
974
975         state->state = CTDB_CONTROL_TIMEOUT;
976
977         /* if we had a callback registered for this control, pull the response
978            and call the callback.
979         */
980         if (state->async.fn) {
981                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
982         }
983 }
984
985 /* async version of send control request */
986 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
987                 uint32_t destnode, uint64_t srvid, 
988                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
989                 TALLOC_CTX *mem_ctx,
990                 struct timeval *timeout,
991                 char **errormsg)
992 {
993         struct ctdb_client_control_state *state;
994         size_t len;
995         struct ctdb_req_control *c;
996         int ret;
997
998         if (errormsg) {
999                 *errormsg = NULL;
1000         }
1001
1002         /* if the domain socket is not yet open, open it */
1003         if (ctdb->daemon.sd==-1) {
1004                 ctdb_socket_connect(ctdb);
1005         }
1006
1007         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1008         CTDB_NO_MEMORY_NULL(ctdb, state);
1009
1010         state->ctdb       = ctdb;
1011         state->reqid      = ctdb_reqid_new(ctdb, state);
1012         state->state      = CTDB_CONTROL_WAIT;
1013         state->errormsg   = NULL;
1014
1015         talloc_set_destructor(state, ctdb_client_control_destructor);
1016
1017         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1018         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1019                                len, struct ctdb_req_control);
1020         state->c            = c;        
1021         CTDB_NO_MEMORY_NULL(ctdb, c);
1022         c->hdr.reqid        = state->reqid;
1023         c->hdr.destnode     = destnode;
1024         c->opcode           = opcode;
1025         c->client_id        = 0;
1026         c->flags            = flags;
1027         c->srvid            = srvid;
1028         c->datalen          = data.dsize;
1029         if (data.dsize) {
1030                 memcpy(&c->data[0], data.dptr, data.dsize);
1031         }
1032
1033         /* timeout */
1034         if (timeout && !timeval_is_zero(timeout)) {
1035                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1036         }
1037
1038         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1039         if (ret != 0) {
1040                 talloc_free(state);
1041                 return NULL;
1042         }
1043
1044         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1045                 talloc_free(state);
1046                 return NULL;
1047         }
1048
1049         return state;
1050 }
1051
1052
1053 /* async version of receive control reply */
1054 int ctdb_control_recv(struct ctdb_context *ctdb, 
1055                 struct ctdb_client_control_state *state, 
1056                 TALLOC_CTX *mem_ctx,
1057                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1058 {
1059         TALLOC_CTX *tmp_ctx;
1060
1061         if (status != NULL) {
1062                 *status = -1;
1063         }
1064         if (errormsg != NULL) {
1065                 *errormsg = NULL;
1066         }
1067
1068         if (state == NULL) {
1069                 return -1;
1070         }
1071
1072         /* prevent double free of state */
1073         tmp_ctx = talloc_new(ctdb);
1074         talloc_steal(tmp_ctx, state);
1075
1076         /* loop one event at a time until we either timeout or the control
1077            completes.
1078         */
1079         while (state->state == CTDB_CONTROL_WAIT) {
1080                 event_loop_once(ctdb->ev);
1081         }
1082
1083         if (state->state != CTDB_CONTROL_DONE) {
1084                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1085                 if (state->async.fn) {
1086                         state->async.fn(state);
1087                 }
1088                 talloc_free(tmp_ctx);
1089                 return -1;
1090         }
1091
1092         if (state->errormsg) {
1093                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1094                 if (errormsg) {
1095                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1096                 }
1097                 if (state->async.fn) {
1098                         state->async.fn(state);
1099                 }
1100                 talloc_free(tmp_ctx);
1101                 return -1;
1102         }
1103
1104         if (outdata) {
1105                 *outdata = state->outdata;
1106                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1107         }
1108
1109         if (status) {
1110                 *status = state->status;
1111         }
1112
1113         if (state->async.fn) {
1114                 state->async.fn(state);
1115         }
1116
1117         talloc_free(tmp_ctx);
1118         return 0;
1119 }
1120
1121
1122
1123 /*
1124   send a ctdb control message
1125   timeout specifies how long we should wait for a reply.
1126   if timeout is NULL we wait indefinitely
1127  */
1128 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1129                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1130                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1131                  struct timeval *timeout,
1132                  char **errormsg)
1133 {
1134         struct ctdb_client_control_state *state;
1135
1136         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1137                         flags, data, mem_ctx,
1138                         timeout, errormsg);
1139
1140         /* FIXME: Error conditions in ctdb_control_send return NULL without
1141          * setting errormsg.  So, there is no way to distinguish between sucess
1142          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1143         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1144                 if (status != NULL) {
1145                         *status = 0;
1146                 }
1147                 return 0;
1148         }
1149
1150         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1151                         errormsg);
1152 }
1153
1154
1155
1156
1157 /*
1158   a process exists call. Returns 0 if process exists, -1 otherwise
1159  */
1160 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1161 {
1162         int ret;
1163         TDB_DATA data;
1164         int32_t status;
1165
1166         data.dptr = (uint8_t*)&pid;
1167         data.dsize = sizeof(pid);
1168
1169         ret = ctdb_control(ctdb, destnode, 0, 
1170                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1171                            NULL, NULL, &status, NULL, NULL);
1172         if (ret != 0) {
1173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1174                 return -1;
1175         }
1176
1177         return status;
1178 }
1179
1180 /*
1181   get remote statistics
1182  */
1183 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1184 {
1185         int ret;
1186         TDB_DATA data;
1187         int32_t res;
1188
1189         ret = ctdb_control(ctdb, destnode, 0, 
1190                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1191                            ctdb, &data, &res, NULL, NULL);
1192         if (ret != 0 || res != 0) {
1193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1194                 return -1;
1195         }
1196
1197         if (data.dsize != sizeof(struct ctdb_statistics)) {
1198                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1199                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1200                       return -1;
1201         }
1202
1203         *status = *(struct ctdb_statistics *)data.dptr;
1204         talloc_free(data.dptr);
1205                         
1206         return 0;
1207 }
1208
1209 /*
1210   shutdown a remote ctdb node
1211  */
1212 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1213 {
1214         struct ctdb_client_control_state *state;
1215
1216         state = ctdb_control_send(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1218                            NULL, &timeout, NULL);
1219         if (state == NULL) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1221                 return -1;
1222         }
1223
1224         return 0;
1225 }
1226
1227 /*
1228   get vnn map from a remote node
1229  */
1230 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1231 {
1232         int ret;
1233         TDB_DATA outdata;
1234         int32_t res;
1235         struct ctdb_vnn_map_wire *map;
1236
1237         ret = ctdb_control(ctdb, destnode, 0, 
1238                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1239                            mem_ctx, &outdata, &res, &timeout, NULL);
1240         if (ret != 0 || res != 0) {
1241                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1242                 return -1;
1243         }
1244         
1245         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1246         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1247             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1248                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1249                 return -1;
1250         }
1251
1252         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1253         CTDB_NO_MEMORY(ctdb, *vnnmap);
1254         (*vnnmap)->generation = map->generation;
1255         (*vnnmap)->size       = map->size;
1256         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1257
1258         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1259         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1260         talloc_free(outdata.dptr);
1261                     
1262         return 0;
1263 }
1264
1265
1266 /*
1267   get the recovery mode of a remote node
1268  */
1269 struct ctdb_client_control_state *
1270 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1271 {
1272         return ctdb_control_send(ctdb, destnode, 0, 
1273                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1274                            mem_ctx, &timeout, NULL);
1275 }
1276
1277 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1278 {
1279         int ret;
1280         int32_t res;
1281
1282         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1285                 return -1;
1286         }
1287
1288         if (recmode) {
1289                 *recmode = (uint32_t)res;
1290         }
1291
1292         return 0;
1293 }
1294
1295 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1296 {
1297         struct ctdb_client_control_state *state;
1298
1299         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1300         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1301 }
1302
1303
1304
1305
1306 /*
1307   set the recovery mode of a remote node
1308  */
1309 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1310 {
1311         int ret;
1312         TDB_DATA data;
1313         int32_t res;
1314
1315         data.dsize = sizeof(uint32_t);
1316         data.dptr = (unsigned char *)&recmode;
1317
1318         ret = ctdb_control(ctdb, destnode, 0, 
1319                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1320                            NULL, NULL, &res, &timeout, NULL);
1321         if (ret != 0 || res != 0) {
1322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1323                 return -1;
1324         }
1325
1326         return 0;
1327 }
1328
1329
1330
1331 /*
1332   get the recovery master of a remote node
1333  */
1334 struct ctdb_client_control_state *
1335 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1336                         struct timeval timeout, uint32_t destnode)
1337 {
1338         return ctdb_control_send(ctdb, destnode, 0, 
1339                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1340                            mem_ctx, &timeout, NULL);
1341 }
1342
1343 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1344 {
1345         int ret;
1346         int32_t res;
1347
1348         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1349         if (ret != 0) {
1350                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1351                 return -1;
1352         }
1353
1354         if (recmaster) {
1355                 *recmaster = (uint32_t)res;
1356         }
1357
1358         return 0;
1359 }
1360
1361 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1362 {
1363         struct ctdb_client_control_state *state;
1364
1365         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1366         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1367 }
1368
1369
1370 /*
1371   set the recovery master of a remote node
1372  */
1373 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1374 {
1375         int ret;
1376         TDB_DATA data;
1377         int32_t res;
1378
1379         ZERO_STRUCT(data);
1380         data.dsize = sizeof(uint32_t);
1381         data.dptr = (unsigned char *)&recmaster;
1382
1383         ret = ctdb_control(ctdb, destnode, 0, 
1384                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1385                            NULL, NULL, &res, &timeout, NULL);
1386         if (ret != 0 || res != 0) {
1387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1388                 return -1;
1389         }
1390
1391         return 0;
1392 }
1393
1394
1395 /*
1396   get a list of databases off a remote node
1397  */
1398 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1399                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1400 {
1401         int ret;
1402         TDB_DATA outdata;
1403         int32_t res;
1404
1405         ret = ctdb_control(ctdb, destnode, 0, 
1406                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1407                            mem_ctx, &outdata, &res, &timeout, NULL);
1408         if (ret != 0 || res != 0) {
1409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1410                 return -1;
1411         }
1412
1413         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1414         talloc_free(outdata.dptr);
1415                     
1416         return 0;
1417 }
1418
1419 /*
1420   get a list of nodes (vnn and flags ) from a remote node
1421  */
1422 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1423                 struct timeval timeout, uint32_t destnode, 
1424                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1425 {
1426         int ret;
1427         TDB_DATA outdata;
1428         int32_t res;
1429
1430         ret = ctdb_control(ctdb, destnode, 0, 
1431                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1432                            mem_ctx, &outdata, &res, &timeout, NULL);
1433         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1435                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1436         }
1437         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1438                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1439                 return -1;
1440         }
1441
1442         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1443         talloc_free(outdata.dptr);
1444                     
1445         return 0;
1446 }
1447
1448 /*
1449   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1450  */
1451 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1452                 struct timeval timeout, uint32_t destnode, 
1453                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1454 {
1455         int ret, i, len;
1456         TDB_DATA outdata;
1457         struct ctdb_node_mapv4 *nodemapv4;
1458         int32_t res;
1459
1460         ret = ctdb_control(ctdb, destnode, 0, 
1461                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1462                            mem_ctx, &outdata, &res, &timeout, NULL);
1463         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1465                 return -1;
1466         }
1467
1468         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1469
1470         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1471         (*nodemap) = talloc_zero_size(mem_ctx, len);
1472         CTDB_NO_MEMORY(ctdb, (*nodemap));
1473
1474         (*nodemap)->num = nodemapv4->num;
1475         for (i=0; i<nodemapv4->num; i++) {
1476                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1477                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1478                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1479                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1480         }
1481                 
1482         talloc_free(outdata.dptr);
1483                     
1484         return 0;
1485 }
1486
1487 /*
1488   drop the transport, reload the nodes file and restart the transport
1489  */
1490 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1491                     struct timeval timeout, uint32_t destnode)
1492 {
1493         int ret;
1494         int32_t res;
1495
1496         ret = ctdb_control(ctdb, destnode, 0, 
1497                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1498                            NULL, NULL, &res, &timeout, NULL);
1499         if (ret != 0 || res != 0) {
1500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1501                 return -1;
1502         }
1503
1504         return 0;
1505 }
1506
1507
1508 /*
1509   set vnn map on a node
1510  */
1511 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1512                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1513 {
1514         int ret;
1515         TDB_DATA data;
1516         int32_t res;
1517         struct ctdb_vnn_map_wire *map;
1518         size_t len;
1519
1520         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1521         map = talloc_size(mem_ctx, len);
1522         CTDB_NO_MEMORY(ctdb, map);
1523
1524         map->generation = vnnmap->generation;
1525         map->size = vnnmap->size;
1526         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1527         
1528         data.dsize = len;
1529         data.dptr  = (uint8_t *)map;
1530
1531         ret = ctdb_control(ctdb, destnode, 0, 
1532                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1533                            NULL, NULL, &res, &timeout, NULL);
1534         if (ret != 0 || res != 0) {
1535                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1536                 return -1;
1537         }
1538
1539         talloc_free(map);
1540
1541         return 0;
1542 }
1543
1544
1545 /*
1546   async send for pull database
1547  */
1548 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1549         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1550         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1551 {
1552         TDB_DATA indata;
1553         struct ctdb_control_pulldb *pull;
1554         struct ctdb_client_control_state *state;
1555
1556         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1557         CTDB_NO_MEMORY_NULL(ctdb, pull);
1558
1559         pull->db_id   = dbid;
1560         pull->lmaster = lmaster;
1561
1562         indata.dsize = sizeof(struct ctdb_control_pulldb);
1563         indata.dptr  = (unsigned char *)pull;
1564
1565         state = ctdb_control_send(ctdb, destnode, 0, 
1566                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1567                                   mem_ctx, &timeout, NULL);
1568         talloc_free(pull);
1569
1570         return state;
1571 }
1572
1573 /*
1574   async recv for pull database
1575  */
1576 int ctdb_ctrl_pulldb_recv(
1577         struct ctdb_context *ctdb, 
1578         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1579         TDB_DATA *outdata)
1580 {
1581         int ret;
1582         int32_t res;
1583
1584         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1585         if ( (ret != 0) || (res != 0) ){
1586                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1587                 return -1;
1588         }
1589
1590         return 0;
1591 }
1592
1593 /*
1594   pull all keys and records for a specific database on a node
1595  */
1596 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1597                 uint32_t dbid, uint32_t lmaster, 
1598                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1599                 TDB_DATA *outdata)
1600 {
1601         struct ctdb_client_control_state *state;
1602
1603         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1604                                       timeout);
1605         
1606         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1607 }
1608
1609
1610 /*
1611   change dmaster for all keys in the database to the new value
1612  */
1613 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1614                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1615 {
1616         int ret;
1617         TDB_DATA indata;
1618         int32_t res;
1619
1620         indata.dsize = 2*sizeof(uint32_t);
1621         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1622
1623         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1624         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1631                 return -1;
1632         }
1633
1634         return 0;
1635 }
1636
1637 /*
1638   ping a node, return number of clients connected
1639  */
1640 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1641 {
1642         int ret;
1643         int32_t res;
1644
1645         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1646                            tdb_null, NULL, NULL, &res, NULL, NULL);
1647         if (ret != 0) {
1648                 return -1;
1649         }
1650         return res;
1651 }
1652
1653 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1654                            struct timeval timeout, 
1655                            uint32_t destnode,
1656                            uint32_t *runstate)
1657 {
1658         TDB_DATA outdata;
1659         int32_t res;
1660         int ret;
1661
1662         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1663                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1664         if (ret != 0 || res != 0) {
1665                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1666                 return ret != 0 ? ret : res;
1667         }
1668
1669         if (outdata.dsize != sizeof(uint32_t)) {
1670                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1671                 talloc_free(outdata.dptr);
1672                 return -1;
1673         }
1674
1675         if (runstate != NULL) {
1676                 *runstate = *(uint32_t *)outdata.dptr;
1677         }
1678         talloc_free(outdata.dptr);
1679
1680         return 0;
1681 }
1682
1683 /*
1684   find the real path to a ltdb 
1685  */
1686 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1687                    const char **path)
1688 {
1689         int ret;
1690         int32_t res;
1691         TDB_DATA data;
1692
1693         data.dptr = (uint8_t *)&dbid;
1694         data.dsize = sizeof(dbid);
1695
1696         ret = ctdb_control(ctdb, destnode, 0, 
1697                            CTDB_CONTROL_GETDBPATH, 0, data, 
1698                            mem_ctx, &data, &res, &timeout, NULL);
1699         if (ret != 0 || res != 0) {
1700                 return -1;
1701         }
1702
1703         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1704         if ((*path) == NULL) {
1705                 return -1;
1706         }
1707
1708         talloc_free(data.dptr);
1709
1710         return 0;
1711 }
1712
1713 /*
1714   find the name of a db 
1715  */
1716 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1717                    const char **name)
1718 {
1719         int ret;
1720         int32_t res;
1721         TDB_DATA data;
1722
1723         data.dptr = (uint8_t *)&dbid;
1724         data.dsize = sizeof(dbid);
1725
1726         ret = ctdb_control(ctdb, destnode, 0, 
1727                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1728                            mem_ctx, &data, &res, &timeout, NULL);
1729         if (ret != 0 || res != 0) {
1730                 return -1;
1731         }
1732
1733         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1734         if ((*name) == NULL) {
1735                 return -1;
1736         }
1737
1738         talloc_free(data.dptr);
1739
1740         return 0;
1741 }
1742
1743 /*
1744   get the health status of a db
1745  */
1746 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1747                           struct timeval timeout,
1748                           uint32_t destnode,
1749                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1750                           const char **reason)
1751 {
1752         int ret;
1753         int32_t res;
1754         TDB_DATA data;
1755
1756         data.dptr = (uint8_t *)&dbid;
1757         data.dsize = sizeof(dbid);
1758
1759         ret = ctdb_control(ctdb, destnode, 0,
1760                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1761                            mem_ctx, &data, &res, &timeout, NULL);
1762         if (ret != 0 || res != 0) {
1763                 return -1;
1764         }
1765
1766         if (data.dsize == 0) {
1767                 (*reason) = NULL;
1768                 return 0;
1769         }
1770
1771         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1772         if ((*reason) == NULL) {
1773                 return -1;
1774         }
1775
1776         talloc_free(data.dptr);
1777
1778         return 0;
1779 }
1780
1781 /*
1782   create a database
1783  */
1784 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1785                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1786 {
1787         int ret;
1788         int32_t res;
1789         TDB_DATA data;
1790
1791         data.dptr = discard_const(name);
1792         data.dsize = strlen(name)+1;
1793
1794         ret = ctdb_control(ctdb, destnode, 0, 
1795                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1796                            0, data, 
1797                            mem_ctx, &data, &res, &timeout, NULL);
1798
1799         if (ret != 0 || res != 0) {
1800                 return -1;
1801         }
1802
1803         return 0;
1804 }
1805
1806 /*
1807   get debug level on a node
1808  */
1809 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1810 {
1811         int ret;
1812         int32_t res;
1813         TDB_DATA data;
1814
1815         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1816                            ctdb, &data, &res, NULL, NULL);
1817         if (ret != 0 || res != 0) {
1818                 return -1;
1819         }
1820         if (data.dsize != sizeof(int32_t)) {
1821                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1822                          (unsigned)data.dsize));
1823                 return -1;
1824         }
1825         *level = *(int32_t *)data.dptr;
1826         talloc_free(data.dptr);
1827         return 0;
1828 }
1829
1830 /*
1831   set debug level on a node
1832  */
1833 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1834 {
1835         int ret;
1836         int32_t res;
1837         TDB_DATA data;
1838
1839         data.dptr = (uint8_t *)&level;
1840         data.dsize = sizeof(level);
1841
1842         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1843                            NULL, NULL, &res, NULL, NULL);
1844         if (ret != 0 || res != 0) {
1845                 return -1;
1846         }
1847         return 0;
1848 }
1849
1850
1851 /*
1852   get a list of connected nodes
1853  */
1854 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1855                                 struct timeval timeout,
1856                                 TALLOC_CTX *mem_ctx,
1857                                 uint32_t *num_nodes)
1858 {
1859         struct ctdb_node_map *map=NULL;
1860         int ret, i;
1861         uint32_t *nodes;
1862
1863         *num_nodes = 0;
1864
1865         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1866         if (ret != 0) {
1867                 return NULL;
1868         }
1869
1870         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1871         if (nodes == NULL) {
1872                 return NULL;
1873         }
1874
1875         for (i=0;i<map->num;i++) {
1876                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1877                         nodes[*num_nodes] = map->nodes[i].pnn;
1878                         (*num_nodes)++;
1879                 }
1880         }
1881
1882         return nodes;
1883 }
1884
1885
1886 /*
1887   reset remote status
1888  */
1889 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1890 {
1891         int ret;
1892         int32_t res;
1893
1894         ret = ctdb_control(ctdb, destnode, 0, 
1895                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1896                            NULL, NULL, &res, NULL, NULL);
1897         if (ret != 0 || res != 0) {
1898                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1899                 return -1;
1900         }
1901         return 0;
1902 }
1903
1904 /*
1905   attach to a specific database - client call
1906 */
1907 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1908                                     struct timeval timeout,
1909                                     const char *name,
1910                                     bool persistent,
1911                                     uint32_t tdb_flags)
1912 {
1913         struct ctdb_db_context *ctdb_db;
1914         TDB_DATA data;
1915         int ret;
1916         int32_t res;
1917
1918         ctdb_db = ctdb_db_handle(ctdb, name);
1919         if (ctdb_db) {
1920                 return ctdb_db;
1921         }
1922
1923         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1924         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1925
1926         ctdb_db->ctdb = ctdb;
1927         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1928         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1929
1930         data.dptr = discard_const(name);
1931         data.dsize = strlen(name)+1;
1932
1933         /* CTDB has switched to using jenkins hash for volatile databases.
1934          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
1935          * always set it.
1936          */
1937         if (!persistent) {
1938                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
1939         }
1940
1941         /* tell ctdb daemon to attach */
1942         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1943                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1944                            0, data, ctdb_db, &data, &res, NULL, NULL);
1945         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1946                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1947                 talloc_free(ctdb_db);
1948                 return NULL;
1949         }
1950         
1951         ctdb_db->db_id = *(uint32_t *)data.dptr;
1952         talloc_free(data.dptr);
1953
1954         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1955         if (ret != 0) {
1956                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1957                 talloc_free(ctdb_db);
1958                 return NULL;
1959         }
1960
1961         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1962         if (ctdb->valgrinding) {
1963                 tdb_flags |= TDB_NOMMAP;
1964         }
1965         tdb_flags |= TDB_DISALLOW_NESTING;
1966
1967         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1968         if (ctdb_db->ltdb == NULL) {
1969                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1970                 talloc_free(ctdb_db);
1971                 return NULL;
1972         }
1973
1974         ctdb_db->persistent = persistent;
1975
1976         DLIST_ADD(ctdb->db_list, ctdb_db);
1977
1978         /* add well known functions */
1979         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1980         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1981         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1982
1983         return ctdb_db;
1984 }
1985
1986
1987 /*
1988   setup a call for a database
1989  */
1990 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1991 {
1992         struct ctdb_registered_call *call;
1993
1994 #if 0
1995         TDB_DATA data;
1996         int32_t status;
1997         struct ctdb_control_set_call c;
1998         int ret;
1999
2000         /* this is no longer valid with the separate daemon architecture */
2001         c.db_id = ctdb_db->db_id;
2002         c.fn    = fn;
2003         c.id    = id;
2004
2005         data.dptr = (uint8_t *)&c;
2006         data.dsize = sizeof(c);
2007
2008         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2009                            data, NULL, NULL, &status, NULL, NULL);
2010         if (ret != 0 || status != 0) {
2011                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2012                 return -1;
2013         }
2014 #endif
2015
2016         /* also register locally */
2017         call = talloc(ctdb_db, struct ctdb_registered_call);
2018         call->fn = fn;
2019         call->id = id;
2020
2021         DLIST_ADD(ctdb_db->calls, call);        
2022         return 0;
2023 }
2024
2025
2026 struct traverse_state {
2027         bool done;
2028         uint32_t count;
2029         ctdb_traverse_func fn;
2030         void *private_data;
2031         bool listemptyrecords;
2032 };
2033
2034 /*
2035   called on each key during a ctdb_traverse
2036  */
2037 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2038 {
2039         struct traverse_state *state = (struct traverse_state *)p;
2040         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2041         TDB_DATA key;
2042
2043         if (data.dsize < sizeof(uint32_t) ||
2044             d->length != data.dsize) {
2045                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2046                 state->done = true;
2047                 return;
2048         }
2049
2050         key.dsize = d->keylen;
2051         key.dptr  = &d->data[0];
2052         data.dsize = d->datalen;
2053         data.dptr = &d->data[d->keylen];
2054
2055         if (key.dsize == 0 && data.dsize == 0) {
2056                 /* end of traverse */
2057                 state->done = true;
2058                 return;
2059         }
2060
2061         if (!state->listemptyrecords &&
2062             data.dsize == sizeof(struct ctdb_ltdb_header))
2063         {
2064                 /* empty records are deleted records in ctdb */
2065                 return;
2066         }
2067
2068         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2069                 state->done = true;
2070         }
2071
2072         state->count++;
2073 }
2074
2075 /**
2076  * start a cluster wide traverse, calling the supplied fn on each record
2077  * return the number of records traversed, or -1 on error
2078  *
2079  * Extendet variant with a flag to signal whether empty records should
2080  * be listed.
2081  */
2082 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2083                              ctdb_traverse_func fn,
2084                              bool withemptyrecords,
2085                              void *private_data)
2086 {
2087         TDB_DATA data;
2088         struct ctdb_traverse_start_ext t;
2089         int32_t status;
2090         int ret;
2091         uint64_t srvid = (getpid() | 0xFLL<<60);
2092         struct traverse_state state;
2093
2094         state.done = false;
2095         state.count = 0;
2096         state.private_data = private_data;
2097         state.fn = fn;
2098         state.listemptyrecords = withemptyrecords;
2099
2100         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2103                 return -1;
2104         }
2105
2106         t.db_id = ctdb_db->db_id;
2107         t.srvid = srvid;
2108         t.reqid = 0;
2109         t.withemptyrecords = withemptyrecords;
2110
2111         data.dptr = (uint8_t *)&t;
2112         data.dsize = sizeof(t);
2113
2114         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2115                            data, NULL, NULL, &status, NULL, NULL);
2116         if (ret != 0 || status != 0) {
2117                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2118                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2119                 return -1;
2120         }
2121
2122         while (!state.done) {
2123                 event_loop_once(ctdb_db->ctdb->ev);
2124         }
2125
2126         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2127         if (ret != 0) {
2128                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2129                 return -1;
2130         }
2131
2132         return state.count;
2133 }
2134
2135 /**
2136  * start a cluster wide traverse, calling the supplied fn on each record
2137  * return the number of records traversed, or -1 on error
2138  *
2139  * Standard version which does not list the empty records:
2140  * These are considered deleted.
2141  */
2142 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2143 {
2144         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2145 }
2146
2147 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2148 /*
2149   called on each key during a catdb
2150  */
2151 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2152 {
2153         int i;
2154         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2155         FILE *f = c->f;
2156         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2157
2158         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2159         for (i=0;i<key.dsize;i++) {
2160                 if (ISASCII(key.dptr[i])) {
2161                         fprintf(f, "%c", key.dptr[i]);
2162                 } else {
2163                         fprintf(f, "\\%02X", key.dptr[i]);
2164                 }
2165         }
2166         fprintf(f, "\"\n");
2167
2168         fprintf(f, "dmaster: %u\n", h->dmaster);
2169         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2170
2171         if (c->printlmaster && ctdb->vnn_map != NULL) {
2172                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2173         }
2174
2175         if (c->printhash) {
2176                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2177         }
2178
2179         if (c->printrecordflags) {
2180                 fprintf(f, "flags: 0x%08x", h->flags);
2181                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2182                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2183                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2184                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2185                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2186                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2187                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2188                 fprintf(f, "\n");
2189         }
2190
2191         if (c->printdatasize) {
2192                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2193         } else {
2194                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2195                 for (i=sizeof(*h);i<data.dsize;i++) {
2196                         if (ISASCII(data.dptr[i])) {
2197                                 fprintf(f, "%c", data.dptr[i]);
2198                         } else {
2199                                 fprintf(f, "\\%02X", data.dptr[i]);
2200                         }
2201                 }
2202                 fprintf(f, "\"\n");
2203         }
2204
2205         fprintf(f, "\n");
2206
2207         return 0;
2208 }
2209
2210 /*
2211   convenience function to list all keys to stdout
2212  */
2213 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2214                  struct ctdb_dump_db_context *ctx)
2215 {
2216         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2217                                  ctx->printemptyrecords, ctx);
2218 }
2219
2220 /*
2221   get the pid of a ctdb daemon
2222  */
2223 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2224 {
2225         int ret;
2226         int32_t res;
2227
2228         ret = ctdb_control(ctdb, destnode, 0, 
2229                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2230                            NULL, NULL, &res, &timeout, NULL);
2231         if (ret != 0) {
2232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2233                 return -1;
2234         }
2235
2236         *pid = res;
2237
2238         return 0;
2239 }
2240
2241
2242 /*
2243   async freeze send control
2244  */
2245 struct ctdb_client_control_state *
2246 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2247 {
2248         return ctdb_control_send(ctdb, destnode, priority, 
2249                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2250                            mem_ctx, &timeout, NULL);
2251 }
2252
2253 /* 
2254    async freeze recv control
2255 */
2256 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2257 {
2258         int ret;
2259         int32_t res;
2260
2261         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2262         if ( (ret != 0) || (res != 0) ){
2263                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2264                 return -1;
2265         }
2266
2267         return 0;
2268 }
2269
2270 /*
2271   freeze databases of a certain priority
2272  */
2273 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2274 {
2275         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2276         struct ctdb_client_control_state *state;
2277         int ret;
2278
2279         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2280         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2281         talloc_free(tmp_ctx);
2282
2283         return ret;
2284 }
2285
2286 /* Freeze all databases */
2287 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2288 {
2289         int i;
2290
2291         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2292                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2293                         return -1;
2294                 }
2295         }
2296         return 0;
2297 }
2298
2299 /*
2300   thaw databases of a certain priority
2301  */
2302 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2303 {
2304         int ret;
2305         int32_t res;
2306
2307         ret = ctdb_control(ctdb, destnode, priority, 
2308                            CTDB_CONTROL_THAW, 0, tdb_null, 
2309                            NULL, NULL, &res, &timeout, NULL);
2310         if (ret != 0 || res != 0) {
2311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2312                 return -1;
2313         }
2314
2315         return 0;
2316 }
2317
2318 /* thaw all databases */
2319 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2320 {
2321         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2322 }
2323
2324 /*
2325   get pnn of a node, or -1
2326  */
2327 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2328 {
2329         int ret;
2330         int32_t res;
2331
2332         ret = ctdb_control(ctdb, destnode, 0, 
2333                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2334                            NULL, NULL, &res, &timeout, NULL);
2335         if (ret != 0) {
2336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2337                 return -1;
2338         }
2339
2340         return res;
2341 }
2342
2343 /*
2344   get the monitoring mode of a remote node
2345  */
2346 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2347 {
2348         int ret;
2349         int32_t res;
2350
2351         ret = ctdb_control(ctdb, destnode, 0, 
2352                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2353                            NULL, NULL, &res, &timeout, NULL);
2354         if (ret != 0) {
2355                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2356                 return -1;
2357         }
2358
2359         *monmode = res;
2360
2361         return 0;
2362 }
2363
2364
2365 /*
2366  set the monitoring mode of a remote node to active
2367  */
2368 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2369 {
2370         int ret;
2371         
2372
2373         ret = ctdb_control(ctdb, destnode, 0, 
2374                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2375                            NULL, NULL,NULL, &timeout, NULL);
2376         if (ret != 0) {
2377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2378                 return -1;
2379         }
2380
2381         
2382
2383         return 0;
2384 }
2385
2386 /*
2387   set the monitoring mode of a remote node to disable
2388  */
2389 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2390 {
2391         int ret;
2392         
2393
2394         ret = ctdb_control(ctdb, destnode, 0, 
2395                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2396                            NULL, NULL, NULL, &timeout, NULL);
2397         if (ret != 0) {
2398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2399                 return -1;
2400         }
2401
2402         
2403
2404         return 0;
2405 }
2406
2407
2408
2409 /* 
2410   sent to a node to make it take over an ip address
2411 */
2412 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2413                           uint32_t destnode, struct ctdb_public_ip *ip)
2414 {
2415         TDB_DATA data;
2416         struct ctdb_public_ipv4 ipv4;
2417         int ret;
2418         int32_t res;
2419
2420         if (ip->addr.sa.sa_family == AF_INET) {
2421                 ipv4.pnn = ip->pnn;
2422                 ipv4.sin = ip->addr.ip;
2423
2424                 data.dsize = sizeof(ipv4);
2425                 data.dptr  = (uint8_t *)&ipv4;
2426
2427                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2428                            NULL, &res, &timeout, NULL);
2429         } else {
2430                 data.dsize = sizeof(*ip);
2431                 data.dptr  = (uint8_t *)ip;
2432
2433                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2434                            NULL, &res, &timeout, NULL);
2435         }
2436
2437         if (ret != 0 || res != 0) {
2438                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2439                 return -1;
2440         }
2441
2442         return 0;       
2443 }
2444
2445
2446 /* 
2447   sent to a node to make it release an ip address
2448 */
2449 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2450                          uint32_t destnode, struct ctdb_public_ip *ip)
2451 {
2452         TDB_DATA data;
2453         struct ctdb_public_ipv4 ipv4;
2454         int ret;
2455         int32_t res;
2456
2457         if (ip->addr.sa.sa_family == AF_INET) {
2458                 ipv4.pnn = ip->pnn;
2459                 ipv4.sin = ip->addr.ip;
2460
2461                 data.dsize = sizeof(ipv4);
2462                 data.dptr  = (uint8_t *)&ipv4;
2463
2464                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2465                                    NULL, &res, &timeout, NULL);
2466         } else {
2467                 data.dsize = sizeof(*ip);
2468                 data.dptr  = (uint8_t *)ip;
2469
2470                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2471                                    NULL, &res, &timeout, NULL);
2472         }
2473
2474         if (ret != 0 || res != 0) {
2475                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2476                 return -1;
2477         }
2478
2479         return 0;       
2480 }
2481
2482
2483 /*
2484   get a tunable
2485  */
2486 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2487                           struct timeval timeout, 
2488                           uint32_t destnode,
2489                           const char *name, uint32_t *value)
2490 {
2491         struct ctdb_control_get_tunable *t;
2492         TDB_DATA data, outdata;
2493         int32_t res;
2494         int ret;
2495
2496         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2497         data.dptr  = talloc_size(ctdb, data.dsize);
2498         CTDB_NO_MEMORY(ctdb, data.dptr);
2499
2500         t = (struct ctdb_control_get_tunable *)data.dptr;
2501         t->length = strlen(name)+1;
2502         memcpy(t->name, name, t->length);
2503
2504         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2505                            &outdata, &res, &timeout, NULL);
2506         talloc_free(data.dptr);
2507         if (ret != 0 || res != 0) {
2508                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2509                 return ret != 0 ? ret : res;
2510         }
2511
2512         if (outdata.dsize != sizeof(uint32_t)) {
2513                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2514                 talloc_free(outdata.dptr);
2515                 return -1;
2516         }
2517         
2518         *value = *(uint32_t *)outdata.dptr;
2519         talloc_free(outdata.dptr);
2520
2521         return 0;
2522 }
2523
2524 /*
2525   set a tunable
2526  */
2527 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2528                           struct timeval timeout, 
2529                           uint32_t destnode,
2530                           const char *name, uint32_t value)
2531 {
2532         struct ctdb_control_set_tunable *t;
2533         TDB_DATA data;
2534         int32_t res;
2535         int ret;
2536
2537         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2538         data.dptr  = talloc_size(ctdb, data.dsize);
2539         CTDB_NO_MEMORY(ctdb, data.dptr);
2540
2541         t = (struct ctdb_control_set_tunable *)data.dptr;
2542         t->length = strlen(name)+1;
2543         memcpy(t->name, name, t->length);
2544         t->value = value;
2545
2546         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2547                            NULL, &res, &timeout, NULL);
2548         talloc_free(data.dptr);
2549         if (ret != 0 || res != 0) {
2550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2551                 return -1;
2552         }
2553
2554         return 0;
2555 }
2556
2557 /*
2558   list tunables
2559  */
2560 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2561                             struct timeval timeout, 
2562                             uint32_t destnode,
2563                             TALLOC_CTX *mem_ctx,
2564                             const char ***list, uint32_t *count)
2565 {
2566         TDB_DATA outdata;
2567         int32_t res;
2568         int ret;
2569         struct ctdb_control_list_tunable *t;
2570         char *p, *s, *ptr;
2571
2572         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2573                            mem_ctx, &outdata, &res, &timeout, NULL);
2574         if (ret != 0 || res != 0) {
2575                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2576                 return -1;
2577         }
2578
2579         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2580         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2581             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2582                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2583                 talloc_free(outdata.dptr);
2584                 return -1;              
2585         }
2586         
2587         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2588         CTDB_NO_MEMORY(ctdb, p);
2589
2590         talloc_free(outdata.dptr);
2591         
2592         (*list) = NULL;
2593         (*count) = 0;
2594
2595         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2596                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2597                 CTDB_NO_MEMORY(ctdb, *list);
2598                 (*list)[*count] = talloc_strdup(*list, s);
2599                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2600                 (*count)++;
2601         }
2602
2603         talloc_free(p);
2604
2605         return 0;
2606 }
2607
2608
2609 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2610                                    struct timeval timeout, uint32_t destnode,
2611                                    TALLOC_CTX *mem_ctx,
2612                                    uint32_t flags,
2613                                    struct ctdb_all_public_ips **ips)
2614 {
2615         int ret;
2616         TDB_DATA outdata;
2617         int32_t res;
2618
2619         ret = ctdb_control(ctdb, destnode, 0, 
2620                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2621                            mem_ctx, &outdata, &res, &timeout, NULL);
2622         if (ret == 0 && res == -1) {
2623                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2624                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2625         }
2626         if (ret != 0 || res != 0) {
2627           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2628                 return -1;
2629         }
2630
2631         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2632         talloc_free(outdata.dptr);
2633                     
2634         return 0;
2635 }
2636
2637 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2638                              struct timeval timeout, uint32_t destnode,
2639                              TALLOC_CTX *mem_ctx,
2640                              struct ctdb_all_public_ips **ips)
2641 {
2642         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2643                                               destnode, mem_ctx,
2644                                               0, ips);
2645 }
2646
2647 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2648                         struct timeval timeout, uint32_t destnode, 
2649                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2650 {
2651         int ret, i, len;
2652         TDB_DATA outdata;
2653         int32_t res;
2654         struct ctdb_all_public_ipsv4 *ipsv4;
2655
2656         ret = ctdb_control(ctdb, destnode, 0, 
2657                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2658                            mem_ctx, &outdata, &res, &timeout, NULL);
2659         if (ret != 0 || res != 0) {
2660                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2661                 return -1;
2662         }
2663
2664         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2665         len = offsetof(struct ctdb_all_public_ips, ips) +
2666                 ipsv4->num*sizeof(struct ctdb_public_ip);
2667         *ips = talloc_zero_size(mem_ctx, len);
2668         CTDB_NO_MEMORY(ctdb, *ips);
2669         (*ips)->num = ipsv4->num;
2670         for (i=0; i<ipsv4->num; i++) {
2671                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2672                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2673         }
2674
2675         talloc_free(outdata.dptr);
2676                     
2677         return 0;
2678 }
2679
2680 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2681                                  struct timeval timeout, uint32_t destnode,
2682                                  TALLOC_CTX *mem_ctx,
2683                                  const ctdb_sock_addr *addr,
2684                                  struct ctdb_control_public_ip_info **_info)
2685 {
2686         int ret;
2687         TDB_DATA indata;
2688         TDB_DATA outdata;
2689         int32_t res;
2690         struct ctdb_control_public_ip_info *info;
2691         uint32_t len;
2692         uint32_t i;
2693
2694         indata.dptr = discard_const_p(uint8_t, addr);
2695         indata.dsize = sizeof(*addr);
2696
2697         ret = ctdb_control(ctdb, destnode, 0,
2698                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2699                            mem_ctx, &outdata, &res, &timeout, NULL);
2700         if (ret != 0 || res != 0) {
2701                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2702                                 "failed ret:%d res:%d\n",
2703                                 ret, res));
2704                 return -1;
2705         }
2706
2707         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2708         if (len > outdata.dsize) {
2709                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2710                                 "returned invalid data with size %u > %u\n",
2711                                 (unsigned int)outdata.dsize,
2712                                 (unsigned int)len));
2713                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2714                 return -1;
2715         }
2716
2717         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2718         len += info->num*sizeof(struct ctdb_control_iface_info);
2719
2720         if (len > outdata.dsize) {
2721                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2722                                 "returned invalid data with size %u > %u\n",
2723                                 (unsigned int)outdata.dsize,
2724                                 (unsigned int)len));
2725                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2726                 return -1;
2727         }
2728
2729         /* make sure we null terminate the returned strings */
2730         for (i=0; i < info->num; i++) {
2731                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2732         }
2733
2734         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2735                                                                 outdata.dptr,
2736                                                                 outdata.dsize);
2737         talloc_free(outdata.dptr);
2738         if (*_info == NULL) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2740                                 "talloc_memdup size %u failed\n",
2741                                 (unsigned int)outdata.dsize));
2742                 return -1;
2743         }
2744
2745         return 0;
2746 }
2747
2748 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2749                          struct timeval timeout, uint32_t destnode,
2750                          TALLOC_CTX *mem_ctx,
2751                          struct ctdb_control_get_ifaces **_ifaces)
2752 {
2753         int ret;
2754         TDB_DATA outdata;
2755         int32_t res;
2756         struct ctdb_control_get_ifaces *ifaces;
2757         uint32_t len;
2758         uint32_t i;
2759
2760         ret = ctdb_control(ctdb, destnode, 0,
2761                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2762                            mem_ctx, &outdata, &res, &timeout, NULL);
2763         if (ret != 0 || res != 0) {
2764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2765                                 "failed ret:%d res:%d\n",
2766                                 ret, res));
2767                 return -1;
2768         }
2769
2770         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2771         if (len > outdata.dsize) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2773                                 "returned invalid data with size %u > %u\n",
2774                                 (unsigned int)outdata.dsize,
2775                                 (unsigned int)len));
2776                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2777                 return -1;
2778         }
2779
2780         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2781         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2782
2783         if (len > outdata.dsize) {
2784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2785                                 "returned invalid data with size %u > %u\n",
2786                                 (unsigned int)outdata.dsize,
2787                                 (unsigned int)len));
2788                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2789                 return -1;
2790         }
2791
2792         /* make sure we null terminate the returned strings */
2793         for (i=0; i < ifaces->num; i++) {
2794                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2795         }
2796
2797         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2798                                                                   outdata.dptr,
2799                                                                   outdata.dsize);
2800         talloc_free(outdata.dptr);
2801         if (*_ifaces == NULL) {
2802                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2803                                 "talloc_memdup size %u failed\n",
2804                                 (unsigned int)outdata.dsize));
2805                 return -1;
2806         }
2807
2808         return 0;
2809 }
2810
2811 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2812                              struct timeval timeout, uint32_t destnode,
2813                              TALLOC_CTX *mem_ctx,
2814                              const struct ctdb_control_iface_info *info)
2815 {
2816         int ret;
2817         TDB_DATA indata;
2818         int32_t res;
2819
2820         indata.dptr = discard_const_p(uint8_t, info);
2821         indata.dsize = sizeof(*info);
2822
2823         ret = ctdb_control(ctdb, destnode, 0,
2824                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2825                            mem_ctx, NULL, &res, &timeout, NULL);
2826         if (ret != 0 || res != 0) {
2827                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2828                                 "failed ret:%d res:%d\n",
2829                                 ret, res));
2830                 return -1;
2831         }
2832
2833         return 0;
2834 }
2835
2836 /*
2837   set/clear the permanent disabled bit on a remote node
2838  */
2839 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2840                        uint32_t set, uint32_t clear)
2841 {
2842         int ret;
2843         TDB_DATA data;
2844         struct ctdb_node_map *nodemap=NULL;
2845         struct ctdb_node_flag_change c;
2846         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2847         uint32_t recmaster;
2848         uint32_t *nodes;
2849
2850
2851         /* find the recovery master */
2852         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2853         if (ret != 0) {
2854                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2855                 talloc_free(tmp_ctx);
2856                 return ret;
2857         }
2858
2859
2860         /* read the node flags from the recmaster */
2861         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2862         if (ret != 0) {
2863                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2864                 talloc_free(tmp_ctx);
2865                 return -1;
2866         }
2867         if (destnode >= nodemap->num) {
2868                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2869                 talloc_free(tmp_ctx);
2870                 return -1;
2871         }
2872
2873         c.pnn       = destnode;
2874         c.old_flags = nodemap->nodes[destnode].flags;
2875         c.new_flags = c.old_flags;
2876         c.new_flags |= set;
2877         c.new_flags &= ~clear;
2878
2879         data.dsize = sizeof(c);
2880         data.dptr = (unsigned char *)&c;
2881
2882         /* send the flags update to all connected nodes */
2883         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2884
2885         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2886                                         nodes, 0,
2887                                         timeout, false, data,
2888                                         NULL, NULL,
2889                                         NULL) != 0) {
2890                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2891
2892                 talloc_free(tmp_ctx);
2893                 return -1;
2894         }
2895
2896         talloc_free(tmp_ctx);
2897         return 0;
2898 }
2899
2900
2901 /*
2902   get all tunables
2903  */
2904 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2905                                struct timeval timeout, 
2906                                uint32_t destnode,
2907                                struct ctdb_tunable *tunables)
2908 {
2909         TDB_DATA outdata;
2910         int ret;
2911         int32_t res;
2912
2913         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2914                            &outdata, &res, &timeout, NULL);
2915         if (ret != 0 || res != 0) {
2916                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2917                 return -1;
2918         }
2919
2920         if (outdata.dsize != sizeof(*tunables)) {
2921                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2922                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2923                 return -1;              
2924         }
2925
2926         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2927         talloc_free(outdata.dptr);
2928         return 0;
2929 }
2930
2931 /*
2932   add a public address to a node
2933  */
2934 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2935                       struct timeval timeout, 
2936                       uint32_t destnode,
2937                       struct ctdb_control_ip_iface *pub)
2938 {
2939         TDB_DATA data;
2940         int32_t res;
2941         int ret;
2942
2943         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2944         data.dptr  = (unsigned char *)pub;
2945
2946         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2947                            NULL, &res, &timeout, NULL);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2950                 return -1;
2951         }
2952
2953         return 0;
2954 }
2955
2956 /*
2957   delete a public address from a node
2958  */
2959 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2960                       struct timeval timeout, 
2961                       uint32_t destnode,
2962                       struct ctdb_control_ip_iface *pub)
2963 {
2964         TDB_DATA data;
2965         int32_t res;
2966         int ret;
2967
2968         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2969         data.dptr  = (unsigned char *)pub;
2970
2971         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2972                            NULL, &res, &timeout, NULL);
2973         if (ret != 0 || res != 0) {
2974                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2975                 return -1;
2976         }
2977
2978         return 0;
2979 }
2980
2981 /*
2982   kill a tcp connection
2983  */
2984 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2985                       struct timeval timeout, 
2986                       uint32_t destnode,
2987                       struct ctdb_control_killtcp *killtcp)
2988 {
2989         TDB_DATA data;
2990         int32_t res;
2991         int ret;
2992
2993         data.dsize = sizeof(struct ctdb_control_killtcp);
2994         data.dptr  = (unsigned char *)killtcp;
2995
2996         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2997                            NULL, &res, &timeout, NULL);
2998         if (ret != 0 || res != 0) {
2999                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3000                 return -1;
3001         }
3002
3003         return 0;
3004 }
3005
3006 /*
3007   send a gratious arp
3008  */
3009 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3010                       struct timeval timeout, 
3011                       uint32_t destnode,
3012                       ctdb_sock_addr *addr,
3013                       const char *ifname)
3014 {
3015         TDB_DATA data;
3016         int32_t res;
3017         int ret, len;
3018         struct ctdb_control_gratious_arp *gratious_arp;
3019         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3020
3021
3022         len = strlen(ifname)+1;
3023         gratious_arp = talloc_size(tmp_ctx, 
3024                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3025         CTDB_NO_MEMORY(ctdb, gratious_arp);
3026
3027         gratious_arp->addr = *addr;
3028         gratious_arp->len = len;
3029         memcpy(&gratious_arp->iface[0], ifname, len);
3030
3031
3032         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3033         data.dptr  = (unsigned char *)gratious_arp;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3036                            NULL, &res, &timeout, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3039                 talloc_free(tmp_ctx);
3040                 return -1;
3041         }
3042
3043         talloc_free(tmp_ctx);
3044         return 0;
3045 }
3046
3047 /*
3048   get a list of all tcp tickles that a node knows about for a particular vnn
3049  */
3050 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3051                               struct timeval timeout, uint32_t destnode, 
3052                               TALLOC_CTX *mem_ctx, 
3053                               ctdb_sock_addr *addr,
3054                               struct ctdb_control_tcp_tickle_list **list)
3055 {
3056         int ret;
3057         TDB_DATA data, outdata;
3058         int32_t status;
3059
3060         data.dptr = (uint8_t*)addr;
3061         data.dsize = sizeof(ctdb_sock_addr);
3062
3063         ret = ctdb_control(ctdb, destnode, 0, 
3064                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3065                            mem_ctx, &outdata, &status, NULL, NULL);
3066         if (ret != 0 || status != 0) {
3067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3068                 return -1;
3069         }
3070
3071         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3072
3073         return status;
3074 }
3075
3076 /*
3077   register a server id
3078  */
3079 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3080                       struct timeval timeout, 
3081                       struct ctdb_server_id *id)
3082 {
3083         TDB_DATA data;
3084         int32_t res;
3085         int ret;
3086
3087         data.dsize = sizeof(struct ctdb_server_id);
3088         data.dptr  = (unsigned char *)id;
3089
3090         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3091                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3092                         0, data, NULL,
3093                         NULL, &res, &timeout, NULL);
3094         if (ret != 0 || res != 0) {
3095                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3096                 return -1;
3097         }
3098
3099         return 0;
3100 }
3101
3102 /*
3103   unregister a server id
3104  */
3105 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3106                       struct timeval timeout, 
3107                       struct ctdb_server_id *id)
3108 {
3109         TDB_DATA data;
3110         int32_t res;
3111         int ret;
3112
3113         data.dsize = sizeof(struct ctdb_server_id);
3114         data.dptr  = (unsigned char *)id;
3115
3116         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3117                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3118                         0, data, NULL,
3119                         NULL, &res, &timeout, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128
3129 /*
3130   check if a server id exists
3131
3132   if a server id does exist, return *status == 1, otherwise *status == 0
3133  */
3134 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3135                       struct timeval timeout, 
3136                       uint32_t destnode,
3137                       struct ctdb_server_id *id,
3138                       uint32_t *status)
3139 {
3140         TDB_DATA data;
3141         int32_t res;
3142         int ret;
3143
3144         data.dsize = sizeof(struct ctdb_server_id);
3145         data.dptr  = (unsigned char *)id;
3146
3147         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3148                         0, data, NULL,
3149                         NULL, &res, &timeout, NULL);
3150         if (ret != 0) {
3151                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3152                 return -1;
3153         }
3154
3155         if (res) {
3156                 *status = 1;
3157         } else {
3158                 *status = 0;
3159         }
3160
3161         return 0;
3162 }
3163
3164 /*
3165    get the list of server ids that are registered on a node
3166 */
3167 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3168                 TALLOC_CTX *mem_ctx,
3169                 struct timeval timeout, uint32_t destnode, 
3170                 struct ctdb_server_id_list **svid_list)
3171 {
3172         int ret;
3173         TDB_DATA outdata;
3174         int32_t res;
3175
3176         ret = ctdb_control(ctdb, destnode, 0, 
3177                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3178                            mem_ctx, &outdata, &res, &timeout, NULL);
3179         if (ret != 0 || res != 0) {
3180                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3181                 return -1;
3182         }
3183
3184         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3185                     
3186         return 0;
3187 }
3188
3189 /*
3190   initialise the ctdb daemon for client applications
3191
3192   NOTE: In current code the daemon does not fork. This is for testing purposes only
3193   and to simplify the code.
3194 */
3195 struct ctdb_context *ctdb_init(struct event_context *ev)
3196 {
3197         int ret;
3198         struct ctdb_context *ctdb;
3199
3200         ctdb = talloc_zero(ev, struct ctdb_context);
3201         if (ctdb == NULL) {
3202                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3203                 return NULL;
3204         }
3205         ctdb->ev  = ev;
3206         ctdb->idr = idr_init(ctdb);
3207         /* Wrap early to exercise code. */
3208         ctdb->lastid = INT_MAX-200;
3209         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3210
3211         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3212         if (ret != 0) {
3213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3214                 talloc_free(ctdb);
3215                 return NULL;
3216         }
3217
3218         ctdb->statistics.statistics_start_time = timeval_current();
3219
3220         return ctdb;
3221 }
3222
3223
3224 /*
3225   set some ctdb flags
3226 */
3227 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3228 {
3229         ctdb->flags |= flags;
3230 }
3231
3232 /*
3233   setup the local socket name
3234 */
3235 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3236 {
3237         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3238         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3239
3240         return 0;
3241 }
3242
3243 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3244 {
3245         return ctdb->daemon.name;
3246 }
3247
3248 /*
3249   return the pnn of this node
3250 */
3251 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3252 {
3253         return ctdb->pnn;
3254 }
3255
3256
3257 /*
3258   get the uptime of a remote node
3259  */
3260 struct ctdb_client_control_state *
3261 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3262 {
3263         return ctdb_control_send(ctdb, destnode, 0, 
3264                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3265                            mem_ctx, &timeout, NULL);
3266 }
3267
3268 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3269 {
3270         int ret;
3271         int32_t res;
3272         TDB_DATA outdata;
3273
3274         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3275         if (ret != 0 || res != 0) {
3276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3277                 return -1;
3278         }
3279
3280         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3281
3282         return 0;
3283 }
3284
3285 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3286 {
3287         struct ctdb_client_control_state *state;
3288
3289         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3290         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3291 }
3292
3293 /*
3294   send a control to execute the "recovered" event script on a node
3295  */
3296 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3297 {
3298         int ret;
3299         int32_t status;
3300
3301         ret = ctdb_control(ctdb, destnode, 0, 
3302                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3303                            NULL, NULL, &status, &timeout, NULL);
3304         if (ret != 0 || status != 0) {
3305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3306                 return -1;
3307         }
3308
3309         return 0;
3310 }
3311
3312 /* 
3313   callback for the async helpers used when sending the same control
3314   to multiple nodes in parallell.
3315 */
3316 static void async_callback(struct ctdb_client_control_state *state)
3317 {
3318         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3319         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3320         int ret;
3321         TDB_DATA outdata;
3322         int32_t res = -1;
3323         uint32_t destnode = state->c->hdr.destnode;
3324
3325         /* one more node has responded with recmode data */
3326         data->count--;
3327
3328         /* if we failed to push the db, then return an error and let
3329            the main loop try again.
3330         */
3331         if (state->state != CTDB_CONTROL_DONE) {
3332                 if ( !data->dont_log_errors) {
3333                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3334                 }
3335                 data->fail_count++;
3336                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3337                         res = -ETIME;
3338                 } else {
3339                         res = -1;
3340                 }
3341                 if (data->fail_callback) {
3342                         data->fail_callback(ctdb, destnode, res, outdata,
3343                                         data->callback_data);
3344                 }
3345                 return;
3346         }
3347         
3348         state->async.fn = NULL;
3349
3350         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3351         if ((ret != 0) || (res != 0)) {
3352                 if ( !data->dont_log_errors) {
3353                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3354                 }
3355                 data->fail_count++;
3356                 if (data->fail_callback) {
3357                         data->fail_callback(ctdb, destnode, res, outdata,
3358                                         data->callback_data);
3359                 }
3360         }
3361         if ((ret == 0) && (data->callback != NULL)) {
3362                 data->callback(ctdb, destnode, res, outdata,
3363                                         data->callback_data);
3364         }
3365 }
3366
3367
3368 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3369 {
3370         /* set up the callback functions */
3371         state->async.fn = async_callback;
3372         state->async.private_data = data;
3373         
3374         /* one more control to wait for to complete */
3375         data->count++;
3376 }
3377
3378
3379 /* wait for up to the maximum number of seconds allowed
3380    or until all nodes we expect a response from has replied
3381 */
3382 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3383 {
3384         while (data->count > 0) {
3385                 event_loop_once(ctdb->ev);
3386         }
3387         if (data->fail_count != 0) {
3388                 if (!data->dont_log_errors) {
3389                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3390                                  data->fail_count));
3391                 }
3392                 return -1;
3393         }
3394         return 0;
3395 }
3396
3397
3398 /* 
3399    perform a simple control on the listed nodes
3400    The control cannot return data
3401  */
3402 int ctdb_client_async_control(struct ctdb_context *ctdb,
3403                                 enum ctdb_controls opcode,
3404                                 uint32_t *nodes,
3405                                 uint64_t srvid,
3406                                 struct timeval timeout,
3407                                 bool dont_log_errors,
3408                                 TDB_DATA data,
3409                                 client_async_callback client_callback,
3410                                 client_async_callback fail_callback,
3411                                 void *callback_data)
3412 {
3413         struct client_async_data *async_data;
3414         struct ctdb_client_control_state *state;
3415         int j, num_nodes;
3416
3417         async_data = talloc_zero(ctdb, struct client_async_data);
3418         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3419         async_data->dont_log_errors = dont_log_errors;
3420         async_data->callback = client_callback;
3421         async_data->fail_callback = fail_callback;
3422         async_data->callback_data = callback_data;
3423         async_data->opcode        = opcode;
3424
3425         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3426
3427         /* loop over all nodes and send an async control to each of them */
3428         for (j=0; j<num_nodes; j++) {
3429                 uint32_t pnn = nodes[j];
3430
3431                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3432                                           0, data, async_data, &timeout, NULL);
3433                 if (state == NULL) {
3434                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3435                         talloc_free(async_data);
3436                         return -1;
3437                 }
3438                 
3439                 ctdb_client_async_add(async_data, state);
3440         }
3441
3442         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3443                 talloc_free(async_data);
3444                 return -1;
3445         }
3446
3447         talloc_free(async_data);
3448         return 0;
3449 }
3450
3451 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3452                                 struct ctdb_vnn_map *vnn_map,
3453                                 TALLOC_CTX *mem_ctx,
3454                                 bool include_self)
3455 {
3456         int i, j, num_nodes;
3457         uint32_t *nodes;
3458
3459         for (i=num_nodes=0;i<vnn_map->size;i++) {
3460                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3461                         continue;
3462                 }
3463                 num_nodes++;
3464         } 
3465
3466         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3467         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3468
3469         for (i=j=0;i<vnn_map->size;i++) {
3470                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3471                         continue;
3472                 }
3473                 nodes[j++] = vnn_map->map[i];
3474         } 
3475
3476         return nodes;
3477 }
3478
3479 /* Get list of nodes not including those with flags specified by mask.
3480  * If exclude_pnn is not -1 then exclude that pnn from the list.
3481  */
3482 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3483                         struct ctdb_node_map *node_map,
3484                         TALLOC_CTX *mem_ctx,
3485                         uint32_t mask,
3486                         int exclude_pnn)
3487 {
3488         int i, j, num_nodes;
3489         uint32_t *nodes;
3490
3491         for (i=num_nodes=0;i<node_map->num;i++) {
3492                 if (node_map->nodes[i].flags & mask) {
3493                         continue;
3494                 }
3495                 if (node_map->nodes[i].pnn == exclude_pnn) {
3496                         continue;
3497                 }
3498                 num_nodes++;
3499         } 
3500
3501         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3502         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3503
3504         for (i=j=0;i<node_map->num;i++) {
3505                 if (node_map->nodes[i].flags & mask) {
3506                         continue;
3507                 }
3508                 if (node_map->nodes[i].pnn == exclude_pnn) {
3509                         continue;
3510                 }
3511                 nodes[j++] = node_map->nodes[i].pnn;
3512         } 
3513
3514         return nodes;
3515 }
3516
3517 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3518                                 struct ctdb_node_map *node_map,
3519                                 TALLOC_CTX *mem_ctx,
3520                                 bool include_self)
3521 {
3522         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3523                              include_self ? -1 : ctdb->pnn);
3524 }
3525
3526 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3527                                 struct ctdb_node_map *node_map,
3528                                 TALLOC_CTX *mem_ctx,
3529                                 uint32_t pnn)
3530 {
3531         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE, pnn);
3532 }
3533
3534 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3535                                 struct ctdb_node_map *node_map,
3536                                 TALLOC_CTX *mem_ctx,
3537                                 bool include_self)
3538 {
3539         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3540                              include_self ? -1 : ctdb->pnn);
3541 }
3542
3543 /* 
3544   this is used to test if a pnn lock exists and if it exists will return
3545   the number of connections that pnn has reported or -1 if that recovery
3546   daemon is not running.
3547 */
3548 int
3549 ctdb_read_pnn_lock(int fd, int32_t pnn)
3550 {
3551         struct flock lock;
3552         char c;
3553
3554         lock.l_type = F_WRLCK;
3555         lock.l_whence = SEEK_SET;
3556         lock.l_start = pnn;
3557         lock.l_len = 1;
3558         lock.l_pid = 0;
3559
3560         if (fcntl(fd, F_GETLK, &lock) != 0) {
3561                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3562                 return -1;
3563         }
3564
3565         if (lock.l_type == F_UNLCK) {
3566                 return -1;
3567         }
3568
3569         if (pread(fd, &c, 1, pnn) == -1) {
3570                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3571                 return -1;
3572         }
3573
3574         return c;
3575 }
3576
3577 /*
3578   get capabilities of a remote node
3579  */
3580 struct ctdb_client_control_state *
3581 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3582 {
3583         return ctdb_control_send(ctdb, destnode, 0, 
3584                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3585                            mem_ctx, &timeout, NULL);
3586 }
3587
3588 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3589 {
3590         int ret;
3591         int32_t res;
3592         TDB_DATA outdata;
3593
3594         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3595         if ( (ret != 0) || (res != 0) ) {
3596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3597                 return -1;
3598         }
3599
3600         if (capabilities) {
3601                 *capabilities = *((uint32_t *)outdata.dptr);
3602         }
3603
3604         return 0;
3605 }
3606
3607 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3608 {
3609         struct ctdb_client_control_state *state;
3610         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3611         int ret;
3612
3613         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3614         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3615         talloc_free(tmp_ctx);
3616         return ret;
3617 }
3618
3619 /**
3620  * check whether a transaction is active on a given db on a given node
3621  */
3622 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3623                                      uint32_t destnode,
3624                                      uint32_t db_id)
3625 {
3626         int32_t status;
3627         int ret;
3628         TDB_DATA indata;
3629
3630         indata.dptr = (uint8_t *)&db_id;
3631         indata.dsize = sizeof(db_id);
3632
3633         ret = ctdb_control(ctdb, destnode, 0,
3634                            CTDB_CONTROL_TRANS2_ACTIVE,
3635                            0, indata, NULL, NULL, &status,
3636                            NULL, NULL);
3637
3638         if (ret != 0) {
3639                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3640                 return -1;
3641         }
3642
3643         return status;
3644 }
3645
3646
3647 struct ctdb_transaction_handle {
3648         struct ctdb_db_context *ctdb_db;
3649         bool in_replay;
3650         /*
3651          * we store the reads and writes done under a transaction:
3652          * - one list stores both reads and writes (m_all),
3653          * - the other just writes (m_write)
3654          */
3655         struct ctdb_marshall_buffer *m_all;
3656         struct ctdb_marshall_buffer *m_write;
3657 };
3658
3659 /* start a transaction on a database */
3660 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3661 {
3662         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3663         return 0;
3664 }
3665
3666 /* start a transaction on a database */
3667 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3668 {
3669         struct ctdb_record_handle *rh;
3670         TDB_DATA key;
3671         TDB_DATA data;
3672         struct ctdb_ltdb_header header;
3673         TALLOC_CTX *tmp_ctx;
3674         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3675         int ret;
3676         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3677         pid_t pid;
3678         int32_t status;
3679
3680         key.dptr = discard_const(keyname);
3681         key.dsize = strlen(keyname);
3682
3683         if (!ctdb_db->persistent) {
3684                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3685                 return -1;
3686         }
3687
3688 again:
3689         tmp_ctx = talloc_new(h);
3690
3691         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3692         if (rh == NULL) {
3693                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3694                 talloc_free(tmp_ctx);
3695                 return -1;
3696         }
3697
3698         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3699                                               CTDB_CURRENT_NODE,
3700                                               ctdb_db->db_id);
3701         if (status == 1) {
3702                 unsigned long int usec = (1000 + random()) % 100000;
3703                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3704                                     "on db_id[0x%08x]. waiting for %lu "
3705                                     "microseconds\n",
3706                                     ctdb_db->db_id, usec));
3707                 talloc_free(tmp_ctx);
3708                 usleep(usec);
3709                 goto again;
3710         }
3711
3712         /*
3713          * store the pid in the database:
3714          * it is not enough that the node is dmaster...
3715          */
3716         pid = getpid();
3717         data.dptr = (unsigned char *)&pid;
3718         data.dsize = sizeof(pid_t);
3719         rh->header.rsn++;
3720         rh->header.dmaster = ctdb_db->ctdb->pnn;
3721         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3722         if (ret != 0) {
3723                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3724                                   "transaction record\n"));
3725                 talloc_free(tmp_ctx);
3726                 return -1;
3727         }
3728
3729         talloc_free(rh);
3730
3731         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3732         if (ret != 0) {
3733                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3734                 talloc_free(tmp_ctx);
3735                 return -1;
3736         }
3737
3738         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3739         if (ret != 0) {
3740                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3741                                  "lock record inside transaction\n"));
3742                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3743                 talloc_free(tmp_ctx);
3744                 goto again;
3745         }
3746
3747         if (header.dmaster != ctdb_db->ctdb->pnn) {
3748                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3749                                    "transaction lock record\n"));
3750                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3751                 talloc_free(tmp_ctx);
3752                 goto again;
3753         }
3754
3755         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3756                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3757                                     "the transaction lock record\n"));
3758                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3759                 talloc_free(tmp_ctx);
3760                 goto again;
3761         }
3762
3763         talloc_free(tmp_ctx);
3764
3765         return 0;
3766 }
3767
3768
3769 /* start a transaction on a database */
3770 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3771                                                        TALLOC_CTX *mem_ctx)
3772 {
3773         struct ctdb_transaction_handle *h;
3774         int ret;
3775
3776         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3777         if (h == NULL) {
3778                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3779                 return NULL;
3780         }
3781
3782         h->ctdb_db = ctdb_db;
3783
3784         ret = ctdb_transaction_fetch_start(h);
3785         if (ret != 0) {
3786                 talloc_free(h);
3787                 return NULL;
3788         }
3789
3790         talloc_set_destructor(h, ctdb_transaction_destructor);
3791
3792         return h;
3793 }
3794
3795
3796
3797 /*
3798   fetch a record inside a transaction
3799  */
3800 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3801                            TALLOC_CTX *mem_ctx, 
3802                            TDB_DATA key, TDB_DATA *data)
3803 {
3804         struct ctdb_ltdb_header header;
3805         int ret;
3806
3807         ZERO_STRUCT(header);
3808
3809         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3810         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3811                 /* record doesn't exist yet */
3812                 *data = tdb_null;
3813                 ret = 0;
3814         }
3815         
3816         if (ret != 0) {
3817                 return ret;
3818         }
3819
3820         if (!h->in_replay) {
3821                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3822                 if (h->m_all == NULL) {
3823                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3824                         return -1;
3825                 }
3826         }
3827
3828         return 0;
3829 }
3830
3831 /*
3832   stores a record inside a transaction
3833  */
3834 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3835                            TDB_DATA key, TDB_DATA data)
3836 {
3837         TALLOC_CTX *tmp_ctx = talloc_new(h);
3838         struct ctdb_ltdb_header header;
3839         TDB_DATA olddata;
3840         int ret;
3841
3842         ZERO_STRUCT(header);
3843
3844         /* we need the header so we can update the RSN */
3845         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3846         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3847                 /* the record doesn't exist - create one with us as dmaster.
3848                    This is only safe because we are in a transaction and this
3849                    is a persistent database */
3850                 ZERO_STRUCT(header);
3851         } else if (ret != 0) {
3852                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3853                 talloc_free(tmp_ctx);
3854                 return ret;
3855         }
3856
3857         if (data.dsize == olddata.dsize &&
3858             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3859                 /* save writing the same data */
3860                 talloc_free(tmp_ctx);
3861                 return 0;
3862         }
3863
3864         header.dmaster = h->ctdb_db->ctdb->pnn;
3865         header.rsn++;
3866
3867         if (!h->in_replay) {
3868                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3869                 if (h->m_all == NULL) {
3870                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3871                         talloc_free(tmp_ctx);
3872                         return -1;
3873                 }
3874         }               
3875
3876         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3877         if (h->m_write == NULL) {
3878                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3879                 talloc_free(tmp_ctx);
3880                 return -1;
3881         }
3882         
3883         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3884
3885         talloc_free(tmp_ctx);
3886         
3887         return ret;
3888 }
3889
3890 /*
3891   replay a transaction
3892  */
3893 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3894 {
3895         int ret, i;
3896         struct ctdb_rec_data *rec = NULL;
3897
3898         h->in_replay = true;
3899         talloc_free(h->m_write);
3900         h->m_write = NULL;
3901
3902         ret = ctdb_transaction_fetch_start(h);
3903         if (ret != 0) {
3904                 return ret;
3905         }
3906
3907         for (i=0;i<h->m_all->count;i++) {
3908                 TDB_DATA key, data;
3909
3910                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3911                 if (rec == NULL) {
3912                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3913                         goto failed;
3914                 }
3915
3916                 if (rec->reqid == 0) {
3917                         /* its a store */
3918                         if (ctdb_transaction_store(h, key, data) != 0) {
3919                                 goto failed;
3920                         }
3921                 } else {
3922                         TDB_DATA data2;
3923                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3924
3925                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3926                                 talloc_free(tmp_ctx);
3927                                 goto failed;
3928                         }
3929                         if (data2.dsize != data.dsize ||
3930                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3931                                 /* the record has changed on us - we have to give up */
3932                                 talloc_free(tmp_ctx);
3933                                 goto failed;
3934                         }
3935                         talloc_free(tmp_ctx);
3936                 }
3937         }
3938         
3939         return 0;
3940
3941 failed:
3942         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3943         return -1;
3944 }
3945
3946
3947 /*
3948   commit a transaction
3949  */
3950 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3951 {
3952         int ret, retries=0;
3953         int32_t status;
3954         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3955         struct timeval timeout;
3956         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3957
3958         talloc_set_destructor(h, NULL);
3959
3960         /* our commit strategy is quite complex.
3961
3962            - we first try to commit the changes to all other nodes
3963
3964            - if that works, then we commit locally and we are done
3965
3966            - if a commit on another node fails, then we need to cancel
3967              the transaction, then restart the transaction (thus
3968              opening a window of time for a pending recovery to
3969              complete), then replay the transaction, checking all the
3970              reads and writes (checking that reads give the same data,
3971              and writes succeed). Then we retry the transaction to the
3972              other nodes
3973         */
3974
3975 again:
3976         if (h->m_write == NULL) {
3977                 /* no changes were made */
3978                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3979                 talloc_free(h);
3980                 return 0;
3981         }
3982
3983         /* tell ctdbd to commit to the other nodes */
3984         timeout = timeval_current_ofs(1, 0);
3985         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3986                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3987                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3988                            &timeout, NULL);
3989         if (ret != 0 || status != 0) {
3990                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3991                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3992                                      ", retrying after 1 second...\n",
3993                                      (retries==0)?"":"retry "));
3994                 sleep(1);
3995
3996                 if (ret != 0) {
3997                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3998                 } else {
3999                         /* work out what error code we will give if we 
4000                            have to fail the operation */
4001                         switch ((enum ctdb_trans2_commit_error)status) {
4002                         case CTDB_TRANS2_COMMIT_SUCCESS:
4003                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4004                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4005                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4006                                 break;
4007                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4008                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4009                                 break;
4010                         }
4011                 }
4012
4013                 if (++retries == 100) {
4014                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4015                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4016                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4017                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4018                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4019                         talloc_free(h);
4020                         return -1;
4021                 }               
4022
4023                 if (ctdb_replay_transaction(h) != 0) {
4024                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4025                                           "transaction on db 0x%08x, "
4026                                           "failure control =%u\n",
4027                                           h->ctdb_db->db_id,
4028                                           (unsigned)failure_control));
4029                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4030                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4031                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4032                         talloc_free(h);
4033                         return -1;
4034                 }
4035                 goto again;
4036         } else {
4037                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4038         }
4039
4040         /* do the real commit locally */
4041         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4042         if (ret != 0) {
4043                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4044                                   "on db id 0x%08x locally, "
4045                                   "failure_control=%u\n",
4046                                   h->ctdb_db->db_id,
4047                                   (unsigned)failure_control));
4048                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4049                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4050                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4051                 talloc_free(h);
4052                 return ret;
4053         }
4054
4055         /* tell ctdbd that we are finished with our local commit */
4056         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4057                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4058                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4059         talloc_free(h);
4060         return 0;
4061 }
4062
4063 /*
4064   recovery daemon ping to main daemon
4065  */
4066 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4067 {
4068         int ret;
4069         int32_t res;
4070
4071         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4072                            ctdb, NULL, &res, NULL, NULL);
4073         if (ret != 0 || res != 0) {
4074                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4075                 return -1;
4076         }
4077
4078         return 0;
4079 }
4080
4081 /* When forking the main daemon and the child process needs to connect
4082  * back to the daemon as a client process, this function can be used
4083  * to change the ctdb context from daemon into client mode.  The child
4084  * process must be created using ctdb_fork() and not fork() -
4085  * ctdb_fork() does some necessary housekeeping.
4086  */
4087 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4088 {
4089         int ret;
4090         va_list ap;
4091
4092         /* Add extra information so we can identify this in the logs */
4093         va_start(ap, fmt);
4094         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4095         va_end(ap);
4096
4097         /* get a new event context */
4098         ctdb->ev = event_context_init(ctdb);
4099         tevent_loop_allow_nesting(ctdb->ev);
4100
4101         /* Connect to main CTDB daemon */
4102         ret = ctdb_socket_connect(ctdb);
4103         if (ret != 0) {
4104                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4105                 return -1;
4106         }
4107
4108         ctdb->can_send_controls = true;
4109
4110         return 0;
4111 }
4112
4113 /*
4114   get the status of running the monitor eventscripts: NULL means never run.
4115  */
4116 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4117                 struct timeval timeout, uint32_t destnode, 
4118                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4119                 struct ctdb_scripts_wire **scripts)
4120 {
4121         int ret;
4122         TDB_DATA outdata, indata;
4123         int32_t res;
4124         uint32_t uinttype = type;
4125
4126         indata.dptr = (uint8_t *)&uinttype;
4127         indata.dsize = sizeof(uinttype);
4128
4129         ret = ctdb_control(ctdb, destnode, 0, 
4130                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4131                            mem_ctx, &outdata, &res, &timeout, NULL);
4132         if (ret != 0 || res != 0) {
4133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4134                 return -1;
4135         }
4136
4137         if (outdata.dsize == 0) {
4138                 *scripts = NULL;
4139         } else {
4140                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4141                 talloc_free(outdata.dptr);
4142         }
4143                     
4144         return 0;
4145 }
4146
4147 /*
4148   tell the main daemon how long it took to lock the reclock file
4149  */
4150 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4151 {
4152         int ret;
4153         int32_t res;
4154         TDB_DATA data;
4155
4156         data.dptr = (uint8_t *)&latency;
4157         data.dsize = sizeof(latency);
4158
4159         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4160                            ctdb, NULL, &res, NULL, NULL);
4161         if (ret != 0 || res != 0) {
4162                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4163                 return -1;
4164         }
4165
4166         return 0;
4167 }
4168
4169 /*
4170   get the name of the reclock file
4171  */
4172 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4173                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4174                          const char **name)
4175 {
4176         int ret;
4177         int32_t res;
4178         TDB_DATA data;
4179
4180         ret = ctdb_control(ctdb, destnode, 0, 
4181                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4182                            mem_ctx, &data, &res, &timeout, NULL);
4183         if (ret != 0 || res != 0) {
4184                 return -1;
4185         }
4186
4187         if (data.dsize == 0) {
4188                 *name = NULL;
4189         } else {
4190                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4191         }
4192         talloc_free(data.dptr);
4193
4194         return 0;
4195 }
4196
4197 /*
4198   set the reclock filename for a node
4199  */
4200 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4201 {
4202         int ret;
4203         TDB_DATA data;
4204         int32_t res;
4205
4206         if (reclock == NULL) {
4207                 data.dsize = 0;
4208                 data.dptr  = NULL;
4209         } else {
4210                 data.dsize = strlen(reclock) + 1;
4211                 data.dptr  = discard_const(reclock);
4212         }
4213
4214         ret = ctdb_control(ctdb, destnode, 0, 
4215                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4216                            NULL, NULL, &res, &timeout, NULL);
4217         if (ret != 0 || res != 0) {
4218                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4219                 return -1;
4220         }
4221
4222         return 0;
4223 }
4224
4225 /*
4226   stop a node
4227  */
4228 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4229 {
4230         int ret;
4231         int32_t res;
4232
4233         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4234                            ctdb, NULL, &res, &timeout, NULL);
4235         if (ret != 0 || res != 0) {
4236                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4237                 return -1;
4238         }
4239
4240         return 0;
4241 }
4242
4243 /*
4244   continue a node
4245  */
4246 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4247 {
4248         int ret;
4249
4250         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4251                            ctdb, NULL, NULL, &timeout, NULL);
4252         if (ret != 0) {
4253                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4254                 return -1;
4255         }
4256
4257         return 0;
4258 }
4259
4260 /*
4261   set the natgw state for a node
4262  */
4263 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4264 {
4265         int ret;
4266         TDB_DATA data;
4267         int32_t res;
4268
4269         data.dsize = sizeof(natgwstate);
4270         data.dptr  = (uint8_t *)&natgwstate;
4271
4272         ret = ctdb_control(ctdb, destnode, 0, 
4273                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4274                            NULL, NULL, &res, &timeout, NULL);
4275         if (ret != 0 || res != 0) {
4276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4277                 return -1;
4278         }
4279
4280         return 0;
4281 }
4282
4283 /*
4284   set the lmaster role for a node
4285  */
4286 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4287 {
4288         int ret;
4289         TDB_DATA data;
4290         int32_t res;
4291
4292         data.dsize = sizeof(lmasterrole);
4293         data.dptr  = (uint8_t *)&lmasterrole;
4294
4295         ret = ctdb_control(ctdb, destnode, 0, 
4296                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4297                            NULL, NULL, &res, &timeout, NULL);
4298         if (ret != 0 || res != 0) {
4299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4300                 return -1;
4301         }
4302
4303         return 0;
4304 }
4305
4306 /*
4307   set the recmaster role for a node
4308  */
4309 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4310 {
4311         int ret;
4312         TDB_DATA data;
4313         int32_t res;
4314
4315         data.dsize = sizeof(recmasterrole);
4316         data.dptr  = (uint8_t *)&recmasterrole;
4317
4318         ret = ctdb_control(ctdb, destnode, 0, 
4319                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4320                            NULL, NULL, &res, &timeout, NULL);
4321         if (ret != 0 || res != 0) {
4322                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4323                 return -1;
4324         }
4325
4326         return 0;
4327 }
4328
4329 /* enable an eventscript
4330  */
4331 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4332 {
4333         int ret;
4334         TDB_DATA data;
4335         int32_t res;
4336
4337         data.dsize = strlen(script) + 1;
4338         data.dptr  = discard_const(script);
4339
4340         ret = ctdb_control(ctdb, destnode, 0, 
4341                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4342                            NULL, NULL, &res, &timeout, NULL);
4343         if (ret != 0 || res != 0) {
4344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4345                 return -1;
4346         }
4347
4348         return 0;
4349 }
4350
4351 /* disable an eventscript
4352  */
4353 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4354 {
4355         int ret;
4356         TDB_DATA data;
4357         int32_t res;
4358
4359         data.dsize = strlen(script) + 1;
4360         data.dptr  = discard_const(script);
4361
4362         ret = ctdb_control(ctdb, destnode, 0, 
4363                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4364                            NULL, NULL, &res, &timeout, NULL);
4365         if (ret != 0 || res != 0) {
4366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4367                 return -1;
4368         }
4369
4370         return 0;
4371 }
4372
4373
4374 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4375 {
4376         int ret;
4377         TDB_DATA data;
4378         int32_t res;
4379
4380         data.dsize = sizeof(*bantime);
4381         data.dptr  = (uint8_t *)bantime;
4382
4383         ret = ctdb_control(ctdb, destnode, 0, 
4384                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4385                            NULL, NULL, &res, &timeout, NULL);
4386         if (ret != 0 || res != 0) {
4387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4388                 return -1;
4389         }
4390
4391         return 0;
4392 }
4393
4394
4395 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4396 {
4397         int ret;
4398         TDB_DATA outdata;
4399         int32_t res;
4400         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4401
4402         ret = ctdb_control(ctdb, destnode, 0, 
4403                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4404                            tmp_ctx, &outdata, &res, &timeout, NULL);
4405         if (ret != 0 || res != 0) {
4406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4407                 talloc_free(tmp_ctx);
4408                 return -1;
4409         }
4410
4411         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4412         talloc_free(tmp_ctx);
4413
4414         return 0;
4415 }
4416
4417
4418 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4419 {
4420         int ret;
4421         int32_t res;
4422         TDB_DATA data;
4423         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4424
4425         data.dptr = (uint8_t*)db_prio;
4426         data.dsize = sizeof(*db_prio);
4427
4428         ret = ctdb_control(ctdb, destnode, 0, 
4429                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4430                            tmp_ctx, NULL, &res, &timeout, NULL);
4431         if (ret != 0 || res != 0) {
4432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4433                 talloc_free(tmp_ctx);
4434                 return -1;
4435         }
4436
4437         talloc_free(tmp_ctx);
4438
4439         return 0;
4440 }
4441
4442 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4443 {
4444         int ret;
4445         int32_t res;
4446         TDB_DATA data;
4447         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4448
4449         data.dptr = (uint8_t*)&db_id;
4450         data.dsize = sizeof(db_id);
4451
4452         ret = ctdb_control(ctdb, destnode, 0, 
4453                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4454                            tmp_ctx, NULL, &res, &timeout, NULL);
4455         if (ret != 0 || res < 0) {
4456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4457                 talloc_free(tmp_ctx);
4458                 return -1;
4459         }
4460
4461         if (priority) {
4462                 *priority = res;
4463         }
4464
4465         talloc_free(tmp_ctx);
4466
4467         return 0;
4468 }
4469
4470 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4471 {
4472         int ret;
4473         TDB_DATA outdata;
4474         int32_t res;
4475
4476         ret = ctdb_control(ctdb, destnode, 0, 
4477                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4478                            mem_ctx, &outdata, &res, &timeout, NULL);
4479         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4481                 return -1;
4482         }
4483
4484         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4485         talloc_free(outdata.dptr);
4486                     
4487         return 0;
4488 }
4489
4490 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4491 {
4492         if (h == NULL) {
4493                 return NULL;
4494         }
4495
4496         return &h->header;
4497 }
4498
4499
4500 struct ctdb_client_control_state *
4501 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4502 {
4503         struct ctdb_client_control_state *handle;
4504         struct ctdb_marshall_buffer *m;
4505         struct ctdb_rec_data *rec;
4506         TDB_DATA outdata;
4507
4508         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4509         if (m == NULL) {
4510                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4511                 return NULL;
4512         }
4513
4514         m->db_id = ctdb_db->db_id;
4515
4516         rec = ctdb_marshall_record(m, 0, key, header, data);
4517         if (rec == NULL) {
4518                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4519                 talloc_free(m);
4520                 return NULL;
4521         }
4522         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4523         if (m == NULL) {
4524                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4525                 talloc_free(m);
4526                 return NULL;
4527         }
4528         m->count++;
4529         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4530
4531
4532         outdata.dptr = (uint8_t *)m;
4533         outdata.dsize = talloc_get_size(m);
4534
4535         handle = ctdb_control_send(ctdb, destnode, 0, 
4536                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4537                            mem_ctx, &timeout, NULL);
4538         talloc_free(m);
4539         return handle;
4540 }
4541
4542 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4543 {
4544         int ret;
4545         int32_t res;
4546
4547         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4548         if ( (ret != 0) || (res != 0) ){
4549                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4550                 return -1;
4551         }
4552
4553         return 0;
4554 }
4555
4556 int
4557 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4558 {
4559         struct ctdb_client_control_state *state;
4560
4561         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4562         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4563 }
4564
4565
4566
4567
4568
4569
4570 /*
4571   set a database to be readonly
4572  */
4573 struct ctdb_client_control_state *
4574 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4575 {
4576         TDB_DATA data;
4577
4578         data.dptr = (uint8_t *)&dbid;
4579         data.dsize = sizeof(dbid);
4580
4581         return ctdb_control_send(ctdb, destnode, 0, 
4582                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4583                            ctdb, NULL, NULL);
4584 }
4585
4586 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4587 {
4588         int ret;
4589         int32_t res;
4590
4591         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4592         if (ret != 0 || res != 0) {
4593           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4594                 return -1;
4595         }
4596
4597         return 0;
4598 }
4599
4600 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4601 {
4602         struct ctdb_client_control_state *state;
4603
4604         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4605         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4606 }
4607
4608 /*
4609   set a database to be sticky
4610  */
4611 struct ctdb_client_control_state *
4612 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4613 {
4614         TDB_DATA data;
4615
4616         data.dptr = (uint8_t *)&dbid;
4617         data.dsize = sizeof(dbid);
4618
4619         return ctdb_control_send(ctdb, destnode, 0, 
4620                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4621                            ctdb, NULL, NULL);
4622 }
4623
4624 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4625 {
4626         int ret;
4627         int32_t res;
4628
4629         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4630         if (ret != 0 || res != 0) {
4631                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4632                 return -1;
4633         }
4634
4635         return 0;
4636 }
4637
4638 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4639 {
4640         struct ctdb_client_control_state *state;
4641
4642         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4643         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4644 }