ReadOnly: Rename the function ctdb_ltdb_fetch_readonly() to ctdb_ltdb_fetch_with_head...
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391
392         return state;
393 }
394
395 /*
396   make a ctdb call to the local daemon - async send. Called from client context.
397
398   This constructs a ctdb_call request and queues it for processing. 
399   This call never blocks.
400 */
401 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
402                                               struct ctdb_call *call)
403 {
404         struct ctdb_client_call_state *state;
405         struct ctdb_context *ctdb = ctdb_db->ctdb;
406         struct ctdb_ltdb_header header;
407         TDB_DATA data;
408         int ret;
409         size_t len;
410         struct ctdb_req_call *c;
411
412         /* if the domain socket is not yet open, open it */
413         if (ctdb->daemon.sd==-1) {
414                 ctdb_socket_connect(ctdb);
415         }
416
417         ret = ctdb_ltdb_lock(ctdb_db, call->key);
418         if (ret != 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
420                 return NULL;
421         }
422
423         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424
425         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
426                 ret = -1;
427         }
428
429         if (ret == 0 && header.dmaster == ctdb->pnn) {
430                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
431                 talloc_free(data.dptr);
432                 ctdb_ltdb_unlock(ctdb_db, call->key);
433                 return state;
434         }
435
436         ctdb_ltdb_unlock(ctdb_db, call->key);
437         talloc_free(data.dptr);
438
439         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
440         if (state == NULL) {
441                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
442                 return NULL;
443         }
444         state->call = talloc_zero(state, struct ctdb_call);
445         if (state->call == NULL) {
446                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
447                 return NULL;
448         }
449
450         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
451         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
452         if (c == NULL) {
453                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
454                 return NULL;
455         }
456
457         state->reqid     = ctdb_reqid_new(ctdb, state);
458         state->ctdb_db = ctdb_db;
459         talloc_set_destructor(state, ctdb_client_call_destructor);
460
461         c->hdr.reqid     = state->reqid;
462         c->flags         = call->flags;
463         c->db_id         = ctdb_db->db_id;
464         c->callid        = call->call_id;
465         c->hopcount      = 0;
466         c->keylen        = call->key.dsize;
467         c->calldatalen   = call->call_data.dsize;
468         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
469         memcpy(&c->data[call->key.dsize], 
470                call->call_data.dptr, call->call_data.dsize);
471         *(state->call)              = *call;
472         state->call->call_data.dptr = &c->data[call->key.dsize];
473         state->call->key.dptr       = &c->data[0];
474
475         state->state  = CTDB_CALL_WAIT;
476
477
478         ctdb_client_queue_pkt(ctdb, &c->hdr);
479
480         return state;
481 }
482
483
484 /*
485   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
486 */
487 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
488 {
489         struct ctdb_client_call_state *state;
490
491         state = ctdb_call_send(ctdb_db, call);
492         return ctdb_call_recv(state, call);
493 }
494
495
496 /*
497   tell the daemon what messaging srvid we will use, and register the message
498   handler function in the client
499 */
500 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
501                              ctdb_msg_fn_t handler,
502                              void *private_data)
503                                     
504 {
505         int res;
506         int32_t status;
507         
508         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
509                            tdb_null, NULL, NULL, &status, NULL, NULL);
510         if (res != 0 || status != 0) {
511                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
512                 return -1;
513         }
514
515         /* also need to register the handler with our own ctdb structure */
516         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
517 }
518
519 /*
520   tell the daemon we no longer want a srvid
521 */
522 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
523 {
524         int res;
525         int32_t status;
526         
527         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
528                            tdb_null, NULL, NULL, &status, NULL, NULL);
529         if (res != 0 || status != 0) {
530                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
531                 return -1;
532         }
533
534         /* also need to register the handler with our own ctdb structure */
535         ctdb_deregister_message_handler(ctdb, srvid, private_data);
536         return 0;
537 }
538
539
540 /*
541   send a message - from client context
542  */
543 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
544                       uint64_t srvid, TDB_DATA data)
545 {
546         struct ctdb_req_message *r;
547         int len, res;
548
549         len = offsetof(struct ctdb_req_message, data) + data.dsize;
550         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
551                                len, struct ctdb_req_message);
552         CTDB_NO_MEMORY(ctdb, r);
553
554         r->hdr.destnode  = pnn;
555         r->srvid         = srvid;
556         r->datalen       = data.dsize;
557         memcpy(&r->data[0], data.dptr, data.dsize);
558         
559         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
560         if (res != 0) {
561                 return res;
562         }
563
564         talloc_free(r);
565         return 0;
566 }
567
568
569 /*
570   cancel a ctdb_fetch_lock operation, releasing the lock
571  */
572 static int fetch_lock_destructor(struct ctdb_record_handle *h)
573 {
574         ctdb_ltdb_unlock(h->ctdb_db, h->key);
575         return 0;
576 }
577
578 /*
579   force the migration of a record to this node
580  */
581 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
582 {
583         struct ctdb_call call;
584         ZERO_STRUCT(call);
585         call.call_id = CTDB_NULL_FUNC;
586         call.key = key;
587         call.flags = CTDB_IMMEDIATE_MIGRATION;
588         return ctdb_call(ctdb_db, &call);
589 }
590
591 /*
592   try to fetch a readonly copy of a record
593  */
594 static int
595 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
596 {
597         int ret;
598
599         struct ctdb_call call;
600         ZERO_STRUCT(call);
601
602         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
603         call.call_data.dptr = NULL;
604         call.call_data.dsize = 0;
605         call.key = key;
606         call.flags = CTDB_WANT_READONLY;
607         ret = ctdb_call(ctdb_db, &call);
608
609         if (ret != 0) {
610                 return -1;
611         }
612         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
613                 return -1;
614         }
615
616         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
617         if (*hdr == NULL) {
618                 talloc_free(call.reply_data.dptr);
619                 return -1;
620         }
621
622         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
623         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
624         if (data->dptr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 talloc_free(hdr);
627                 return -1;
628         }
629
630         return 0;
631 }
632
633 /*
634   get a lock on a record, and return the records data. Blocks until it gets the lock
635  */
636 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
637                                            TDB_DATA key, TDB_DATA *data)
638 {
639         int ret;
640         struct ctdb_record_handle *h;
641
642         /*
643           procedure is as follows:
644
645           1) get the chain lock. 
646           2) check if we are dmaster
647           3) if we are the dmaster then return handle 
648           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
649              reply from ctdbd
650           5) when we get the reply, goto (1)
651          */
652
653         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
654         if (h == NULL) {
655                 return NULL;
656         }
657
658         h->ctdb_db = ctdb_db;
659         h->key     = key;
660         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
661         if (h->key.dptr == NULL) {
662                 talloc_free(h);
663                 return NULL;
664         }
665         h->data    = data;
666
667         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
668                  (const char *)key.dptr));
669
670 again:
671         /* step 1 - get the chain lock */
672         ret = ctdb_ltdb_lock(ctdb_db, key);
673         if (ret != 0) {
674                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
675                 talloc_free(h);
676                 return NULL;
677         }
678
679         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
680
681         talloc_set_destructor(h, fetch_lock_destructor);
682
683         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
684
685         /* when torturing, ensure we test the remote path */
686         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
687             random() % 5 == 0) {
688                 h->header.dmaster = (uint32_t)-1;
689         }
690
691
692         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
693
694         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
695                 ctdb_ltdb_unlock(ctdb_db, key);
696                 ret = ctdb_client_force_migration(ctdb_db, key);
697                 if (ret != 0) {
698                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
699                         talloc_free(h);
700                         return NULL;
701                 }
702                 goto again;
703         }
704
705         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
706         return h;
707 }
708
709 /*
710   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
711  */
712 struct ctdb_record_handle *
713 ctdb_fetch_readonly_lock(
714         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
715         TDB_DATA key, TDB_DATA *data,
716         int read_only)
717 {
718         int ret;
719         struct ctdb_record_handle *h;
720         struct ctdb_ltdb_header *roheader = NULL;
721
722         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
723         if (h == NULL) {
724                 return NULL;
725         }
726
727         h->ctdb_db = ctdb_db;
728         h->key     = key;
729         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
730         if (h->key.dptr == NULL) {
731                 talloc_free(h);
732                 return NULL;
733         }
734         h->data    = data;
735
736         data->dptr = NULL;
737         data->dsize = 0;
738
739
740 again:
741         talloc_free(roheader);
742         roheader = NULL;
743
744         talloc_free(data->dptr);
745         data->dptr = NULL;
746         data->dsize = 0;
747
748         /* Lock the record/chain */
749         ret = ctdb_ltdb_lock(ctdb_db, key);
750         if (ret != 0) {
751                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
752                 talloc_free(h);
753                 return NULL;
754         }
755
756         talloc_set_destructor(h, fetch_lock_destructor);
757
758         /* Check if record exists yet in the TDB */
759         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
760         if (ret != 0) {
761                 ctdb_ltdb_unlock(ctdb_db, key);
762                 ret = ctdb_client_force_migration(ctdb_db, key);
763                 if (ret != 0) {
764                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
765                         talloc_free(h);
766                         return NULL;
767                 }
768                 goto again;
769         }
770
771         /* if this is a request for read/write and we have delegations
772            we have to revoke all delegations first
773         */
774         if ((read_only == 0) 
775         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
776         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
777                 ctdb_ltdb_unlock(ctdb_db, key);
778                 ret = ctdb_client_force_migration(ctdb_db, key);
779                 if (ret != 0) {
780                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
781                         talloc_free(h);
782                         return NULL;
783                 }
784                 goto again;
785         }
786
787         /* if we are dmaster, just return the handle */
788         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
789                 return h;
790         }
791
792         if (read_only != 0) {
793                 TDB_DATA rodata = {NULL, 0};
794
795                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
796                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                         return h;
798                 }
799
800                 ctdb_ltdb_unlock(ctdb_db, key);
801                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
802                 if (ret != 0) {
803                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
804                         ret = ctdb_client_force_migration(ctdb_db, key);
805                         if (ret != 0) {
806                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
807                                 talloc_free(h);
808                                 return NULL;
809                         }
810
811                         goto again;
812                 }
813
814                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
815                         ret = ctdb_client_force_migration(ctdb_db, key);
816                         if (ret != 0) {
817                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
818                                 talloc_free(h);
819                                 return NULL;
820                         }
821
822                         goto again;
823                 }
824
825                 ret = ctdb_ltdb_lock(ctdb_db, key);
826                 if (ret != 0) {
827                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
828                         talloc_free(h);
829                         return NULL;
830                 }
831
832                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
833                 if (ret != 0) {
834                         ctdb_ltdb_unlock(ctdb_db, key);
835
836                         ret = ctdb_client_force_migration(ctdb_db, key);
837                         if (ret != 0) {
838                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
839                                 talloc_free(h);
840                                 return NULL;
841                         }
842
843                         goto again;
844                 }
845
846                 if (h->header.rsn >= roheader->rsn) {
847                         DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
848                         ctdb_ltdb_unlock(ctdb_db, key);
849
850                         ret = ctdb_client_force_migration(ctdb_db, key);
851                         if (ret != 0) {
852                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
853                                 talloc_free(h);
854                                 return NULL;
855                         }
856
857                         goto again;
858                 }
859
860                 if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
861                         ctdb_ltdb_unlock(ctdb_db, key);
862
863                         ret = ctdb_client_force_migration(ctdb_db, key);
864                         if (ret != 0) {
865                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
866                                 talloc_free(h);
867                                 return NULL;
868                         }
869
870                         goto again;
871                 }
872                 return h;
873         }
874
875         /* we are not dmaster and this was not a request for a readonly lock
876          * so unlock the record, migrate it and try again
877          */
878         ctdb_ltdb_unlock(ctdb_db, key);
879         ret = ctdb_client_force_migration(ctdb_db, key);
880         if (ret != 0) {
881                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
882                 talloc_free(h);
883                 return NULL;
884         }
885         goto again;
886 }
887
888 /*
889   store some data to the record that was locked with ctdb_fetch_lock()
890 */
891 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
892 {
893         if (h->ctdb_db->persistent) {
894                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
895                 return -1;
896         }
897
898         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
899 }
900
901 /*
902   non-locking fetch of a record
903  */
904 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
905                TDB_DATA key, TDB_DATA *data)
906 {
907         struct ctdb_call call;
908         int ret;
909
910         call.call_id = CTDB_FETCH_FUNC;
911         call.call_data.dptr = NULL;
912         call.call_data.dsize = 0;
913         call.key = key;
914
915         ret = ctdb_call(ctdb_db, &call);
916
917         if (ret == 0) {
918                 *data = call.reply_data;
919                 talloc_steal(mem_ctx, data->dptr);
920         }
921
922         return ret;
923 }
924
925
926
927 /*
928    called when a control completes or timesout to invoke the callback
929    function the user provided
930 */
931 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
932         struct timeval t, void *private_data)
933 {
934         struct ctdb_client_control_state *state;
935         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
936         int ret;
937
938         state = talloc_get_type(private_data, struct ctdb_client_control_state);
939         talloc_steal(tmp_ctx, state);
940
941         ret = ctdb_control_recv(state->ctdb, state, state,
942                         NULL, 
943                         NULL, 
944                         NULL);
945
946         talloc_free(tmp_ctx);
947 }
948
949 /*
950   called when a CTDB_REPLY_CONTROL packet comes in in the client
951
952   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
953   contains any reply data from the control
954 */
955 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
956                                       struct ctdb_req_header *hdr)
957 {
958         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
959         struct ctdb_client_control_state *state;
960
961         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
962         if (state == NULL) {
963                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
964                 return;
965         }
966
967         if (hdr->reqid != state->reqid) {
968                 /* we found a record  but it was the wrong one */
969                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
970                 return;
971         }
972
973         state->outdata.dptr = c->data;
974         state->outdata.dsize = c->datalen;
975         state->status = c->status;
976         if (c->errorlen) {
977                 state->errormsg = talloc_strndup(state, 
978                                                  (char *)&c->data[c->datalen], 
979                                                  c->errorlen);
980         }
981
982         /* state->outdata now uses resources from c so we dont want c
983            to just dissappear from under us while state is still alive
984         */
985         talloc_steal(state, c);
986
987         state->state = CTDB_CONTROL_DONE;
988
989         /* if we had a callback registered for this control, pull the response
990            and call the callback.
991         */
992         if (state->async.fn) {
993                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
994         }
995 }
996
997
998 /*
999   destroy a ctdb_control in client
1000 */
1001 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
1002 {
1003         ctdb_reqid_remove(state->ctdb, state->reqid);
1004         return 0;
1005 }
1006
1007
1008 /* time out handler for ctdb_control */
1009 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1010         struct timeval t, void *private_data)
1011 {
1012         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1013
1014         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1015                          "dstnode:%u\n", state->reqid, state->c->opcode,
1016                          state->c->hdr.destnode));
1017
1018         state->state = CTDB_CONTROL_TIMEOUT;
1019
1020         /* if we had a callback registered for this control, pull the response
1021            and call the callback.
1022         */
1023         if (state->async.fn) {
1024                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1025         }
1026 }
1027
1028 /* async version of send control request */
1029 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1030                 uint32_t destnode, uint64_t srvid, 
1031                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1032                 TALLOC_CTX *mem_ctx,
1033                 struct timeval *timeout,
1034                 char **errormsg)
1035 {
1036         struct ctdb_client_control_state *state;
1037         size_t len;
1038         struct ctdb_req_control *c;
1039         int ret;
1040
1041         if (errormsg) {
1042                 *errormsg = NULL;
1043         }
1044
1045         /* if the domain socket is not yet open, open it */
1046         if (ctdb->daemon.sd==-1) {
1047                 ctdb_socket_connect(ctdb);
1048         }
1049
1050         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1051         CTDB_NO_MEMORY_NULL(ctdb, state);
1052
1053         state->ctdb       = ctdb;
1054         state->reqid      = ctdb_reqid_new(ctdb, state);
1055         state->state      = CTDB_CONTROL_WAIT;
1056         state->errormsg   = NULL;
1057
1058         talloc_set_destructor(state, ctdb_control_destructor);
1059
1060         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1061         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1062                                len, struct ctdb_req_control);
1063         state->c            = c;        
1064         CTDB_NO_MEMORY_NULL(ctdb, c);
1065         c->hdr.reqid        = state->reqid;
1066         c->hdr.destnode     = destnode;
1067         c->opcode           = opcode;
1068         c->client_id        = 0;
1069         c->flags            = flags;
1070         c->srvid            = srvid;
1071         c->datalen          = data.dsize;
1072         if (data.dsize) {
1073                 memcpy(&c->data[0], data.dptr, data.dsize);
1074         }
1075
1076         /* timeout */
1077         if (timeout && !timeval_is_zero(timeout)) {
1078                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1079         }
1080
1081         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1082         if (ret != 0) {
1083                 talloc_free(state);
1084                 return NULL;
1085         }
1086
1087         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1088                 talloc_free(state);
1089                 return NULL;
1090         }
1091
1092         return state;
1093 }
1094
1095
1096 /* async version of receive control reply */
1097 int ctdb_control_recv(struct ctdb_context *ctdb, 
1098                 struct ctdb_client_control_state *state, 
1099                 TALLOC_CTX *mem_ctx,
1100                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1101 {
1102         TALLOC_CTX *tmp_ctx;
1103
1104         if (status != NULL) {
1105                 *status = -1;
1106         }
1107         if (errormsg != NULL) {
1108                 *errormsg = NULL;
1109         }
1110
1111         if (state == NULL) {
1112                 return -1;
1113         }
1114
1115         /* prevent double free of state */
1116         tmp_ctx = talloc_new(ctdb);
1117         talloc_steal(tmp_ctx, state);
1118
1119         /* loop one event at a time until we either timeout or the control
1120            completes.
1121         */
1122         while (state->state == CTDB_CONTROL_WAIT) {
1123                 event_loop_once(ctdb->ev);
1124         }
1125
1126         if (state->state != CTDB_CONTROL_DONE) {
1127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1128                 if (state->async.fn) {
1129                         state->async.fn(state);
1130                 }
1131                 talloc_free(tmp_ctx);
1132                 return -1;
1133         }
1134
1135         if (state->errormsg) {
1136                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1137                 if (errormsg) {
1138                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1139                 }
1140                 if (state->async.fn) {
1141                         state->async.fn(state);
1142                 }
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         if (outdata) {
1148                 *outdata = state->outdata;
1149                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1150         }
1151
1152         if (status) {
1153                 *status = state->status;
1154         }
1155
1156         if (state->async.fn) {
1157                 state->async.fn(state);
1158         }
1159
1160         talloc_free(tmp_ctx);
1161         return 0;
1162 }
1163
1164
1165
1166 /*
1167   send a ctdb control message
1168   timeout specifies how long we should wait for a reply.
1169   if timeout is NULL we wait indefinitely
1170  */
1171 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1172                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1173                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1174                  struct timeval *timeout,
1175                  char **errormsg)
1176 {
1177         struct ctdb_client_control_state *state;
1178
1179         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1180                         flags, data, mem_ctx,
1181                         timeout, errormsg);
1182         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1183                         errormsg);
1184 }
1185
1186
1187
1188
1189 /*
1190   a process exists call. Returns 0 if process exists, -1 otherwise
1191  */
1192 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1193 {
1194         int ret;
1195         TDB_DATA data;
1196         int32_t status;
1197
1198         data.dptr = (uint8_t*)&pid;
1199         data.dsize = sizeof(pid);
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1203                            NULL, NULL, &status, NULL, NULL);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1206                 return -1;
1207         }
1208
1209         return status;
1210 }
1211
1212 /*
1213   get remote statistics
1214  */
1215 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1216 {
1217         int ret;
1218         TDB_DATA data;
1219         int32_t res;
1220
1221         ret = ctdb_control(ctdb, destnode, 0, 
1222                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1223                            ctdb, &data, &res, NULL, NULL);
1224         if (ret != 0 || res != 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1226                 return -1;
1227         }
1228
1229         if (data.dsize != sizeof(struct ctdb_statistics)) {
1230                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1231                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1232                       return -1;
1233         }
1234
1235         *status = *(struct ctdb_statistics *)data.dptr;
1236         talloc_free(data.dptr);
1237                         
1238         return 0;
1239 }
1240
1241 /*
1242   shutdown a remote ctdb node
1243  */
1244 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1245 {
1246         struct ctdb_client_control_state *state;
1247
1248         state = ctdb_control_send(ctdb, destnode, 0, 
1249                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1250                            NULL, &timeout, NULL);
1251         if (state == NULL) {
1252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1253                 return -1;
1254         }
1255
1256         return 0;
1257 }
1258
1259 /*
1260   get vnn map from a remote node
1261  */
1262 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1263 {
1264         int ret;
1265         TDB_DATA outdata;
1266         int32_t res;
1267         struct ctdb_vnn_map_wire *map;
1268
1269         ret = ctdb_control(ctdb, destnode, 0, 
1270                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1271                            mem_ctx, &outdata, &res, &timeout, NULL);
1272         if (ret != 0 || res != 0) {
1273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1274                 return -1;
1275         }
1276         
1277         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1278         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1279             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1280                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1281                 return -1;
1282         }
1283
1284         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1285         CTDB_NO_MEMORY(ctdb, *vnnmap);
1286         (*vnnmap)->generation = map->generation;
1287         (*vnnmap)->size       = map->size;
1288         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1289
1290         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1291         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1292         talloc_free(outdata.dptr);
1293                     
1294         return 0;
1295 }
1296
1297
1298 /*
1299   get the recovery mode of a remote node
1300  */
1301 struct ctdb_client_control_state *
1302 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1303 {
1304         return ctdb_control_send(ctdb, destnode, 0, 
1305                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1306                            mem_ctx, &timeout, NULL);
1307 }
1308
1309 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1310 {
1311         int ret;
1312         int32_t res;
1313
1314         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1317                 return -1;
1318         }
1319
1320         if (recmode) {
1321                 *recmode = (uint32_t)res;
1322         }
1323
1324         return 0;
1325 }
1326
1327 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1328 {
1329         struct ctdb_client_control_state *state;
1330
1331         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1332         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1333 }
1334
1335
1336
1337
1338 /*
1339   set the recovery mode of a remote node
1340  */
1341 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1342 {
1343         int ret;
1344         TDB_DATA data;
1345         int32_t res;
1346
1347         data.dsize = sizeof(uint32_t);
1348         data.dptr = (unsigned char *)&recmode;
1349
1350         ret = ctdb_control(ctdb, destnode, 0, 
1351                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1352                            NULL, NULL, &res, &timeout, NULL);
1353         if (ret != 0 || res != 0) {
1354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1355                 return -1;
1356         }
1357
1358         return 0;
1359 }
1360
1361
1362
1363 /*
1364   get the recovery master of a remote node
1365  */
1366 struct ctdb_client_control_state *
1367 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1368                         struct timeval timeout, uint32_t destnode)
1369 {
1370         return ctdb_control_send(ctdb, destnode, 0, 
1371                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1372                            mem_ctx, &timeout, NULL);
1373 }
1374
1375 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1376 {
1377         int ret;
1378         int32_t res;
1379
1380         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1381         if (ret != 0) {
1382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1383                 return -1;
1384         }
1385
1386         if (recmaster) {
1387                 *recmaster = (uint32_t)res;
1388         }
1389
1390         return 0;
1391 }
1392
1393 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1394 {
1395         struct ctdb_client_control_state *state;
1396
1397         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1398         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1399 }
1400
1401
1402 /*
1403   set the recovery master of a remote node
1404  */
1405 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1406 {
1407         int ret;
1408         TDB_DATA data;
1409         int32_t res;
1410
1411         ZERO_STRUCT(data);
1412         data.dsize = sizeof(uint32_t);
1413         data.dptr = (unsigned char *)&recmaster;
1414
1415         ret = ctdb_control(ctdb, destnode, 0, 
1416                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1417                            NULL, NULL, &res, &timeout, NULL);
1418         if (ret != 0 || res != 0) {
1419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1420                 return -1;
1421         }
1422
1423         return 0;
1424 }
1425
1426
1427 /*
1428   get a list of databases off a remote node
1429  */
1430 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1431                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1432 {
1433         int ret;
1434         TDB_DATA outdata;
1435         int32_t res;
1436
1437         ret = ctdb_control(ctdb, destnode, 0, 
1438                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1439                            mem_ctx, &outdata, &res, &timeout, NULL);
1440         if (ret != 0 || res != 0) {
1441                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1442                 return -1;
1443         }
1444
1445         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1446         talloc_free(outdata.dptr);
1447                     
1448         return 0;
1449 }
1450
1451 /*
1452   get a list of nodes (vnn and flags ) from a remote node
1453  */
1454 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1455                 struct timeval timeout, uint32_t destnode, 
1456                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1457 {
1458         int ret;
1459         TDB_DATA outdata;
1460         int32_t res;
1461
1462         ret = ctdb_control(ctdb, destnode, 0, 
1463                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1464                            mem_ctx, &outdata, &res, &timeout, NULL);
1465         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1466                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1467                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1468         }
1469         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1471                 return -1;
1472         }
1473
1474         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1475         talloc_free(outdata.dptr);
1476                     
1477         return 0;
1478 }
1479
1480 /*
1481   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1482  */
1483 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1484                 struct timeval timeout, uint32_t destnode, 
1485                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1486 {
1487         int ret, i, len;
1488         TDB_DATA outdata;
1489         struct ctdb_node_mapv4 *nodemapv4;
1490         int32_t res;
1491
1492         ret = ctdb_control(ctdb, destnode, 0, 
1493                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1494                            mem_ctx, &outdata, &res, &timeout, NULL);
1495         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1497                 return -1;
1498         }
1499
1500         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1501
1502         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1503         (*nodemap) = talloc_zero_size(mem_ctx, len);
1504         CTDB_NO_MEMORY(ctdb, (*nodemap));
1505
1506         (*nodemap)->num = nodemapv4->num;
1507         for (i=0; i<nodemapv4->num; i++) {
1508                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1509                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1510                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1511                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1512         }
1513                 
1514         talloc_free(outdata.dptr);
1515                     
1516         return 0;
1517 }
1518
1519 /*
1520   drop the transport, reload the nodes file and restart the transport
1521  */
1522 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1523                     struct timeval timeout, uint32_t destnode)
1524 {
1525         int ret;
1526         int32_t res;
1527
1528         ret = ctdb_control(ctdb, destnode, 0, 
1529                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1530                            NULL, NULL, &res, &timeout, NULL);
1531         if (ret != 0 || res != 0) {
1532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1533                 return -1;
1534         }
1535
1536         return 0;
1537 }
1538
1539
1540 /*
1541   set vnn map on a node
1542  */
1543 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1544                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1545 {
1546         int ret;
1547         TDB_DATA data;
1548         int32_t res;
1549         struct ctdb_vnn_map_wire *map;
1550         size_t len;
1551
1552         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1553         map = talloc_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, map);
1555
1556         map->generation = vnnmap->generation;
1557         map->size = vnnmap->size;
1558         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1559         
1560         data.dsize = len;
1561         data.dptr  = (uint8_t *)map;
1562
1563         ret = ctdb_control(ctdb, destnode, 0, 
1564                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1565                            NULL, NULL, &res, &timeout, NULL);
1566         if (ret != 0 || res != 0) {
1567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1568                 return -1;
1569         }
1570
1571         talloc_free(map);
1572
1573         return 0;
1574 }
1575
1576
1577 /*
1578   async send for pull database
1579  */
1580 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1581         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1582         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1583 {
1584         TDB_DATA indata;
1585         struct ctdb_control_pulldb *pull;
1586         struct ctdb_client_control_state *state;
1587
1588         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1589         CTDB_NO_MEMORY_NULL(ctdb, pull);
1590
1591         pull->db_id   = dbid;
1592         pull->lmaster = lmaster;
1593
1594         indata.dsize = sizeof(struct ctdb_control_pulldb);
1595         indata.dptr  = (unsigned char *)pull;
1596
1597         state = ctdb_control_send(ctdb, destnode, 0, 
1598                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1599                                   mem_ctx, &timeout, NULL);
1600         talloc_free(pull);
1601
1602         return state;
1603 }
1604
1605 /*
1606   async recv for pull database
1607  */
1608 int ctdb_ctrl_pulldb_recv(
1609         struct ctdb_context *ctdb, 
1610         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1611         TDB_DATA *outdata)
1612 {
1613         int ret;
1614         int32_t res;
1615
1616         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1617         if ( (ret != 0) || (res != 0) ){
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1619                 return -1;
1620         }
1621
1622         return 0;
1623 }
1624
1625 /*
1626   pull all keys and records for a specific database on a node
1627  */
1628 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1629                 uint32_t dbid, uint32_t lmaster, 
1630                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1631                 TDB_DATA *outdata)
1632 {
1633         struct ctdb_client_control_state *state;
1634
1635         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1636                                       timeout);
1637         
1638         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1639 }
1640
1641
1642 /*
1643   change dmaster for all keys in the database to the new value
1644  */
1645 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1646                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1647 {
1648         int ret;
1649         TDB_DATA indata;
1650         int32_t res;
1651
1652         indata.dsize = 2*sizeof(uint32_t);
1653         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1654
1655         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1656         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1657
1658         ret = ctdb_control(ctdb, destnode, 0, 
1659                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1660                            NULL, NULL, &res, &timeout, NULL);
1661         if (ret != 0 || res != 0) {
1662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1663                 return -1;
1664         }
1665
1666         return 0;
1667 }
1668
1669 /*
1670   ping a node, return number of clients connected
1671  */
1672 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1673 {
1674         int ret;
1675         int32_t res;
1676
1677         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1678                            tdb_null, NULL, NULL, &res, NULL, NULL);
1679         if (ret != 0) {
1680                 return -1;
1681         }
1682         return res;
1683 }
1684
1685 /*
1686   find the real path to a ltdb 
1687  */
1688 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1689                    const char **path)
1690 {
1691         int ret;
1692         int32_t res;
1693         TDB_DATA data;
1694
1695         data.dptr = (uint8_t *)&dbid;
1696         data.dsize = sizeof(dbid);
1697
1698         ret = ctdb_control(ctdb, destnode, 0, 
1699                            CTDB_CONTROL_GETDBPATH, 0, data, 
1700                            mem_ctx, &data, &res, &timeout, NULL);
1701         if (ret != 0 || res != 0) {
1702                 return -1;
1703         }
1704
1705         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1706         if ((*path) == NULL) {
1707                 return -1;
1708         }
1709
1710         talloc_free(data.dptr);
1711
1712         return 0;
1713 }
1714
1715 /*
1716   find the name of a db 
1717  */
1718 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1719                    const char **name)
1720 {
1721         int ret;
1722         int32_t res;
1723         TDB_DATA data;
1724
1725         data.dptr = (uint8_t *)&dbid;
1726         data.dsize = sizeof(dbid);
1727
1728         ret = ctdb_control(ctdb, destnode, 0, 
1729                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1730                            mem_ctx, &data, &res, &timeout, NULL);
1731         if (ret != 0 || res != 0) {
1732                 return -1;
1733         }
1734
1735         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1736         if ((*name) == NULL) {
1737                 return -1;
1738         }
1739
1740         talloc_free(data.dptr);
1741
1742         return 0;
1743 }
1744
1745 /*
1746   get the health status of a db
1747  */
1748 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1749                           struct timeval timeout,
1750                           uint32_t destnode,
1751                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1752                           const char **reason)
1753 {
1754         int ret;
1755         int32_t res;
1756         TDB_DATA data;
1757
1758         data.dptr = (uint8_t *)&dbid;
1759         data.dsize = sizeof(dbid);
1760
1761         ret = ctdb_control(ctdb, destnode, 0,
1762                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1763                            mem_ctx, &data, &res, &timeout, NULL);
1764         if (ret != 0 || res != 0) {
1765                 return -1;
1766         }
1767
1768         if (data.dsize == 0) {
1769                 (*reason) = NULL;
1770                 return 0;
1771         }
1772
1773         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1774         if ((*reason) == NULL) {
1775                 return -1;
1776         }
1777
1778         talloc_free(data.dptr);
1779
1780         return 0;
1781 }
1782
1783 /*
1784   create a database
1785  */
1786 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1787                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1788 {
1789         int ret;
1790         int32_t res;
1791         TDB_DATA data;
1792
1793         data.dptr = discard_const(name);
1794         data.dsize = strlen(name)+1;
1795
1796         ret = ctdb_control(ctdb, destnode, 0, 
1797                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1798                            0, data, 
1799                            mem_ctx, &data, &res, &timeout, NULL);
1800
1801         if (ret != 0 || res != 0) {
1802                 return -1;
1803         }
1804
1805         return 0;
1806 }
1807
1808 /*
1809   get debug level on a node
1810  */
1811 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1812 {
1813         int ret;
1814         int32_t res;
1815         TDB_DATA data;
1816
1817         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1818                            ctdb, &data, &res, NULL, NULL);
1819         if (ret != 0 || res != 0) {
1820                 return -1;
1821         }
1822         if (data.dsize != sizeof(int32_t)) {
1823                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1824                          (unsigned)data.dsize));
1825                 return -1;
1826         }
1827         *level = *(int32_t *)data.dptr;
1828         talloc_free(data.dptr);
1829         return 0;
1830 }
1831
1832 /*
1833   set debug level on a node
1834  */
1835 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&level;
1842         data.dsize = sizeof(level);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1845                            NULL, NULL, &res, NULL, NULL);
1846         if (ret != 0 || res != 0) {
1847                 return -1;
1848         }
1849         return 0;
1850 }
1851
1852
1853 /*
1854   get a list of connected nodes
1855  */
1856 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1857                                 struct timeval timeout,
1858                                 TALLOC_CTX *mem_ctx,
1859                                 uint32_t *num_nodes)
1860 {
1861         struct ctdb_node_map *map=NULL;
1862         int ret, i;
1863         uint32_t *nodes;
1864
1865         *num_nodes = 0;
1866
1867         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1868         if (ret != 0) {
1869                 return NULL;
1870         }
1871
1872         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1873         if (nodes == NULL) {
1874                 return NULL;
1875         }
1876
1877         for (i=0;i<map->num;i++) {
1878                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1879                         nodes[*num_nodes] = map->nodes[i].pnn;
1880                         (*num_nodes)++;
1881                 }
1882         }
1883
1884         return nodes;
1885 }
1886
1887
1888 /*
1889   reset remote status
1890  */
1891 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1892 {
1893         int ret;
1894         int32_t res;
1895
1896         ret = ctdb_control(ctdb, destnode, 0, 
1897                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1898                            NULL, NULL, &res, NULL, NULL);
1899         if (ret != 0 || res != 0) {
1900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1901                 return -1;
1902         }
1903         return 0;
1904 }
1905
1906 /*
1907   this is the dummy null procedure that all databases support
1908 */
1909 static int ctdb_null_func(struct ctdb_call_info *call)
1910 {
1911         return 0;
1912 }
1913
1914 /*
1915   this is a plain fetch procedure that all databases support
1916 */
1917 static int ctdb_fetch_func(struct ctdb_call_info *call)
1918 {
1919         call->reply_data = &call->record_data;
1920         return 0;
1921 }
1922
1923 /*
1924   this is a plain fetch procedure that all databases support
1925   this returns the full record including the ltdb header
1926 */
1927 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1928 {
1929         call->reply_data = talloc(call, TDB_DATA);
1930         if (call->reply_data == NULL) {
1931                 return -1;
1932         }
1933         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1934         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1935         if (call->reply_data->dptr == NULL) {
1936                 return -1;
1937         }
1938         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1939         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1940
1941         return 0;
1942 }
1943
1944 /*
1945   attach to a specific database - client call
1946 */
1947 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1948                                     struct timeval timeout,
1949                                     const char *name,
1950                                     bool persistent,
1951                                     uint32_t tdb_flags)
1952 {
1953         struct ctdb_db_context *ctdb_db;
1954         TDB_DATA data;
1955         int ret;
1956         int32_t res;
1957
1958         ctdb_db = ctdb_db_handle(ctdb, name);
1959         if (ctdb_db) {
1960                 return ctdb_db;
1961         }
1962
1963         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1964         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1965
1966         ctdb_db->ctdb = ctdb;
1967         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1968         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1969
1970         data.dptr = discard_const(name);
1971         data.dsize = strlen(name)+1;
1972
1973         /* tell ctdb daemon to attach */
1974         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1975                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1976                            0, data, ctdb_db, &data, &res, NULL, NULL);
1977         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1978                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1979                 talloc_free(ctdb_db);
1980                 return NULL;
1981         }
1982         
1983         ctdb_db->db_id = *(uint32_t *)data.dptr;
1984         talloc_free(data.dptr);
1985
1986         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1987         if (ret != 0) {
1988                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1989                 talloc_free(ctdb_db);
1990                 return NULL;
1991         }
1992
1993         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1994         if (ctdb->valgrinding) {
1995                 tdb_flags |= TDB_NOMMAP;
1996         }
1997         tdb_flags |= TDB_DISALLOW_NESTING;
1998
1999         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2000         if (ctdb_db->ltdb == NULL) {
2001                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2002                 talloc_free(ctdb_db);
2003                 return NULL;
2004         }
2005
2006         ctdb_db->persistent = persistent;
2007
2008         DLIST_ADD(ctdb->db_list, ctdb_db);
2009
2010         /* add well known functions */
2011         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2012         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2013         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2014
2015         return ctdb_db;
2016 }
2017
2018
2019 /*
2020   setup a call for a database
2021  */
2022 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2023 {
2024         struct ctdb_registered_call *call;
2025
2026 #if 0
2027         TDB_DATA data;
2028         int32_t status;
2029         struct ctdb_control_set_call c;
2030         int ret;
2031
2032         /* this is no longer valid with the separate daemon architecture */
2033         c.db_id = ctdb_db->db_id;
2034         c.fn    = fn;
2035         c.id    = id;
2036
2037         data.dptr = (uint8_t *)&c;
2038         data.dsize = sizeof(c);
2039
2040         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2041                            data, NULL, NULL, &status, NULL, NULL);
2042         if (ret != 0 || status != 0) {
2043                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2044                 return -1;
2045         }
2046 #endif
2047
2048         /* also register locally */
2049         call = talloc(ctdb_db, struct ctdb_registered_call);
2050         call->fn = fn;
2051         call->id = id;
2052
2053         DLIST_ADD(ctdb_db->calls, call);        
2054         return 0;
2055 }
2056
2057
2058 struct traverse_state {
2059         bool done;
2060         uint32_t count;
2061         ctdb_traverse_func fn;
2062         void *private_data;
2063 };
2064
2065 /*
2066   called on each key during a ctdb_traverse
2067  */
2068 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2069 {
2070         struct traverse_state *state = (struct traverse_state *)p;
2071         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2072         TDB_DATA key;
2073
2074         if (data.dsize < sizeof(uint32_t) ||
2075             d->length != data.dsize) {
2076                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2077                 state->done = True;
2078                 return;
2079         }
2080
2081         key.dsize = d->keylen;
2082         key.dptr  = &d->data[0];
2083         data.dsize = d->datalen;
2084         data.dptr = &d->data[d->keylen];
2085
2086         if (key.dsize == 0 && data.dsize == 0) {
2087                 /* end of traverse */
2088                 state->done = True;
2089                 return;
2090         }
2091
2092         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
2093                 /* empty records are deleted records in ctdb */
2094                 return;
2095         }
2096
2097         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2098                 state->done = True;
2099         }
2100
2101         state->count++;
2102 }
2103
2104
2105 /*
2106   start a cluster wide traverse, calling the supplied fn on each record
2107   return the number of records traversed, or -1 on error
2108  */
2109 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2110 {
2111         TDB_DATA data;
2112         struct ctdb_traverse_start t;
2113         int32_t status;
2114         int ret;
2115         uint64_t srvid = (getpid() | 0xFLL<<60);
2116         struct traverse_state state;
2117
2118         state.done = False;
2119         state.count = 0;
2120         state.private_data = private_data;
2121         state.fn = fn;
2122
2123         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2124         if (ret != 0) {
2125                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2126                 return -1;
2127         }
2128
2129         t.db_id = ctdb_db->db_id;
2130         t.srvid = srvid;
2131         t.reqid = 0;
2132
2133         data.dptr = (uint8_t *)&t;
2134         data.dsize = sizeof(t);
2135
2136         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2137                            data, NULL, NULL, &status, NULL, NULL);
2138         if (ret != 0 || status != 0) {
2139                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2140                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2141                 return -1;
2142         }
2143
2144         while (!state.done) {
2145                 event_loop_once(ctdb_db->ctdb->ev);
2146         }
2147
2148         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2149         if (ret != 0) {
2150                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2151                 return -1;
2152         }
2153
2154         return state.count;
2155 }
2156
2157 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2158 /*
2159   called on each key during a catdb
2160  */
2161 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2162 {
2163         int i;
2164         FILE *f = (FILE *)p;
2165         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2166
2167         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2168         for (i=0;i<key.dsize;i++) {
2169                 if (ISASCII(key.dptr[i])) {
2170                         fprintf(f, "%c", key.dptr[i]);
2171                 } else {
2172                         fprintf(f, "\\%02X", key.dptr[i]);
2173                 }
2174         }
2175         fprintf(f, "\"\n");
2176
2177         fprintf(f, "dmaster: %u\n", h->dmaster);
2178         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2179         fprintf(f, "flags: 0x%08x", h->flags);
2180         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2181         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2182         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2183         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2184         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2185         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2186         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2187         fprintf(f, "\n");
2188
2189         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2190         for (i=sizeof(*h);i<data.dsize;i++) {
2191                 if (ISASCII(data.dptr[i])) {
2192                         fprintf(f, "%c", data.dptr[i]);
2193                 } else {
2194                         fprintf(f, "\\%02X", data.dptr[i]);
2195                 }
2196         }
2197         fprintf(f, "\"\n");
2198
2199         fprintf(f, "\n");
2200
2201         return 0;
2202 }
2203
2204 /*
2205   convenience function to list all keys to stdout
2206  */
2207 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
2208 {
2209         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
2210 }
2211
2212 /*
2213   get the pid of a ctdb daemon
2214  */
2215 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2216 {
2217         int ret;
2218         int32_t res;
2219
2220         ret = ctdb_control(ctdb, destnode, 0, 
2221                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2222                            NULL, NULL, &res, &timeout, NULL);
2223         if (ret != 0) {
2224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2225                 return -1;
2226         }
2227
2228         *pid = res;
2229
2230         return 0;
2231 }
2232
2233
2234 /*
2235   async freeze send control
2236  */
2237 struct ctdb_client_control_state *
2238 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2239 {
2240         return ctdb_control_send(ctdb, destnode, priority, 
2241                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2242                            mem_ctx, &timeout, NULL);
2243 }
2244
2245 /* 
2246    async freeze recv control
2247 */
2248 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2249 {
2250         int ret;
2251         int32_t res;
2252
2253         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2254         if ( (ret != 0) || (res != 0) ){
2255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2256                 return -1;
2257         }
2258
2259         return 0;
2260 }
2261
2262 /*
2263   freeze databases of a certain priority
2264  */
2265 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2266 {
2267         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2268         struct ctdb_client_control_state *state;
2269         int ret;
2270
2271         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2272         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2273         talloc_free(tmp_ctx);
2274
2275         return ret;
2276 }
2277
2278 /* Freeze all databases */
2279 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2280 {
2281         int i;
2282
2283         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2284                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2285                         return -1;
2286                 }
2287         }
2288         return 0;
2289 }
2290
2291 /*
2292   thaw databases of a certain priority
2293  */
2294 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2295 {
2296         int ret;
2297         int32_t res;
2298
2299         ret = ctdb_control(ctdb, destnode, priority, 
2300                            CTDB_CONTROL_THAW, 0, tdb_null, 
2301                            NULL, NULL, &res, &timeout, NULL);
2302         if (ret != 0 || res != 0) {
2303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2304                 return -1;
2305         }
2306
2307         return 0;
2308 }
2309
2310 /* thaw all databases */
2311 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2312 {
2313         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2314 }
2315
2316 /*
2317   get pnn of a node, or -1
2318  */
2319 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2320 {
2321         int ret;
2322         int32_t res;
2323
2324         ret = ctdb_control(ctdb, destnode, 0, 
2325                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2326                            NULL, NULL, &res, &timeout, NULL);
2327         if (ret != 0) {
2328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2329                 return -1;
2330         }
2331
2332         return res;
2333 }
2334
2335 /*
2336   get the monitoring mode of a remote node
2337  */
2338 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2339 {
2340         int ret;
2341         int32_t res;
2342
2343         ret = ctdb_control(ctdb, destnode, 0, 
2344                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2345                            NULL, NULL, &res, &timeout, NULL);
2346         if (ret != 0) {
2347                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2348                 return -1;
2349         }
2350
2351         *monmode = res;
2352
2353         return 0;
2354 }
2355
2356
2357 /*
2358  set the monitoring mode of a remote node to active
2359  */
2360 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2361 {
2362         int ret;
2363         
2364
2365         ret = ctdb_control(ctdb, destnode, 0, 
2366                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2367                            NULL, NULL,NULL, &timeout, NULL);
2368         if (ret != 0) {
2369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2370                 return -1;
2371         }
2372
2373         
2374
2375         return 0;
2376 }
2377
2378 /*
2379   set the monitoring mode of a remote node to disable
2380  */
2381 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2382 {
2383         int ret;
2384         
2385
2386         ret = ctdb_control(ctdb, destnode, 0, 
2387                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2388                            NULL, NULL, NULL, &timeout, NULL);
2389         if (ret != 0) {
2390                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2391                 return -1;
2392         }
2393
2394         
2395
2396         return 0;
2397 }
2398
2399
2400
2401 /* 
2402   sent to a node to make it take over an ip address
2403 */
2404 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2405                           uint32_t destnode, struct ctdb_public_ip *ip)
2406 {
2407         TDB_DATA data;
2408         struct ctdb_public_ipv4 ipv4;
2409         int ret;
2410         int32_t res;
2411
2412         if (ip->addr.sa.sa_family == AF_INET) {
2413                 ipv4.pnn = ip->pnn;
2414                 ipv4.sin = ip->addr.ip;
2415
2416                 data.dsize = sizeof(ipv4);
2417                 data.dptr  = (uint8_t *)&ipv4;
2418
2419                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2420                            NULL, &res, &timeout, NULL);
2421         } else {
2422                 data.dsize = sizeof(*ip);
2423                 data.dptr  = (uint8_t *)ip;
2424
2425                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2426                            NULL, &res, &timeout, NULL);
2427         }
2428
2429         if (ret != 0 || res != 0) {
2430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2431                 return -1;
2432         }
2433
2434         return 0;       
2435 }
2436
2437
2438 /* 
2439   sent to a node to make it release an ip address
2440 */
2441 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2442                          uint32_t destnode, struct ctdb_public_ip *ip)
2443 {
2444         TDB_DATA data;
2445         struct ctdb_public_ipv4 ipv4;
2446         int ret;
2447         int32_t res;
2448
2449         if (ip->addr.sa.sa_family == AF_INET) {
2450                 ipv4.pnn = ip->pnn;
2451                 ipv4.sin = ip->addr.ip;
2452
2453                 data.dsize = sizeof(ipv4);
2454                 data.dptr  = (uint8_t *)&ipv4;
2455
2456                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2457                                    NULL, &res, &timeout, NULL);
2458         } else {
2459                 data.dsize = sizeof(*ip);
2460                 data.dptr  = (uint8_t *)ip;
2461
2462                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2463                                    NULL, &res, &timeout, NULL);
2464         }
2465
2466         if (ret != 0 || res != 0) {
2467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2468                 return -1;
2469         }
2470
2471         return 0;       
2472 }
2473
2474
2475 /*
2476   get a tunable
2477  */
2478 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2479                           struct timeval timeout, 
2480                           uint32_t destnode,
2481                           const char *name, uint32_t *value)
2482 {
2483         struct ctdb_control_get_tunable *t;
2484         TDB_DATA data, outdata;
2485         int32_t res;
2486         int ret;
2487
2488         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2489         data.dptr  = talloc_size(ctdb, data.dsize);
2490         CTDB_NO_MEMORY(ctdb, data.dptr);
2491
2492         t = (struct ctdb_control_get_tunable *)data.dptr;
2493         t->length = strlen(name)+1;
2494         memcpy(t->name, name, t->length);
2495
2496         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2497                            &outdata, &res, &timeout, NULL);
2498         talloc_free(data.dptr);
2499         if (ret != 0 || res != 0) {
2500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2501                 return -1;
2502         }
2503
2504         if (outdata.dsize != sizeof(uint32_t)) {
2505                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2506                 talloc_free(outdata.dptr);
2507                 return -1;
2508         }
2509         
2510         *value = *(uint32_t *)outdata.dptr;
2511         talloc_free(outdata.dptr);
2512
2513         return 0;
2514 }
2515
2516 /*
2517   set a tunable
2518  */
2519 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2520                           struct timeval timeout, 
2521                           uint32_t destnode,
2522                           const char *name, uint32_t value)
2523 {
2524         struct ctdb_control_set_tunable *t;
2525         TDB_DATA data;
2526         int32_t res;
2527         int ret;
2528
2529         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2530         data.dptr  = talloc_size(ctdb, data.dsize);
2531         CTDB_NO_MEMORY(ctdb, data.dptr);
2532
2533         t = (struct ctdb_control_set_tunable *)data.dptr;
2534         t->length = strlen(name)+1;
2535         memcpy(t->name, name, t->length);
2536         t->value = value;
2537
2538         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2539                            NULL, &res, &timeout, NULL);
2540         talloc_free(data.dptr);
2541         if (ret != 0 || res != 0) {
2542                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2543                 return -1;
2544         }
2545
2546         return 0;
2547 }
2548
2549 /*
2550   list tunables
2551  */
2552 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2553                             struct timeval timeout, 
2554                             uint32_t destnode,
2555                             TALLOC_CTX *mem_ctx,
2556                             const char ***list, uint32_t *count)
2557 {
2558         TDB_DATA outdata;
2559         int32_t res;
2560         int ret;
2561         struct ctdb_control_list_tunable *t;
2562         char *p, *s, *ptr;
2563
2564         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2565                            mem_ctx, &outdata, &res, &timeout, NULL);
2566         if (ret != 0 || res != 0) {
2567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2568                 return -1;
2569         }
2570
2571         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2572         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2573             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2574                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2575                 talloc_free(outdata.dptr);
2576                 return -1;              
2577         }
2578         
2579         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2580         CTDB_NO_MEMORY(ctdb, p);
2581
2582         talloc_free(outdata.dptr);
2583         
2584         (*list) = NULL;
2585         (*count) = 0;
2586
2587         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2588                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2589                 CTDB_NO_MEMORY(ctdb, *list);
2590                 (*list)[*count] = talloc_strdup(*list, s);
2591                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2592                 (*count)++;
2593         }
2594
2595         talloc_free(p);
2596
2597         return 0;
2598 }
2599
2600
2601 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2602                                    struct timeval timeout, uint32_t destnode,
2603                                    TALLOC_CTX *mem_ctx,
2604                                    uint32_t flags,
2605                                    struct ctdb_all_public_ips **ips)
2606 {
2607         int ret;
2608         TDB_DATA outdata;
2609         int32_t res;
2610
2611         ret = ctdb_control(ctdb, destnode, 0, 
2612                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2613                            mem_ctx, &outdata, &res, &timeout, NULL);
2614         if (ret == 0 && res == -1) {
2615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2616                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2617         }
2618         if (ret != 0 || res != 0) {
2619           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2620                 return -1;
2621         }
2622
2623         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2624         talloc_free(outdata.dptr);
2625                     
2626         return 0;
2627 }
2628
2629 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2630                              struct timeval timeout, uint32_t destnode,
2631                              TALLOC_CTX *mem_ctx,
2632                              struct ctdb_all_public_ips **ips)
2633 {
2634         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2635                                               destnode, mem_ctx,
2636                                               0, ips);
2637 }
2638
2639 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2640                         struct timeval timeout, uint32_t destnode, 
2641                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2642 {
2643         int ret, i, len;
2644         TDB_DATA outdata;
2645         int32_t res;
2646         struct ctdb_all_public_ipsv4 *ipsv4;
2647
2648         ret = ctdb_control(ctdb, destnode, 0, 
2649                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2650                            mem_ctx, &outdata, &res, &timeout, NULL);
2651         if (ret != 0 || res != 0) {
2652                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2653                 return -1;
2654         }
2655
2656         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2657         len = offsetof(struct ctdb_all_public_ips, ips) +
2658                 ipsv4->num*sizeof(struct ctdb_public_ip);
2659         *ips = talloc_zero_size(mem_ctx, len);
2660         CTDB_NO_MEMORY(ctdb, *ips);
2661         (*ips)->num = ipsv4->num;
2662         for (i=0; i<ipsv4->num; i++) {
2663                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2664                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2665         }
2666
2667         talloc_free(outdata.dptr);
2668                     
2669         return 0;
2670 }
2671
2672 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2673                                  struct timeval timeout, uint32_t destnode,
2674                                  TALLOC_CTX *mem_ctx,
2675                                  const ctdb_sock_addr *addr,
2676                                  struct ctdb_control_public_ip_info **_info)
2677 {
2678         int ret;
2679         TDB_DATA indata;
2680         TDB_DATA outdata;
2681         int32_t res;
2682         struct ctdb_control_public_ip_info *info;
2683         uint32_t len;
2684         uint32_t i;
2685
2686         indata.dptr = discard_const_p(uint8_t, addr);
2687         indata.dsize = sizeof(*addr);
2688
2689         ret = ctdb_control(ctdb, destnode, 0,
2690                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2691                            mem_ctx, &outdata, &res, &timeout, NULL);
2692         if (ret != 0 || res != 0) {
2693                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2694                                 "failed ret:%d res:%d\n",
2695                                 ret, res));
2696                 return -1;
2697         }
2698
2699         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2700         if (len > outdata.dsize) {
2701                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2702                                 "returned invalid data with size %u > %u\n",
2703                                 (unsigned int)outdata.dsize,
2704                                 (unsigned int)len));
2705                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2706                 return -1;
2707         }
2708
2709         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2710         len += info->num*sizeof(struct ctdb_control_iface_info);
2711
2712         if (len > outdata.dsize) {
2713                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2714                                 "returned invalid data with size %u > %u\n",
2715                                 (unsigned int)outdata.dsize,
2716                                 (unsigned int)len));
2717                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2718                 return -1;
2719         }
2720
2721         /* make sure we null terminate the returned strings */
2722         for (i=0; i < info->num; i++) {
2723                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2724         }
2725
2726         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2727                                                                 outdata.dptr,
2728                                                                 outdata.dsize);
2729         talloc_free(outdata.dptr);
2730         if (*_info == NULL) {
2731                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2732                                 "talloc_memdup size %u failed\n",
2733                                 (unsigned int)outdata.dsize));
2734                 return -1;
2735         }
2736
2737         return 0;
2738 }
2739
2740 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2741                          struct timeval timeout, uint32_t destnode,
2742                          TALLOC_CTX *mem_ctx,
2743                          struct ctdb_control_get_ifaces **_ifaces)
2744 {
2745         int ret;
2746         TDB_DATA outdata;
2747         int32_t res;
2748         struct ctdb_control_get_ifaces *ifaces;
2749         uint32_t len;
2750         uint32_t i;
2751
2752         ret = ctdb_control(ctdb, destnode, 0,
2753                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2754                            mem_ctx, &outdata, &res, &timeout, NULL);
2755         if (ret != 0 || res != 0) {
2756                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2757                                 "failed ret:%d res:%d\n",
2758                                 ret, res));
2759                 return -1;
2760         }
2761
2762         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2763         if (len > outdata.dsize) {
2764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2765                                 "returned invalid data with size %u > %u\n",
2766                                 (unsigned int)outdata.dsize,
2767                                 (unsigned int)len));
2768                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2769                 return -1;
2770         }
2771
2772         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2773         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2774
2775         if (len > outdata.dsize) {
2776                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2777                                 "returned invalid data with size %u > %u\n",
2778                                 (unsigned int)outdata.dsize,
2779                                 (unsigned int)len));
2780                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2781                 return -1;
2782         }
2783
2784         /* make sure we null terminate the returned strings */
2785         for (i=0; i < ifaces->num; i++) {
2786                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2787         }
2788
2789         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2790                                                                   outdata.dptr,
2791                                                                   outdata.dsize);
2792         talloc_free(outdata.dptr);
2793         if (*_ifaces == NULL) {
2794                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2795                                 "talloc_memdup size %u failed\n",
2796                                 (unsigned int)outdata.dsize));
2797                 return -1;
2798         }
2799
2800         return 0;
2801 }
2802
2803 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2804                              struct timeval timeout, uint32_t destnode,
2805                              TALLOC_CTX *mem_ctx,
2806                              const struct ctdb_control_iface_info *info)
2807 {
2808         int ret;
2809         TDB_DATA indata;
2810         int32_t res;
2811
2812         indata.dptr = discard_const_p(uint8_t, info);
2813         indata.dsize = sizeof(*info);
2814
2815         ret = ctdb_control(ctdb, destnode, 0,
2816                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2817                            mem_ctx, NULL, &res, &timeout, NULL);
2818         if (ret != 0 || res != 0) {
2819                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2820                                 "failed ret:%d res:%d\n",
2821                                 ret, res));
2822                 return -1;
2823         }
2824
2825         return 0;
2826 }
2827
2828 /*
2829   set/clear the permanent disabled bit on a remote node
2830  */
2831 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2832                        uint32_t set, uint32_t clear)
2833 {
2834         int ret;
2835         TDB_DATA data;
2836         struct ctdb_node_map *nodemap=NULL;
2837         struct ctdb_node_flag_change c;
2838         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2839         uint32_t recmaster;
2840         uint32_t *nodes;
2841
2842
2843         /* find the recovery master */
2844         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2845         if (ret != 0) {
2846                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2847                 talloc_free(tmp_ctx);
2848                 return ret;
2849         }
2850
2851
2852         /* read the node flags from the recmaster */
2853         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2854         if (ret != 0) {
2855                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2856                 talloc_free(tmp_ctx);
2857                 return -1;
2858         }
2859         if (destnode >= nodemap->num) {
2860                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2861                 talloc_free(tmp_ctx);
2862                 return -1;
2863         }
2864
2865         c.pnn       = destnode;
2866         c.old_flags = nodemap->nodes[destnode].flags;
2867         c.new_flags = c.old_flags;
2868         c.new_flags |= set;
2869         c.new_flags &= ~clear;
2870
2871         data.dsize = sizeof(c);
2872         data.dptr = (unsigned char *)&c;
2873
2874         /* send the flags update to all connected nodes */
2875         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2876
2877         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2878                                         nodes, 0,
2879                                         timeout, false, data,
2880                                         NULL, NULL,
2881                                         NULL) != 0) {
2882                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2883
2884                 talloc_free(tmp_ctx);
2885                 return -1;
2886         }
2887
2888         talloc_free(tmp_ctx);
2889         return 0;
2890 }
2891
2892
2893 /*
2894   get all tunables
2895  */
2896 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2897                                struct timeval timeout, 
2898                                uint32_t destnode,
2899                                struct ctdb_tunable *tunables)
2900 {
2901         TDB_DATA outdata;
2902         int ret;
2903         int32_t res;
2904
2905         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2906                            &outdata, &res, &timeout, NULL);
2907         if (ret != 0 || res != 0) {
2908                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2909                 return -1;
2910         }
2911
2912         if (outdata.dsize != sizeof(*tunables)) {
2913                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2914                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2915                 return -1;              
2916         }
2917
2918         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2919         talloc_free(outdata.dptr);
2920         return 0;
2921 }
2922
2923 /*
2924   add a public address to a node
2925  */
2926 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2927                       struct timeval timeout, 
2928                       uint32_t destnode,
2929                       struct ctdb_control_ip_iface *pub)
2930 {
2931         TDB_DATA data;
2932         int32_t res;
2933         int ret;
2934
2935         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2936         data.dptr  = (unsigned char *)pub;
2937
2938         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2939                            NULL, &res, &timeout, NULL);
2940         if (ret != 0 || res != 0) {
2941                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2942                 return -1;
2943         }
2944
2945         return 0;
2946 }
2947
2948 /*
2949   delete a public address from a node
2950  */
2951 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2952                       struct timeval timeout, 
2953                       uint32_t destnode,
2954                       struct ctdb_control_ip_iface *pub)
2955 {
2956         TDB_DATA data;
2957         int32_t res;
2958         int ret;
2959
2960         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2961         data.dptr  = (unsigned char *)pub;
2962
2963         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2964                            NULL, &res, &timeout, NULL);
2965         if (ret != 0 || res != 0) {
2966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2967                 return -1;
2968         }
2969
2970         return 0;
2971 }
2972
2973 /*
2974   kill a tcp connection
2975  */
2976 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2977                       struct timeval timeout, 
2978                       uint32_t destnode,
2979                       struct ctdb_control_killtcp *killtcp)
2980 {
2981         TDB_DATA data;
2982         int32_t res;
2983         int ret;
2984
2985         data.dsize = sizeof(struct ctdb_control_killtcp);
2986         data.dptr  = (unsigned char *)killtcp;
2987
2988         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2989                            NULL, &res, &timeout, NULL);
2990         if (ret != 0 || res != 0) {
2991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2992                 return -1;
2993         }
2994
2995         return 0;
2996 }
2997
2998 /*
2999   send a gratious arp
3000  */
3001 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3002                       struct timeval timeout, 
3003                       uint32_t destnode,
3004                       ctdb_sock_addr *addr,
3005                       const char *ifname)
3006 {
3007         TDB_DATA data;
3008         int32_t res;
3009         int ret, len;
3010         struct ctdb_control_gratious_arp *gratious_arp;
3011         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3012
3013
3014         len = strlen(ifname)+1;
3015         gratious_arp = talloc_size(tmp_ctx, 
3016                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3017         CTDB_NO_MEMORY(ctdb, gratious_arp);
3018
3019         gratious_arp->addr = *addr;
3020         gratious_arp->len = len;
3021         memcpy(&gratious_arp->iface[0], ifname, len);
3022
3023
3024         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3025         data.dptr  = (unsigned char *)gratious_arp;
3026
3027         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3028                            NULL, &res, &timeout, NULL);
3029         if (ret != 0 || res != 0) {
3030                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3031                 talloc_free(tmp_ctx);
3032                 return -1;
3033         }
3034
3035         talloc_free(tmp_ctx);
3036         return 0;
3037 }
3038
3039 /*
3040   get a list of all tcp tickles that a node knows about for a particular vnn
3041  */
3042 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3043                               struct timeval timeout, uint32_t destnode, 
3044                               TALLOC_CTX *mem_ctx, 
3045                               ctdb_sock_addr *addr,
3046                               struct ctdb_control_tcp_tickle_list **list)
3047 {
3048         int ret;
3049         TDB_DATA data, outdata;
3050         int32_t status;
3051
3052         data.dptr = (uint8_t*)addr;
3053         data.dsize = sizeof(ctdb_sock_addr);
3054
3055         ret = ctdb_control(ctdb, destnode, 0, 
3056                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3057                            mem_ctx, &outdata, &status, NULL, NULL);
3058         if (ret != 0 || status != 0) {
3059                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3060                 return -1;
3061         }
3062
3063         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3064
3065         return status;
3066 }
3067
3068 /*
3069   register a server id
3070  */
3071 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3072                       struct timeval timeout, 
3073                       struct ctdb_server_id *id)
3074 {
3075         TDB_DATA data;
3076         int32_t res;
3077         int ret;
3078
3079         data.dsize = sizeof(struct ctdb_server_id);
3080         data.dptr  = (unsigned char *)id;
3081
3082         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3083                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3084                         0, data, NULL,
3085                         NULL, &res, &timeout, NULL);
3086         if (ret != 0 || res != 0) {
3087                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3088                 return -1;
3089         }
3090
3091         return 0;
3092 }
3093
3094 /*
3095   unregister a server id
3096  */
3097 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3098                       struct timeval timeout, 
3099                       struct ctdb_server_id *id)
3100 {
3101         TDB_DATA data;
3102         int32_t res;
3103         int ret;
3104
3105         data.dsize = sizeof(struct ctdb_server_id);
3106         data.dptr  = (unsigned char *)id;
3107
3108         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3109                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3110                         0, data, NULL,
3111                         NULL, &res, &timeout, NULL);
3112         if (ret != 0 || res != 0) {
3113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3114                 return -1;
3115         }
3116
3117         return 0;
3118 }
3119
3120
3121 /*
3122   check if a server id exists
3123
3124   if a server id does exist, return *status == 1, otherwise *status == 0
3125  */
3126 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3127                       struct timeval timeout, 
3128                       uint32_t destnode,
3129                       struct ctdb_server_id *id,
3130                       uint32_t *status)
3131 {
3132         TDB_DATA data;
3133         int32_t res;
3134         int ret;
3135
3136         data.dsize = sizeof(struct ctdb_server_id);
3137         data.dptr  = (unsigned char *)id;
3138
3139         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3140                         0, data, NULL,
3141                         NULL, &res, &timeout, NULL);
3142         if (ret != 0) {
3143                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3144                 return -1;
3145         }
3146
3147         if (res) {
3148                 *status = 1;
3149         } else {
3150                 *status = 0;
3151         }
3152
3153         return 0;
3154 }
3155
3156 /*
3157    get the list of server ids that are registered on a node
3158 */
3159 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3160                 TALLOC_CTX *mem_ctx,
3161                 struct timeval timeout, uint32_t destnode, 
3162                 struct ctdb_server_id_list **svid_list)
3163 {
3164         int ret;
3165         TDB_DATA outdata;
3166         int32_t res;
3167
3168         ret = ctdb_control(ctdb, destnode, 0, 
3169                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3170                            mem_ctx, &outdata, &res, &timeout, NULL);
3171         if (ret != 0 || res != 0) {
3172                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3173                 return -1;
3174         }
3175
3176         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3177                     
3178         return 0;
3179 }
3180
3181 /*
3182   initialise the ctdb daemon for client applications
3183
3184   NOTE: In current code the daemon does not fork. This is for testing purposes only
3185   and to simplify the code.
3186 */
3187 struct ctdb_context *ctdb_init(struct event_context *ev)
3188 {
3189         int ret;
3190         struct ctdb_context *ctdb;
3191
3192         ctdb = talloc_zero(ev, struct ctdb_context);
3193         if (ctdb == NULL) {
3194                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3195                 return NULL;
3196         }
3197         ctdb->ev  = ev;
3198         ctdb->idr = idr_init(ctdb);
3199         /* Wrap early to exercise code. */
3200         ctdb->lastid = INT_MAX-200;
3201         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3202
3203         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3204         if (ret != 0) {
3205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3206                 talloc_free(ctdb);
3207                 return NULL;
3208         }
3209
3210         ctdb->statistics.statistics_start_time = timeval_current();
3211
3212         return ctdb;
3213 }
3214
3215
3216 /*
3217   set some ctdb flags
3218 */
3219 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3220 {
3221         ctdb->flags |= flags;
3222 }
3223
3224 /*
3225   setup the local socket name
3226 */
3227 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3228 {
3229         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3230         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3231
3232         return 0;
3233 }
3234
3235 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3236 {
3237         return ctdb->daemon.name;
3238 }
3239
3240 /*
3241   return the pnn of this node
3242 */
3243 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3244 {
3245         return ctdb->pnn;
3246 }
3247
3248
3249 /*
3250   get the uptime of a remote node
3251  */
3252 struct ctdb_client_control_state *
3253 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3254 {
3255         return ctdb_control_send(ctdb, destnode, 0, 
3256                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3257                            mem_ctx, &timeout, NULL);
3258 }
3259
3260 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3261 {
3262         int ret;
3263         int32_t res;
3264         TDB_DATA outdata;
3265
3266         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3267         if (ret != 0 || res != 0) {
3268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3269                 return -1;
3270         }
3271
3272         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3273
3274         return 0;
3275 }
3276
3277 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3278 {
3279         struct ctdb_client_control_state *state;
3280
3281         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3282         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3283 }
3284
3285 /*
3286   send a control to execute the "recovered" event script on a node
3287  */
3288 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3289 {
3290         int ret;
3291         int32_t status;
3292
3293         ret = ctdb_control(ctdb, destnode, 0, 
3294                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3295                            NULL, NULL, &status, &timeout, NULL);
3296         if (ret != 0 || status != 0) {
3297                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3298                 return -1;
3299         }
3300
3301         return 0;
3302 }
3303
3304 /* 
3305   callback for the async helpers used when sending the same control
3306   to multiple nodes in parallell.
3307 */
3308 static void async_callback(struct ctdb_client_control_state *state)
3309 {
3310         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3311         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3312         int ret;
3313         TDB_DATA outdata;
3314         int32_t res;
3315         uint32_t destnode = state->c->hdr.destnode;
3316
3317         /* one more node has responded with recmode data */
3318         data->count--;
3319
3320         /* if we failed to push the db, then return an error and let
3321            the main loop try again.
3322         */
3323         if (state->state != CTDB_CONTROL_DONE) {
3324                 if ( !data->dont_log_errors) {
3325                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3326                 }
3327                 data->fail_count++;
3328                 if (data->fail_callback) {
3329                         data->fail_callback(ctdb, destnode, res, outdata,
3330                                         data->callback_data);
3331                 }
3332                 return;
3333         }
3334         
3335         state->async.fn = NULL;
3336
3337         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3338         if ((ret != 0) || (res != 0)) {
3339                 if ( !data->dont_log_errors) {
3340                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3341                 }
3342                 data->fail_count++;
3343                 if (data->fail_callback) {
3344                         data->fail_callback(ctdb, destnode, res, outdata,
3345                                         data->callback_data);
3346                 }
3347         }
3348         if ((ret == 0) && (data->callback != NULL)) {
3349                 data->callback(ctdb, destnode, res, outdata,
3350                                         data->callback_data);
3351         }
3352 }
3353
3354
3355 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3356 {
3357         /* set up the callback functions */
3358         state->async.fn = async_callback;
3359         state->async.private_data = data;
3360         
3361         /* one more control to wait for to complete */
3362         data->count++;
3363 }
3364
3365
3366 /* wait for up to the maximum number of seconds allowed
3367    or until all nodes we expect a response from has replied
3368 */
3369 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3370 {
3371         while (data->count > 0) {
3372                 event_loop_once(ctdb->ev);
3373         }
3374         if (data->fail_count != 0) {
3375                 if (!data->dont_log_errors) {
3376                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3377                                  data->fail_count));
3378                 }
3379                 return -1;
3380         }
3381         return 0;
3382 }
3383
3384
3385 /* 
3386    perform a simple control on the listed nodes
3387    The control cannot return data
3388  */
3389 int ctdb_client_async_control(struct ctdb_context *ctdb,
3390                                 enum ctdb_controls opcode,
3391                                 uint32_t *nodes,
3392                                 uint64_t srvid,
3393                                 struct timeval timeout,
3394                                 bool dont_log_errors,
3395                                 TDB_DATA data,
3396                                 client_async_callback client_callback,
3397                                 client_async_callback fail_callback,
3398                                 void *callback_data)
3399 {
3400         struct client_async_data *async_data;
3401         struct ctdb_client_control_state *state;
3402         int j, num_nodes;
3403
3404         async_data = talloc_zero(ctdb, struct client_async_data);
3405         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3406         async_data->dont_log_errors = dont_log_errors;
3407         async_data->callback = client_callback;
3408         async_data->fail_callback = fail_callback;
3409         async_data->callback_data = callback_data;
3410         async_data->opcode        = opcode;
3411
3412         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3413
3414         /* loop over all nodes and send an async control to each of them */
3415         for (j=0; j<num_nodes; j++) {
3416                 uint32_t pnn = nodes[j];
3417
3418                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3419                                           0, data, async_data, &timeout, NULL);
3420                 if (state == NULL) {
3421                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3422                         talloc_free(async_data);
3423                         return -1;
3424                 }
3425                 
3426                 ctdb_client_async_add(async_data, state);
3427         }
3428
3429         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3430                 talloc_free(async_data);
3431                 return -1;
3432         }
3433
3434         talloc_free(async_data);
3435         return 0;
3436 }
3437
3438 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3439                                 struct ctdb_vnn_map *vnn_map,
3440                                 TALLOC_CTX *mem_ctx,
3441                                 bool include_self)
3442 {
3443         int i, j, num_nodes;
3444         uint32_t *nodes;
3445
3446         for (i=num_nodes=0;i<vnn_map->size;i++) {
3447                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3448                         continue;
3449                 }
3450                 num_nodes++;
3451         } 
3452
3453         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3454         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3455
3456         for (i=j=0;i<vnn_map->size;i++) {
3457                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3458                         continue;
3459                 }
3460                 nodes[j++] = vnn_map->map[i];
3461         } 
3462
3463         return nodes;
3464 }
3465
3466 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3467                                 struct ctdb_node_map *node_map,
3468                                 TALLOC_CTX *mem_ctx,
3469                                 bool include_self)
3470 {
3471         int i, j, num_nodes;
3472         uint32_t *nodes;
3473
3474         for (i=num_nodes=0;i<node_map->num;i++) {
3475                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3476                         continue;
3477                 }
3478                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3479                         continue;
3480                 }
3481                 num_nodes++;
3482         } 
3483
3484         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3485         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3486
3487         for (i=j=0;i<node_map->num;i++) {
3488                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3489                         continue;
3490                 }
3491                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3492                         continue;
3493                 }
3494                 nodes[j++] = node_map->nodes[i].pnn;
3495         } 
3496
3497         return nodes;
3498 }
3499
3500 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3501                                 struct ctdb_node_map *node_map,
3502                                 TALLOC_CTX *mem_ctx,
3503                                 uint32_t pnn)
3504 {
3505         int i, j, num_nodes;
3506         uint32_t *nodes;
3507
3508         for (i=num_nodes=0;i<node_map->num;i++) {
3509                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3510                         continue;
3511                 }
3512                 if (node_map->nodes[i].pnn == pnn) {
3513                         continue;
3514                 }
3515                 num_nodes++;
3516         } 
3517
3518         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3519         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3520
3521         for (i=j=0;i<node_map->num;i++) {
3522                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3523                         continue;
3524                 }
3525                 if (node_map->nodes[i].pnn == pnn) {
3526                         continue;
3527                 }
3528                 nodes[j++] = node_map->nodes[i].pnn;
3529         } 
3530
3531         return nodes;
3532 }
3533
3534 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3535                                 struct ctdb_node_map *node_map,
3536                                 TALLOC_CTX *mem_ctx,
3537                                 bool include_self)
3538 {
3539         int i, j, num_nodes;
3540         uint32_t *nodes;
3541
3542         for (i=num_nodes=0;i<node_map->num;i++) {
3543                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3544                         continue;
3545                 }
3546                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3547                         continue;
3548                 }
3549                 num_nodes++;
3550         } 
3551
3552         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3553         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3554
3555         for (i=j=0;i<node_map->num;i++) {
3556                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3557                         continue;
3558                 }
3559                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3560                         continue;
3561                 }
3562                 nodes[j++] = node_map->nodes[i].pnn;
3563         } 
3564
3565         return nodes;
3566 }
3567
3568 /* 
3569   this is used to test if a pnn lock exists and if it exists will return
3570   the number of connections that pnn has reported or -1 if that recovery
3571   daemon is not running.
3572 */
3573 int
3574 ctdb_read_pnn_lock(int fd, int32_t pnn)
3575 {
3576         struct flock lock;
3577         char c;
3578
3579         lock.l_type = F_WRLCK;
3580         lock.l_whence = SEEK_SET;
3581         lock.l_start = pnn;
3582         lock.l_len = 1;
3583         lock.l_pid = 0;
3584
3585         if (fcntl(fd, F_GETLK, &lock) != 0) {
3586                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3587                 return -1;
3588         }
3589
3590         if (lock.l_type == F_UNLCK) {
3591                 return -1;
3592         }
3593
3594         if (pread(fd, &c, 1, pnn) == -1) {
3595                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3596                 return -1;
3597         }
3598
3599         return c;
3600 }
3601
3602 /*
3603   get capabilities of a remote node
3604  */
3605 struct ctdb_client_control_state *
3606 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3607 {
3608         return ctdb_control_send(ctdb, destnode, 0, 
3609                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3610                            mem_ctx, &timeout, NULL);
3611 }
3612
3613 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3614 {
3615         int ret;
3616         int32_t res;
3617         TDB_DATA outdata;
3618
3619         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3620         if ( (ret != 0) || (res != 0) ) {
3621                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3622                 return -1;
3623         }
3624
3625         if (capabilities) {
3626                 *capabilities = *((uint32_t *)outdata.dptr);
3627         }
3628
3629         return 0;
3630 }
3631
3632 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3633 {
3634         struct ctdb_client_control_state *state;
3635         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3636         int ret;
3637
3638         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3639         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3640         talloc_free(tmp_ctx);
3641         return ret;
3642 }
3643
3644 /**
3645  * check whether a transaction is active on a given db on a given node
3646  */
3647 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3648                                      uint32_t destnode,
3649                                      uint32_t db_id)
3650 {
3651         int32_t status;
3652         int ret;
3653         TDB_DATA indata;
3654
3655         indata.dptr = (uint8_t *)&db_id;
3656         indata.dsize = sizeof(db_id);
3657
3658         ret = ctdb_control(ctdb, destnode, 0,
3659                            CTDB_CONTROL_TRANS2_ACTIVE,
3660                            0, indata, NULL, NULL, &status,
3661                            NULL, NULL);
3662
3663         if (ret != 0) {
3664                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3665                 return -1;
3666         }
3667
3668         return status;
3669 }
3670
3671
3672 struct ctdb_transaction_handle {
3673         struct ctdb_db_context *ctdb_db;
3674         bool in_replay;
3675         /*
3676          * we store the reads and writes done under a transaction:
3677          * - one list stores both reads and writes (m_all),
3678          * - the other just writes (m_write)
3679          */
3680         struct ctdb_marshall_buffer *m_all;
3681         struct ctdb_marshall_buffer *m_write;
3682 };
3683
3684 /* start a transaction on a database */
3685 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3686 {
3687         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3688         return 0;
3689 }
3690
3691 /* start a transaction on a database */
3692 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3693 {
3694         struct ctdb_record_handle *rh;
3695         TDB_DATA key;
3696         TDB_DATA data;
3697         struct ctdb_ltdb_header header;
3698         TALLOC_CTX *tmp_ctx;
3699         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3700         int ret;
3701         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3702         pid_t pid;
3703         int32_t status;
3704
3705         key.dptr = discard_const(keyname);
3706         key.dsize = strlen(keyname);
3707
3708         if (!ctdb_db->persistent) {
3709                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3710                 return -1;
3711         }
3712
3713 again:
3714         tmp_ctx = talloc_new(h);
3715
3716         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3717         if (rh == NULL) {
3718                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3719                 talloc_free(tmp_ctx);
3720                 return -1;
3721         }
3722
3723         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3724                                               CTDB_CURRENT_NODE,
3725                                               ctdb_db->db_id);
3726         if (status == 1) {
3727                 unsigned long int usec = (1000 + random()) % 100000;
3728                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3729                                     "on db_id[0x%08x]. waiting for %lu "
3730                                     "microseconds\n",
3731                                     ctdb_db->db_id, usec));
3732                 talloc_free(tmp_ctx);
3733                 usleep(usec);
3734                 goto again;
3735         }
3736
3737         /*
3738          * store the pid in the database:
3739          * it is not enough that the node is dmaster...
3740          */
3741         pid = getpid();
3742         data.dptr = (unsigned char *)&pid;
3743         data.dsize = sizeof(pid_t);
3744         rh->header.rsn++;
3745         rh->header.dmaster = ctdb_db->ctdb->pnn;
3746         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3747         if (ret != 0) {
3748                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3749                                   "transaction record\n"));
3750                 talloc_free(tmp_ctx);
3751                 return -1;
3752         }
3753
3754         talloc_free(rh);
3755
3756         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3757         if (ret != 0) {
3758                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3759                 talloc_free(tmp_ctx);
3760                 return -1;
3761         }
3762
3763         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3764         if (ret != 0) {
3765                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3766                                  "lock record inside transaction\n"));
3767                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3768                 talloc_free(tmp_ctx);
3769                 goto again;
3770         }
3771
3772         if (header.dmaster != ctdb_db->ctdb->pnn) {
3773                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3774                                    "transaction lock record\n"));
3775                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3776                 talloc_free(tmp_ctx);
3777                 goto again;
3778         }
3779
3780         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3781                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3782                                     "the transaction lock record\n"));
3783                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3784                 talloc_free(tmp_ctx);
3785                 goto again;
3786         }
3787
3788         talloc_free(tmp_ctx);
3789
3790         return 0;
3791 }
3792
3793
3794 /* start a transaction on a database */
3795 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3796                                                        TALLOC_CTX *mem_ctx)
3797 {
3798         struct ctdb_transaction_handle *h;
3799         int ret;
3800
3801         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3802         if (h == NULL) {
3803                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3804                 return NULL;
3805         }
3806
3807         h->ctdb_db = ctdb_db;
3808
3809         ret = ctdb_transaction_fetch_start(h);
3810         if (ret != 0) {
3811                 talloc_free(h);
3812                 return NULL;
3813         }
3814
3815         talloc_set_destructor(h, ctdb_transaction_destructor);
3816
3817         return h;
3818 }
3819
3820
3821
3822 /*
3823   fetch a record inside a transaction
3824  */
3825 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3826                            TALLOC_CTX *mem_ctx, 
3827                            TDB_DATA key, TDB_DATA *data)
3828 {
3829         struct ctdb_ltdb_header header;
3830         int ret;
3831
3832         ZERO_STRUCT(header);
3833
3834         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3835         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3836                 /* record doesn't exist yet */
3837                 *data = tdb_null;
3838                 ret = 0;
3839         }
3840         
3841         if (ret != 0) {
3842                 return ret;
3843         }
3844
3845         if (!h->in_replay) {
3846                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3847                 if (h->m_all == NULL) {
3848                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3849                         return -1;
3850                 }
3851         }
3852
3853         return 0;
3854 }
3855
3856 /*
3857   stores a record inside a transaction
3858  */
3859 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3860                            TDB_DATA key, TDB_DATA data)
3861 {
3862         TALLOC_CTX *tmp_ctx = talloc_new(h);
3863         struct ctdb_ltdb_header header;
3864         TDB_DATA olddata;
3865         int ret;
3866
3867         ZERO_STRUCT(header);
3868
3869         /* we need the header so we can update the RSN */
3870         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3871         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3872                 /* the record doesn't exist - create one with us as dmaster.
3873                    This is only safe because we are in a transaction and this
3874                    is a persistent database */
3875                 ZERO_STRUCT(header);
3876         } else if (ret != 0) {
3877                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3878                 talloc_free(tmp_ctx);
3879                 return ret;
3880         }
3881
3882         if (data.dsize == olddata.dsize &&
3883             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3884                 /* save writing the same data */
3885                 talloc_free(tmp_ctx);
3886                 return 0;
3887         }
3888
3889         header.dmaster = h->ctdb_db->ctdb->pnn;
3890         header.rsn++;
3891
3892         if (!h->in_replay) {
3893                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3894                 if (h->m_all == NULL) {
3895                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3896                         talloc_free(tmp_ctx);
3897                         return -1;
3898                 }
3899         }               
3900
3901         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3902         if (h->m_write == NULL) {
3903                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3904                 talloc_free(tmp_ctx);
3905                 return -1;
3906         }
3907         
3908         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3909
3910         talloc_free(tmp_ctx);
3911         
3912         return ret;
3913 }
3914
3915 /*
3916   replay a transaction
3917  */
3918 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3919 {
3920         int ret, i;
3921         struct ctdb_rec_data *rec = NULL;
3922
3923         h->in_replay = true;
3924         talloc_free(h->m_write);
3925         h->m_write = NULL;
3926
3927         ret = ctdb_transaction_fetch_start(h);
3928         if (ret != 0) {
3929                 return ret;
3930         }
3931
3932         for (i=0;i<h->m_all->count;i++) {
3933                 TDB_DATA key, data;
3934
3935                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3936                 if (rec == NULL) {
3937                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3938                         goto failed;
3939                 }
3940
3941                 if (rec->reqid == 0) {
3942                         /* its a store */
3943                         if (ctdb_transaction_store(h, key, data) != 0) {
3944                                 goto failed;
3945                         }
3946                 } else {
3947                         TDB_DATA data2;
3948                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3949
3950                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3951                                 talloc_free(tmp_ctx);
3952                                 goto failed;
3953                         }
3954                         if (data2.dsize != data.dsize ||
3955                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3956                                 /* the record has changed on us - we have to give up */
3957                                 talloc_free(tmp_ctx);
3958                                 goto failed;
3959                         }
3960                         talloc_free(tmp_ctx);
3961                 }
3962         }
3963         
3964         return 0;
3965
3966 failed:
3967         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3968         return -1;
3969 }
3970
3971
3972 /*
3973   commit a transaction
3974  */
3975 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3976 {
3977         int ret, retries=0;
3978         int32_t status;
3979         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3980         struct timeval timeout;
3981         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3982
3983         talloc_set_destructor(h, NULL);
3984
3985         /* our commit strategy is quite complex.
3986
3987            - we first try to commit the changes to all other nodes
3988
3989            - if that works, then we commit locally and we are done
3990
3991            - if a commit on another node fails, then we need to cancel
3992              the transaction, then restart the transaction (thus
3993              opening a window of time for a pending recovery to
3994              complete), then replay the transaction, checking all the
3995              reads and writes (checking that reads give the same data,
3996              and writes succeed). Then we retry the transaction to the
3997              other nodes
3998         */
3999
4000 again:
4001         if (h->m_write == NULL) {
4002                 /* no changes were made */
4003                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4004                 talloc_free(h);
4005                 return 0;
4006         }
4007
4008         /* tell ctdbd to commit to the other nodes */
4009         timeout = timeval_current_ofs(1, 0);
4010         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4011                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4012                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4013                            &timeout, NULL);
4014         if (ret != 0 || status != 0) {
4015                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4016                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4017                                      ", retrying after 1 second...\n",
4018                                      (retries==0)?"":"retry "));
4019                 sleep(1);
4020
4021                 if (ret != 0) {
4022                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4023                 } else {
4024                         /* work out what error code we will give if we 
4025                            have to fail the operation */
4026                         switch ((enum ctdb_trans2_commit_error)status) {
4027                         case CTDB_TRANS2_COMMIT_SUCCESS:
4028                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4029                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4030                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4031                                 break;
4032                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4033                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4034                                 break;
4035                         }
4036                 }
4037
4038                 if (++retries == 100) {
4039                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4040                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4041                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4042                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4043                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4044                         talloc_free(h);
4045                         return -1;
4046                 }               
4047
4048                 if (ctdb_replay_transaction(h) != 0) {
4049                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4050                                           "transaction on db 0x%08x, "
4051                                           "failure control =%u\n",
4052                                           h->ctdb_db->db_id,
4053                                           (unsigned)failure_control));
4054                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4055                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4056                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4057                         talloc_free(h);
4058                         return -1;
4059                 }
4060                 goto again;
4061         } else {
4062                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4063         }
4064
4065         /* do the real commit locally */
4066         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4067         if (ret != 0) {
4068                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4069                                   "on db id 0x%08x locally, "
4070                                   "failure_control=%u\n",
4071                                   h->ctdb_db->db_id,
4072                                   (unsigned)failure_control));
4073                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4074                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4075                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4076                 talloc_free(h);
4077                 return ret;
4078         }
4079
4080         /* tell ctdbd that we are finished with our local commit */
4081         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4082                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4083                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4084         talloc_free(h);
4085         return 0;
4086 }
4087
4088 /*
4089   recovery daemon ping to main daemon
4090  */
4091 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4092 {
4093         int ret;
4094         int32_t res;
4095
4096         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4097                            ctdb, NULL, &res, NULL, NULL);
4098         if (ret != 0 || res != 0) {
4099                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4100                 return -1;
4101         }
4102
4103         return 0;
4104 }
4105
4106 /* when forking the main daemon and the child process needs to connect back
4107  * to the daemon as a client process, this function can be used to change
4108  * the ctdb context from daemon into client mode
4109  */
4110 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4111 {
4112         int ret;
4113         va_list ap;
4114
4115         /* Add extra information so we can identify this in the logs */
4116         va_start(ap, fmt);
4117         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4118         va_end(ap);
4119
4120         /* shutdown the transport */
4121         if (ctdb->methods) {
4122                 ctdb->methods->shutdown(ctdb);
4123         }
4124
4125         /* get a new event context */
4126         talloc_free(ctdb->ev);
4127         ctdb->ev = event_context_init(ctdb);
4128         tevent_loop_allow_nesting(ctdb->ev);
4129
4130         close(ctdb->daemon.sd);
4131         ctdb->daemon.sd = -1;
4132
4133         /* the client does not need to be realtime */
4134         if (ctdb->do_setsched) {
4135                 ctdb_restore_scheduler(ctdb);
4136         }
4137
4138         /* initialise ctdb */
4139         ret = ctdb_socket_connect(ctdb);
4140         if (ret != 0) {
4141                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4142                 return -1;
4143         }
4144
4145          return 0;
4146 }
4147
4148 /*
4149   get the status of running the monitor eventscripts: NULL means never run.
4150  */
4151 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4152                 struct timeval timeout, uint32_t destnode, 
4153                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4154                 struct ctdb_scripts_wire **script_status)
4155 {
4156         int ret;
4157         TDB_DATA outdata, indata;
4158         int32_t res;
4159         uint32_t uinttype = type;
4160
4161         indata.dptr = (uint8_t *)&uinttype;
4162         indata.dsize = sizeof(uinttype);
4163
4164         ret = ctdb_control(ctdb, destnode, 0, 
4165                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4166                            mem_ctx, &outdata, &res, &timeout, NULL);
4167         if (ret != 0 || res != 0) {
4168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4169                 return -1;
4170         }
4171
4172         if (outdata.dsize == 0) {
4173                 *script_status = NULL;
4174         } else {
4175                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4176                 talloc_free(outdata.dptr);
4177         }
4178                     
4179         return 0;
4180 }
4181
4182 /*
4183   tell the main daemon how long it took to lock the reclock file
4184  */
4185 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4186 {
4187         int ret;
4188         int32_t res;
4189         TDB_DATA data;
4190
4191         data.dptr = (uint8_t *)&latency;
4192         data.dsize = sizeof(latency);
4193
4194         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4195                            ctdb, NULL, &res, NULL, NULL);
4196         if (ret != 0 || res != 0) {
4197                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4198                 return -1;
4199         }
4200
4201         return 0;
4202 }
4203
4204 /*
4205   get the name of the reclock file
4206  */
4207 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4208                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4209                          const char **name)
4210 {
4211         int ret;
4212         int32_t res;
4213         TDB_DATA data;
4214
4215         ret = ctdb_control(ctdb, destnode, 0, 
4216                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4217                            mem_ctx, &data, &res, &timeout, NULL);
4218         if (ret != 0 || res != 0) {
4219                 return -1;
4220         }
4221
4222         if (data.dsize == 0) {
4223                 *name = NULL;
4224         } else {
4225                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4226         }
4227         talloc_free(data.dptr);
4228
4229         return 0;
4230 }
4231
4232 /*
4233   set the reclock filename for a node
4234  */
4235 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4236 {
4237         int ret;
4238         TDB_DATA data;
4239         int32_t res;
4240
4241         if (reclock == NULL) {
4242                 data.dsize = 0;
4243                 data.dptr  = NULL;
4244         } else {
4245                 data.dsize = strlen(reclock) + 1;
4246                 data.dptr  = discard_const(reclock);
4247         }
4248
4249         ret = ctdb_control(ctdb, destnode, 0, 
4250                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4251                            NULL, NULL, &res, &timeout, NULL);
4252         if (ret != 0 || res != 0) {
4253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4254                 return -1;
4255         }
4256
4257         return 0;
4258 }
4259
4260 /*
4261   stop a node
4262  */
4263 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4264 {
4265         int ret;
4266         int32_t res;
4267
4268         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4269                            ctdb, NULL, &res, &timeout, NULL);
4270         if (ret != 0 || res != 0) {
4271                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4272                 return -1;
4273         }
4274
4275         return 0;
4276 }
4277
4278 /*
4279   continue a node
4280  */
4281 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4282 {
4283         int ret;
4284
4285         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4286                            ctdb, NULL, NULL, &timeout, NULL);
4287         if (ret != 0) {
4288                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4289                 return -1;
4290         }
4291
4292         return 0;
4293 }
4294
4295 /*
4296   set the natgw state for a node
4297  */
4298 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4299 {
4300         int ret;
4301         TDB_DATA data;
4302         int32_t res;
4303
4304         data.dsize = sizeof(natgwstate);
4305         data.dptr  = (uint8_t *)&natgwstate;
4306
4307         ret = ctdb_control(ctdb, destnode, 0, 
4308                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4309                            NULL, NULL, &res, &timeout, NULL);
4310         if (ret != 0 || res != 0) {
4311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4312                 return -1;
4313         }
4314
4315         return 0;
4316 }
4317
4318 /*
4319   set the lmaster role for a node
4320  */
4321 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4322 {
4323         int ret;
4324         TDB_DATA data;
4325         int32_t res;
4326
4327         data.dsize = sizeof(lmasterrole);
4328         data.dptr  = (uint8_t *)&lmasterrole;
4329
4330         ret = ctdb_control(ctdb, destnode, 0, 
4331                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4332                            NULL, NULL, &res, &timeout, NULL);
4333         if (ret != 0 || res != 0) {
4334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4335                 return -1;
4336         }
4337
4338         return 0;
4339 }
4340
4341 /*
4342   set the recmaster role for a node
4343  */
4344 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4345 {
4346         int ret;
4347         TDB_DATA data;
4348         int32_t res;
4349
4350         data.dsize = sizeof(recmasterrole);
4351         data.dptr  = (uint8_t *)&recmasterrole;
4352
4353         ret = ctdb_control(ctdb, destnode, 0, 
4354                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4355                            NULL, NULL, &res, &timeout, NULL);
4356         if (ret != 0 || res != 0) {
4357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4358                 return -1;
4359         }
4360
4361         return 0;
4362 }
4363
4364 /* enable an eventscript
4365  */
4366 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4367 {
4368         int ret;
4369         TDB_DATA data;
4370         int32_t res;
4371
4372         data.dsize = strlen(script) + 1;
4373         data.dptr  = discard_const(script);
4374
4375         ret = ctdb_control(ctdb, destnode, 0, 
4376                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4377                            NULL, NULL, &res, &timeout, NULL);
4378         if (ret != 0 || res != 0) {
4379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4380                 return -1;
4381         }
4382
4383         return 0;
4384 }
4385
4386 /* disable an eventscript
4387  */
4388 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4389 {
4390         int ret;
4391         TDB_DATA data;
4392         int32_t res;
4393
4394         data.dsize = strlen(script) + 1;
4395         data.dptr  = discard_const(script);
4396
4397         ret = ctdb_control(ctdb, destnode, 0, 
4398                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4399                            NULL, NULL, &res, &timeout, NULL);
4400         if (ret != 0 || res != 0) {
4401                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4402                 return -1;
4403         }
4404
4405         return 0;
4406 }
4407
4408
4409 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4410 {
4411         int ret;
4412         TDB_DATA data;
4413         int32_t res;
4414
4415         data.dsize = sizeof(*bantime);
4416         data.dptr  = (uint8_t *)bantime;
4417
4418         ret = ctdb_control(ctdb, destnode, 0, 
4419                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4420                            NULL, NULL, &res, &timeout, NULL);
4421         if (ret != 0 || res != 0) {
4422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4423                 return -1;
4424         }
4425
4426         return 0;
4427 }
4428
4429
4430 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4431 {
4432         int ret;
4433         TDB_DATA outdata;
4434         int32_t res;
4435         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4436
4437         ret = ctdb_control(ctdb, destnode, 0, 
4438                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4439                            tmp_ctx, &outdata, &res, &timeout, NULL);
4440         if (ret != 0 || res != 0) {
4441                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4442                 talloc_free(tmp_ctx);
4443                 return -1;
4444         }
4445
4446         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4447         talloc_free(tmp_ctx);
4448
4449         return 0;
4450 }
4451
4452
4453 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4454 {
4455         int ret;
4456         int32_t res;
4457         TDB_DATA data;
4458         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4459
4460         data.dptr = (uint8_t*)db_prio;
4461         data.dsize = sizeof(*db_prio);
4462
4463         ret = ctdb_control(ctdb, destnode, 0, 
4464                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4465                            tmp_ctx, NULL, &res, &timeout, NULL);
4466         if (ret != 0 || res != 0) {
4467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4468                 talloc_free(tmp_ctx);
4469                 return -1;
4470         }
4471
4472         talloc_free(tmp_ctx);
4473
4474         return 0;
4475 }
4476
4477 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4478 {
4479         int ret;
4480         int32_t res;
4481         TDB_DATA data;
4482         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4483
4484         data.dptr = (uint8_t*)&db_id;
4485         data.dsize = sizeof(db_id);
4486
4487         ret = ctdb_control(ctdb, destnode, 0, 
4488                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4489                            tmp_ctx, NULL, &res, &timeout, NULL);
4490         if (ret != 0 || res < 0) {
4491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4492                 talloc_free(tmp_ctx);
4493                 return -1;
4494         }
4495
4496         if (priority) {
4497                 *priority = res;
4498         }
4499
4500         talloc_free(tmp_ctx);
4501
4502         return 0;
4503 }
4504
4505 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4506 {
4507         int ret;
4508         TDB_DATA outdata;
4509         int32_t res;
4510
4511         ret = ctdb_control(ctdb, destnode, 0, 
4512                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4513                            mem_ctx, &outdata, &res, &timeout, NULL);
4514         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4515                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4516                 return -1;
4517         }
4518
4519         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4520         talloc_free(outdata.dptr);
4521                     
4522         return 0;
4523 }
4524
4525 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4526 {
4527         if (h == NULL) {
4528                 return NULL;
4529         }
4530
4531         return &h->header;
4532 }
4533
4534
4535 struct ctdb_client_control_state *
4536 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4537 {
4538         struct ctdb_client_control_state *handle;
4539         struct ctdb_marshall_buffer *m;
4540         struct ctdb_rec_data *rec;
4541         TDB_DATA outdata;
4542
4543         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4544         if (m == NULL) {
4545                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4546                 return NULL;
4547         }
4548
4549         m->db_id = ctdb_db->db_id;
4550
4551         rec = ctdb_marshall_record(m, 0, key, header, data);
4552         if (rec == NULL) {
4553                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4554                 talloc_free(m);
4555                 return NULL;
4556         }
4557         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4558         if (m == NULL) {
4559                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4560                 talloc_free(m);
4561                 return NULL;
4562         }
4563         m->count++;
4564         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4565
4566
4567         outdata.dptr = (uint8_t *)m;
4568         outdata.dsize = talloc_get_size(m);
4569
4570         handle = ctdb_control_send(ctdb, destnode, 0, 
4571                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4572                            mem_ctx, &timeout, NULL);
4573         talloc_free(m);
4574         return handle;
4575 }
4576
4577 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4578 {
4579         int ret;
4580         int32_t res;
4581
4582         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4583         if ( (ret != 0) || (res != 0) ){
4584                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4585                 return -1;
4586         }
4587
4588         return 0;
4589 }
4590
4591 int
4592 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4593 {
4594         struct ctdb_client_control_state *state;
4595
4596         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4597         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4598 }
4599
4600
4601
4602
4603
4604
4605 /*
4606   set a database to be readonly
4607  */
4608 struct ctdb_client_control_state *
4609 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4610 {
4611         TDB_DATA data;
4612
4613         data.dptr = (uint8_t *)&dbid;
4614         data.dsize = sizeof(dbid);
4615
4616         return ctdb_control_send(ctdb, destnode, 0, 
4617                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4618                            ctdb, NULL, NULL);
4619 }
4620
4621 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4622 {
4623         int ret;
4624         int32_t res;
4625
4626         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4627         if (ret != 0 || res != 0) {
4628           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4629                 return -1;
4630         }
4631
4632         return 0;
4633 }
4634
4635 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4636 {
4637         struct ctdb_client_control_state *state;
4638
4639         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4640         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4641 }