ReadOnly: Add clientside code to fetch readonly records
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data, bool updatetdb)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92         c->header = header;
93
94         for (fn=ctdb_db->calls;fn;fn=fn->next) {
95                 if (fn->id == call->call_id) break;
96         }
97         if (fn == NULL) {
98                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
99                 talloc_free(c);
100                 return -1;
101         }
102
103         if (fn->fn(c) != 0) {
104                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
105                 talloc_free(c);
106                 return -1;
107         }
108
109         /* we need to force the record to be written out if this was a remote access */
110         if (c->new_data == NULL) {
111                 c->new_data = &c->record_data;
112         }
113
114         if (c->new_data && updatetdb) {
115                 /* XXX check that we always have the lock here? */
116                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
117                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
118                         talloc_free(c);
119                         return -1;
120                 }
121         }
122
123         if (c->reply_data) {
124                 call->reply_data = *c->reply_data;
125
126                 talloc_steal(call, call->reply_data.dptr);
127                 talloc_set_name_const(call->reply_data.dptr, __location__);
128         } else {
129                 call->reply_data.dptr = NULL;
130                 call->reply_data.dsize = 0;
131         }
132         call->status = c->status;
133
134         talloc_free(c);
135
136         return 0;
137 }
138
139
140 /*
141   queue a packet for sending from client to daemon
142 */
143 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
144 {
145         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
146 }
147
148
149 /*
150   called when a CTDB_REPLY_CALL packet comes in in the client
151
152   This packet comes in response to a CTDB_REQ_CALL request packet. It
153   contains any reply data from the call
154 */
155 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
156 {
157         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
158         struct ctdb_client_call_state *state;
159
160         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
161         if (state == NULL) {
162                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
163                 return;
164         }
165
166         if (hdr->reqid != state->reqid) {
167                 /* we found a record  but it was the wrong one */
168                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
169                 return;
170         }
171
172         state->call->reply_data.dptr = c->data;
173         state->call->reply_data.dsize = c->datalen;
174         state->call->status = c->status;
175
176         talloc_steal(state, c);
177
178         state->state = CTDB_CALL_DONE;
179
180         if (state->async.fn) {
181                 state->async.fn(state);
182         }
183 }
184
185 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186
187 /*
188   this is called in the client, when data comes in from the daemon
189  */
190 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
191 {
192         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
193         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194         TALLOC_CTX *tmp_ctx;
195
196         /* place the packet as a child of a tmp_ctx. We then use
197            talloc_free() below to free it. If any of the calls want
198            to keep it, then they will steal it somewhere else, and the
199            talloc_free() will be a no-op */
200         tmp_ctx = talloc_new(ctdb);
201         talloc_steal(tmp_ctx, hdr);
202
203         if (cnt == 0) {
204                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
205                 exit(0);
206         }
207
208         if (cnt < sizeof(*hdr)) {
209                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210                 goto done;
211         }
212         if (cnt != hdr->length) {
213                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
214                                (unsigned)hdr->length, (unsigned)cnt);
215                 goto done;
216         }
217
218         if (hdr->ctdb_magic != CTDB_MAGIC) {
219                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
220                 goto done;
221         }
222
223         if (hdr->ctdb_version != CTDB_VERSION) {
224                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
225                 goto done;
226         }
227
228         switch (hdr->operation) {
229         case CTDB_REPLY_CALL:
230                 ctdb_client_reply_call(ctdb, hdr);
231                 break;
232
233         case CTDB_REQ_MESSAGE:
234                 ctdb_request_message(ctdb, hdr);
235                 break;
236
237         case CTDB_REPLY_CONTROL:
238                 ctdb_client_reply_control(ctdb, hdr);
239                 break;
240
241         default:
242                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
243         }
244
245 done:
246         talloc_free(tmp_ctx);
247 }
248
249 /*
250   connect with exponential backoff, thanks Stevens
251 */
252 #define CONNECT_MAXSLEEP 64
253 static int ctdb_connect_retry(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256         int secs;
257         int ret = 0;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
264                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
265                               sizeof(addr));
266                 if ((ret == 0) || (errno != EAGAIN)) {
267                         break;
268                 }
269
270                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
271                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
272                                          strerror(errno), secs));
273                         sleep(secs);
274                 }
275         }
276
277         return ret;
278 }
279
280 /*
281   connect to a unix domain socket
282 */
283 int ctdb_socket_connect(struct ctdb_context *ctdb)
284 {
285         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286         if (ctdb->daemon.sd == -1) {
287                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288                 return -1;
289         }
290
291         set_nonblocking(ctdb->daemon.sd);
292         set_close_on_exec(ctdb->daemon.sd);
293
294         if (ctdb_connect_retry(ctdb) == -1) {
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 close(ctdb->daemon.sd);
297                 ctdb->daemon.sd = -1;
298                 return -1;
299         }
300
301         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
302                                               CTDB_DS_ALIGNMENT, 
303                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
304         return 0;
305 }
306
307
308 struct ctdb_record_handle {
309         struct ctdb_db_context *ctdb_db;
310         TDB_DATA key;
311         TDB_DATA *data;
312         struct ctdb_ltdb_header header;
313 };
314
315
316 /*
317   make a recv call to the local ctdb daemon - called from client context
318
319   This is called when the program wants to wait for a ctdb_call to complete and get the 
320   results. This call will block unless the call has already completed.
321 */
322 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
323 {
324         if (state == NULL) {
325                 return -1;
326         }
327
328         while (state->state < CTDB_CALL_DONE) {
329                 event_loop_once(state->ctdb_db->ctdb->ev);
330         }
331         if (state->state != CTDB_CALL_DONE) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
333                 talloc_free(state);
334                 return -1;
335         }
336
337         if (state->call->reply_data.dsize) {
338                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
339                                                       state->call->reply_data.dptr,
340                                                       state->call->reply_data.dsize);
341                 call->reply_data.dsize = state->call->reply_data.dsize;
342         } else {
343                 call->reply_data.dptr = NULL;
344                 call->reply_data.dsize = 0;
345         }
346         call->status = state->call->status;
347         talloc_free(state);
348
349         return call->status;
350 }
351
352
353
354
355 /*
356   destroy a ctdb_call in client
357 */
358 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
359 {
360         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
361         return 0;
362 }
363
364 /*
365   construct an event driven local ctdb_call
366
367   this is used so that locally processed ctdb_call requests are processed
368   in an event driven manner
369 */
370 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
371                                                                   struct ctdb_call *call,
372                                                                   struct ctdb_ltdb_header *header,
373                                                                   TDB_DATA *data)
374 {
375         struct ctdb_client_call_state *state;
376         struct ctdb_context *ctdb = ctdb_db->ctdb;
377         int ret;
378
379         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
380         CTDB_NO_MEMORY_NULL(ctdb, state);
381         state->call = talloc_zero(state, struct ctdb_call);
382         CTDB_NO_MEMORY_NULL(ctdb, state->call);
383
384         talloc_steal(state, data->dptr);
385
386         state->state   = CTDB_CALL_DONE;
387         *(state->call) = *call;
388         state->ctdb_db = ctdb_db;
389
390         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
391
392         return state;
393 }
394
395 /*
396   make a ctdb call to the local daemon - async send. Called from client context.
397
398   This constructs a ctdb_call request and queues it for processing. 
399   This call never blocks.
400 */
401 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
402                                               struct ctdb_call *call)
403 {
404         struct ctdb_client_call_state *state;
405         struct ctdb_context *ctdb = ctdb_db->ctdb;
406         struct ctdb_ltdb_header header;
407         TDB_DATA data;
408         int ret;
409         size_t len;
410         struct ctdb_req_call *c;
411
412         /* if the domain socket is not yet open, open it */
413         if (ctdb->daemon.sd==-1) {
414                 ctdb_socket_connect(ctdb);
415         }
416
417         ret = ctdb_ltdb_lock(ctdb_db, call->key);
418         if (ret != 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
420                 return NULL;
421         }
422
423         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
424
425         if (ret == 0 && header.dmaster == ctdb->pnn) {
426                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
427                 talloc_free(data.dptr);
428                 ctdb_ltdb_unlock(ctdb_db, call->key);
429                 return state;
430         }
431
432         ctdb_ltdb_unlock(ctdb_db, call->key);
433         talloc_free(data.dptr);
434
435         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
436         if (state == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
438                 return NULL;
439         }
440         state->call = talloc_zero(state, struct ctdb_call);
441         if (state->call == NULL) {
442                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
443                 return NULL;
444         }
445
446         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
447         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
448         if (c == NULL) {
449                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
450                 return NULL;
451         }
452
453         state->reqid     = ctdb_reqid_new(ctdb, state);
454         state->ctdb_db = ctdb_db;
455         talloc_set_destructor(state, ctdb_client_call_destructor);
456
457         c->hdr.reqid     = state->reqid;
458         c->flags         = call->flags;
459         c->db_id         = ctdb_db->db_id;
460         c->callid        = call->call_id;
461         c->hopcount      = 0;
462         c->keylen        = call->key.dsize;
463         c->calldatalen   = call->call_data.dsize;
464         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
465         memcpy(&c->data[call->key.dsize], 
466                call->call_data.dptr, call->call_data.dsize);
467         *(state->call)              = *call;
468         state->call->call_data.dptr = &c->data[call->key.dsize];
469         state->call->key.dptr       = &c->data[0];
470
471         state->state  = CTDB_CALL_WAIT;
472
473
474         ctdb_client_queue_pkt(ctdb, &c->hdr);
475
476         return state;
477 }
478
479
480 /*
481   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
482 */
483 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
484 {
485         struct ctdb_client_call_state *state;
486
487         state = ctdb_call_send(ctdb_db, call);
488         return ctdb_call_recv(state, call);
489 }
490
491
492 /*
493   tell the daemon what messaging srvid we will use, and register the message
494   handler function in the client
495 */
496 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
497                              ctdb_msg_fn_t handler,
498                              void *private_data)
499                                     
500 {
501         int res;
502         int32_t status;
503         
504         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
505                            tdb_null, NULL, NULL, &status, NULL, NULL);
506         if (res != 0 || status != 0) {
507                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
508                 return -1;
509         }
510
511         /* also need to register the handler with our own ctdb structure */
512         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
513 }
514
515 /*
516   tell the daemon we no longer want a srvid
517 */
518 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
519 {
520         int res;
521         int32_t status;
522         
523         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
524                            tdb_null, NULL, NULL, &status, NULL, NULL);
525         if (res != 0 || status != 0) {
526                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
527                 return -1;
528         }
529
530         /* also need to register the handler with our own ctdb structure */
531         ctdb_deregister_message_handler(ctdb, srvid, private_data);
532         return 0;
533 }
534
535
536 /*
537   send a message - from client context
538  */
539 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
540                       uint64_t srvid, TDB_DATA data)
541 {
542         struct ctdb_req_message *r;
543         int len, res;
544
545         len = offsetof(struct ctdb_req_message, data) + data.dsize;
546         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
547                                len, struct ctdb_req_message);
548         CTDB_NO_MEMORY(ctdb, r);
549
550         r->hdr.destnode  = pnn;
551         r->srvid         = srvid;
552         r->datalen       = data.dsize;
553         memcpy(&r->data[0], data.dptr, data.dsize);
554         
555         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
556         if (res != 0) {
557                 return res;
558         }
559
560         talloc_free(r);
561         return 0;
562 }
563
564
565 /*
566   cancel a ctdb_fetch_lock operation, releasing the lock
567  */
568 static int fetch_lock_destructor(struct ctdb_record_handle *h)
569 {
570         ctdb_ltdb_unlock(h->ctdb_db, h->key);
571         return 0;
572 }
573
574 /*
575   force the migration of a record to this node
576  */
577 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
578 {
579         struct ctdb_call call;
580         ZERO_STRUCT(call);
581         call.call_id = CTDB_NULL_FUNC;
582         call.key = key;
583         call.flags = CTDB_IMMEDIATE_MIGRATION;
584         return ctdb_call(ctdb_db, &call);
585 }
586
587 /*
588   try to fetch a readonly copy of a record
589  */
590 static int
591 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
592 {
593         int ret;
594
595         struct ctdb_call call;
596         ZERO_STRUCT(call);
597
598         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
599         call.call_data.dptr = NULL;
600         call.call_data.dsize = 0;
601         call.key = key;
602         call.flags = CTDB_WANT_READONLY;
603         ret = ctdb_call(ctdb_db, &call);
604
605         if (ret != 0) {
606                 return -1;
607         }
608         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
609                 return -1;
610         }
611
612         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
613         if (*hdr == NULL) {
614                 talloc_free(call.reply_data.dptr);
615                 return -1;
616         }
617
618         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
619         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
620         if (data->dptr == NULL) {
621                 talloc_free(call.reply_data.dptr);
622                 talloc_free(hdr);
623                 return -1;
624         }
625
626         return 0;
627 }
628
629 /*
630   get a lock on a record, and return the records data. Blocks until it gets the lock
631  */
632 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
633                                            TDB_DATA key, TDB_DATA *data)
634 {
635         int ret;
636         struct ctdb_record_handle *h;
637
638         /*
639           procedure is as follows:
640
641           1) get the chain lock. 
642           2) check if we are dmaster
643           3) if we are the dmaster then return handle 
644           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
645              reply from ctdbd
646           5) when we get the reply, goto (1)
647          */
648
649         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
650         if (h == NULL) {
651                 return NULL;
652         }
653
654         h->ctdb_db = ctdb_db;
655         h->key     = key;
656         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
657         if (h->key.dptr == NULL) {
658                 talloc_free(h);
659                 return NULL;
660         }
661         h->data    = data;
662
663         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
664                  (const char *)key.dptr));
665
666 again:
667         /* step 1 - get the chain lock */
668         ret = ctdb_ltdb_lock(ctdb_db, key);
669         if (ret != 0) {
670                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
671                 talloc_free(h);
672                 return NULL;
673         }
674
675         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
676
677         talloc_set_destructor(h, fetch_lock_destructor);
678
679         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
680
681         /* when torturing, ensure we test the remote path */
682         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
683             random() % 5 == 0) {
684                 h->header.dmaster = (uint32_t)-1;
685         }
686
687
688         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
689
690         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
691                 ctdb_ltdb_unlock(ctdb_db, key);
692                 ret = ctdb_client_force_migration(ctdb_db, key);
693                 if (ret != 0) {
694                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
695                         talloc_free(h);
696                         return NULL;
697                 }
698                 goto again;
699         }
700
701         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
702         return h;
703 }
704
705 /*
706   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
707  */
708 struct ctdb_record_handle *
709 ctdb_fetch_readonly_lock(
710         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
711         TDB_DATA key, TDB_DATA *data,
712         int read_only)
713 {
714         int ret;
715         struct ctdb_record_handle *h;
716         struct ctdb_ltdb_header *roheader = NULL;
717
718         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
719         if (h == NULL) {
720                 return NULL;
721         }
722
723         h->ctdb_db = ctdb_db;
724         h->key     = key;
725         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
726         if (h->key.dptr == NULL) {
727                 talloc_free(h);
728                 return NULL;
729         }
730         h->data    = data;
731
732         data->dptr = NULL;
733         data->dsize = 0;
734
735
736 again:
737         talloc_free(roheader);
738         roheader = NULL;
739
740         talloc_free(data->dptr);
741         data->dptr = NULL;
742         data->dsize = 0;
743
744         /* Lock the record/chain */
745         ret = ctdb_ltdb_lock(ctdb_db, key);
746         if (ret != 0) {
747                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
748                 talloc_free(h);
749                 return NULL;
750         }
751
752         talloc_set_destructor(h, fetch_lock_destructor);
753
754         /* Check if record exists yet in the TDB */
755         ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
756         if (ret != 0) {
757                 ctdb_ltdb_unlock(ctdb_db, key);
758                 ret = ctdb_client_force_migration(ctdb_db, key);
759                 if (ret != 0) {
760                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
761                         talloc_free(h);
762                         return NULL;
763                 }
764                 goto again;
765         }
766
767
768         /* if we are dmaster, just return the handle */
769         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
770                 return h;
771         }
772
773         if (read_only != 0) {
774                 TDB_DATA rodata = {NULL, 0};
775
776                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
777                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
778                         return h;
779                 }
780
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
785                         ret = ctdb_client_force_migration(ctdb_db, key);
786                         if (ret != 0) {
787                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
788                                 talloc_free(h);
789                                 return NULL;
790                         }
791
792                         goto again;
793                 }
794
795                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
796                         ret = ctdb_client_force_migration(ctdb_db, key);
797                         if (ret != 0) {
798                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
799                                 talloc_free(h);
800                                 return NULL;
801                         }
802
803                         goto again;
804                 }
805
806                 ret = ctdb_ltdb_lock(ctdb_db, key);
807                 if (ret != 0) {
808                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
809                         talloc_free(h);
810                         return NULL;
811                 }
812
813                 ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
814                 if (ret != 0) {
815                         ctdb_ltdb_unlock(ctdb_db, key);
816
817                         ret = ctdb_client_force_migration(ctdb_db, key);
818                         if (ret != 0) {
819                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
820                                 talloc_free(h);
821                                 return NULL;
822                         }
823
824                         goto again;
825                 }
826
827                 if (h->header.rsn >= roheader->rsn) {
828                         DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
829                         ctdb_ltdb_unlock(ctdb_db, key);
830
831                         ret = ctdb_client_force_migration(ctdb_db, key);
832                         if (ret != 0) {
833                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
834                                 talloc_free(h);
835                                 return NULL;
836                         }
837
838                         goto again;
839                 }
840
841                 if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
842                         ctdb_ltdb_unlock(ctdb_db, key);
843
844                         ret = ctdb_client_force_migration(ctdb_db, key);
845                         if (ret != 0) {
846                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
847                                 talloc_free(h);
848                                 return NULL;
849                         }
850
851                         goto again;
852                 }
853                 return h;
854         }
855
856         /* we are not dmaster and this was not a request for a readonly lock
857          * so unlock the record, migrate it and try again
858          */
859         ctdb_ltdb_unlock(ctdb_db, key);
860         ret = ctdb_client_force_migration(ctdb_db, key);
861         if (ret != 0) {
862                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
863                 talloc_free(h);
864                 return NULL;
865         }
866         goto again;
867 }
868
869 /*
870   store some data to the record that was locked with ctdb_fetch_lock()
871 */
872 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
873 {
874         if (h->ctdb_db->persistent) {
875                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
876                 return -1;
877         }
878
879         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
880 }
881
882 /*
883   non-locking fetch of a record
884  */
885 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
886                TDB_DATA key, TDB_DATA *data)
887 {
888         struct ctdb_call call;
889         int ret;
890
891         call.call_id = CTDB_FETCH_FUNC;
892         call.call_data.dptr = NULL;
893         call.call_data.dsize = 0;
894         call.key = key;
895
896         ret = ctdb_call(ctdb_db, &call);
897
898         if (ret == 0) {
899                 *data = call.reply_data;
900                 talloc_steal(mem_ctx, data->dptr);
901         }
902
903         return ret;
904 }
905
906
907
908 /*
909    called when a control completes or timesout to invoke the callback
910    function the user provided
911 */
912 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
913         struct timeval t, void *private_data)
914 {
915         struct ctdb_client_control_state *state;
916         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
917         int ret;
918
919         state = talloc_get_type(private_data, struct ctdb_client_control_state);
920         talloc_steal(tmp_ctx, state);
921
922         ret = ctdb_control_recv(state->ctdb, state, state,
923                         NULL, 
924                         NULL, 
925                         NULL);
926
927         talloc_free(tmp_ctx);
928 }
929
930 /*
931   called when a CTDB_REPLY_CONTROL packet comes in in the client
932
933   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
934   contains any reply data from the control
935 */
936 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
937                                       struct ctdb_req_header *hdr)
938 {
939         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
940         struct ctdb_client_control_state *state;
941
942         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
943         if (state == NULL) {
944                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
945                 return;
946         }
947
948         if (hdr->reqid != state->reqid) {
949                 /* we found a record  but it was the wrong one */
950                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
951                 return;
952         }
953
954         state->outdata.dptr = c->data;
955         state->outdata.dsize = c->datalen;
956         state->status = c->status;
957         if (c->errorlen) {
958                 state->errormsg = talloc_strndup(state, 
959                                                  (char *)&c->data[c->datalen], 
960                                                  c->errorlen);
961         }
962
963         /* state->outdata now uses resources from c so we dont want c
964            to just dissappear from under us while state is still alive
965         */
966         talloc_steal(state, c);
967
968         state->state = CTDB_CONTROL_DONE;
969
970         /* if we had a callback registered for this control, pull the response
971            and call the callback.
972         */
973         if (state->async.fn) {
974                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
975         }
976 }
977
978
979 /*
980   destroy a ctdb_control in client
981 */
982 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
983 {
984         ctdb_reqid_remove(state->ctdb, state->reqid);
985         return 0;
986 }
987
988
989 /* time out handler for ctdb_control */
990 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
991         struct timeval t, void *private_data)
992 {
993         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
994
995         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
996                          "dstnode:%u\n", state->reqid, state->c->opcode,
997                          state->c->hdr.destnode));
998
999         state->state = CTDB_CONTROL_TIMEOUT;
1000
1001         /* if we had a callback registered for this control, pull the response
1002            and call the callback.
1003         */
1004         if (state->async.fn) {
1005                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1006         }
1007 }
1008
1009 /* async version of send control request */
1010 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1011                 uint32_t destnode, uint64_t srvid, 
1012                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1013                 TALLOC_CTX *mem_ctx,
1014                 struct timeval *timeout,
1015                 char **errormsg)
1016 {
1017         struct ctdb_client_control_state *state;
1018         size_t len;
1019         struct ctdb_req_control *c;
1020         int ret;
1021
1022         if (errormsg) {
1023                 *errormsg = NULL;
1024         }
1025
1026         /* if the domain socket is not yet open, open it */
1027         if (ctdb->daemon.sd==-1) {
1028                 ctdb_socket_connect(ctdb);
1029         }
1030
1031         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1032         CTDB_NO_MEMORY_NULL(ctdb, state);
1033
1034         state->ctdb       = ctdb;
1035         state->reqid      = ctdb_reqid_new(ctdb, state);
1036         state->state      = CTDB_CONTROL_WAIT;
1037         state->errormsg   = NULL;
1038
1039         talloc_set_destructor(state, ctdb_control_destructor);
1040
1041         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1042         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1043                                len, struct ctdb_req_control);
1044         state->c            = c;        
1045         CTDB_NO_MEMORY_NULL(ctdb, c);
1046         c->hdr.reqid        = state->reqid;
1047         c->hdr.destnode     = destnode;
1048         c->opcode           = opcode;
1049         c->client_id        = 0;
1050         c->flags            = flags;
1051         c->srvid            = srvid;
1052         c->datalen          = data.dsize;
1053         if (data.dsize) {
1054                 memcpy(&c->data[0], data.dptr, data.dsize);
1055         }
1056
1057         /* timeout */
1058         if (timeout && !timeval_is_zero(timeout)) {
1059                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1060         }
1061
1062         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1063         if (ret != 0) {
1064                 talloc_free(state);
1065                 return NULL;
1066         }
1067
1068         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1069                 talloc_free(state);
1070                 return NULL;
1071         }
1072
1073         return state;
1074 }
1075
1076
1077 /* async version of receive control reply */
1078 int ctdb_control_recv(struct ctdb_context *ctdb, 
1079                 struct ctdb_client_control_state *state, 
1080                 TALLOC_CTX *mem_ctx,
1081                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1082 {
1083         TALLOC_CTX *tmp_ctx;
1084
1085         if (status != NULL) {
1086                 *status = -1;
1087         }
1088         if (errormsg != NULL) {
1089                 *errormsg = NULL;
1090         }
1091
1092         if (state == NULL) {
1093                 return -1;
1094         }
1095
1096         /* prevent double free of state */
1097         tmp_ctx = talloc_new(ctdb);
1098         talloc_steal(tmp_ctx, state);
1099
1100         /* loop one event at a time until we either timeout or the control
1101            completes.
1102         */
1103         while (state->state == CTDB_CONTROL_WAIT) {
1104                 event_loop_once(ctdb->ev);
1105         }
1106
1107         if (state->state != CTDB_CONTROL_DONE) {
1108                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1109                 if (state->async.fn) {
1110                         state->async.fn(state);
1111                 }
1112                 talloc_free(tmp_ctx);
1113                 return -1;
1114         }
1115
1116         if (state->errormsg) {
1117                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1118                 if (errormsg) {
1119                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1120                 }
1121                 if (state->async.fn) {
1122                         state->async.fn(state);
1123                 }
1124                 talloc_free(tmp_ctx);
1125                 return -1;
1126         }
1127
1128         if (outdata) {
1129                 *outdata = state->outdata;
1130                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1131         }
1132
1133         if (status) {
1134                 *status = state->status;
1135         }
1136
1137         if (state->async.fn) {
1138                 state->async.fn(state);
1139         }
1140
1141         talloc_free(tmp_ctx);
1142         return 0;
1143 }
1144
1145
1146
1147 /*
1148   send a ctdb control message
1149   timeout specifies how long we should wait for a reply.
1150   if timeout is NULL we wait indefinitely
1151  */
1152 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1153                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1154                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1155                  struct timeval *timeout,
1156                  char **errormsg)
1157 {
1158         struct ctdb_client_control_state *state;
1159
1160         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1161                         flags, data, mem_ctx,
1162                         timeout, errormsg);
1163         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1164                         errormsg);
1165 }
1166
1167
1168
1169
1170 /*
1171   a process exists call. Returns 0 if process exists, -1 otherwise
1172  */
1173 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1174 {
1175         int ret;
1176         TDB_DATA data;
1177         int32_t status;
1178
1179         data.dptr = (uint8_t*)&pid;
1180         data.dsize = sizeof(pid);
1181
1182         ret = ctdb_control(ctdb, destnode, 0, 
1183                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1184                            NULL, NULL, &status, NULL, NULL);
1185         if (ret != 0) {
1186                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1187                 return -1;
1188         }
1189
1190         return status;
1191 }
1192
1193 /*
1194   get remote statistics
1195  */
1196 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1197 {
1198         int ret;
1199         TDB_DATA data;
1200         int32_t res;
1201
1202         ret = ctdb_control(ctdb, destnode, 0, 
1203                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1204                            ctdb, &data, &res, NULL, NULL);
1205         if (ret != 0 || res != 0) {
1206                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1207                 return -1;
1208         }
1209
1210         if (data.dsize != sizeof(struct ctdb_statistics)) {
1211                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1212                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1213                       return -1;
1214         }
1215
1216         *status = *(struct ctdb_statistics *)data.dptr;
1217         talloc_free(data.dptr);
1218                         
1219         return 0;
1220 }
1221
1222 /*
1223   shutdown a remote ctdb node
1224  */
1225 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1226 {
1227         struct ctdb_client_control_state *state;
1228
1229         state = ctdb_control_send(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1231                            NULL, &timeout, NULL);
1232         if (state == NULL) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1234                 return -1;
1235         }
1236
1237         return 0;
1238 }
1239
1240 /*
1241   get vnn map from a remote node
1242  */
1243 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1244 {
1245         int ret;
1246         TDB_DATA outdata;
1247         int32_t res;
1248         struct ctdb_vnn_map_wire *map;
1249
1250         ret = ctdb_control(ctdb, destnode, 0, 
1251                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1252                            mem_ctx, &outdata, &res, &timeout, NULL);
1253         if (ret != 0 || res != 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1255                 return -1;
1256         }
1257         
1258         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1259         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1260             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1261                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1262                 return -1;
1263         }
1264
1265         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1266         CTDB_NO_MEMORY(ctdb, *vnnmap);
1267         (*vnnmap)->generation = map->generation;
1268         (*vnnmap)->size       = map->size;
1269         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1270
1271         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1272         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1273         talloc_free(outdata.dptr);
1274                     
1275         return 0;
1276 }
1277
1278
1279 /*
1280   get the recovery mode of a remote node
1281  */
1282 struct ctdb_client_control_state *
1283 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1284 {
1285         return ctdb_control_send(ctdb, destnode, 0, 
1286                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1287                            mem_ctx, &timeout, NULL);
1288 }
1289
1290 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1291 {
1292         int ret;
1293         int32_t res;
1294
1295         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1296         if (ret != 0) {
1297                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1298                 return -1;
1299         }
1300
1301         if (recmode) {
1302                 *recmode = (uint32_t)res;
1303         }
1304
1305         return 0;
1306 }
1307
1308 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1309 {
1310         struct ctdb_client_control_state *state;
1311
1312         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1313         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1314 }
1315
1316
1317
1318
1319 /*
1320   set the recovery mode of a remote node
1321  */
1322 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1323 {
1324         int ret;
1325         TDB_DATA data;
1326         int32_t res;
1327
1328         data.dsize = sizeof(uint32_t);
1329         data.dptr = (unsigned char *)&recmode;
1330
1331         ret = ctdb_control(ctdb, destnode, 0, 
1332                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1333                            NULL, NULL, &res, &timeout, NULL);
1334         if (ret != 0 || res != 0) {
1335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1336                 return -1;
1337         }
1338
1339         return 0;
1340 }
1341
1342
1343
1344 /*
1345   get the recovery master of a remote node
1346  */
1347 struct ctdb_client_control_state *
1348 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1349                         struct timeval timeout, uint32_t destnode)
1350 {
1351         return ctdb_control_send(ctdb, destnode, 0, 
1352                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1353                            mem_ctx, &timeout, NULL);
1354 }
1355
1356 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1357 {
1358         int ret;
1359         int32_t res;
1360
1361         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1362         if (ret != 0) {
1363                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1364                 return -1;
1365         }
1366
1367         if (recmaster) {
1368                 *recmaster = (uint32_t)res;
1369         }
1370
1371         return 0;
1372 }
1373
1374 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1375 {
1376         struct ctdb_client_control_state *state;
1377
1378         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1379         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1380 }
1381
1382
1383 /*
1384   set the recovery master of a remote node
1385  */
1386 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1387 {
1388         int ret;
1389         TDB_DATA data;
1390         int32_t res;
1391
1392         ZERO_STRUCT(data);
1393         data.dsize = sizeof(uint32_t);
1394         data.dptr = (unsigned char *)&recmaster;
1395
1396         ret = ctdb_control(ctdb, destnode, 0, 
1397                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1398                            NULL, NULL, &res, &timeout, NULL);
1399         if (ret != 0 || res != 0) {
1400                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1401                 return -1;
1402         }
1403
1404         return 0;
1405 }
1406
1407
1408 /*
1409   get a list of databases off a remote node
1410  */
1411 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1412                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1413 {
1414         int ret;
1415         TDB_DATA outdata;
1416         int32_t res;
1417
1418         ret = ctdb_control(ctdb, destnode, 0, 
1419                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1420                            mem_ctx, &outdata, &res, &timeout, NULL);
1421         if (ret != 0 || res != 0) {
1422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1423                 return -1;
1424         }
1425
1426         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1427         talloc_free(outdata.dptr);
1428                     
1429         return 0;
1430 }
1431
1432 /*
1433   get a list of nodes (vnn and flags ) from a remote node
1434  */
1435 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1436                 struct timeval timeout, uint32_t destnode, 
1437                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1438 {
1439         int ret;
1440         TDB_DATA outdata;
1441         int32_t res;
1442
1443         ret = ctdb_control(ctdb, destnode, 0, 
1444                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1445                            mem_ctx, &outdata, &res, &timeout, NULL);
1446         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1447                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1448                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1449         }
1450         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1452                 return -1;
1453         }
1454
1455         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1456         talloc_free(outdata.dptr);
1457                     
1458         return 0;
1459 }
1460
1461 /*
1462   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1463  */
1464 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1465                 struct timeval timeout, uint32_t destnode, 
1466                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1467 {
1468         int ret, i, len;
1469         TDB_DATA outdata;
1470         struct ctdb_node_mapv4 *nodemapv4;
1471         int32_t res;
1472
1473         ret = ctdb_control(ctdb, destnode, 0, 
1474                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1475                            mem_ctx, &outdata, &res, &timeout, NULL);
1476         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1478                 return -1;
1479         }
1480
1481         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1482
1483         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1484         (*nodemap) = talloc_zero_size(mem_ctx, len);
1485         CTDB_NO_MEMORY(ctdb, (*nodemap));
1486
1487         (*nodemap)->num = nodemapv4->num;
1488         for (i=0; i<nodemapv4->num; i++) {
1489                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1490                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1491                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1492                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1493         }
1494                 
1495         talloc_free(outdata.dptr);
1496                     
1497         return 0;
1498 }
1499
1500 /*
1501   drop the transport, reload the nodes file and restart the transport
1502  */
1503 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1504                     struct timeval timeout, uint32_t destnode)
1505 {
1506         int ret;
1507         int32_t res;
1508
1509         ret = ctdb_control(ctdb, destnode, 0, 
1510                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1511                            NULL, NULL, &res, &timeout, NULL);
1512         if (ret != 0 || res != 0) {
1513                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1514                 return -1;
1515         }
1516
1517         return 0;
1518 }
1519
1520
1521 /*
1522   set vnn map on a node
1523  */
1524 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1525                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1526 {
1527         int ret;
1528         TDB_DATA data;
1529         int32_t res;
1530         struct ctdb_vnn_map_wire *map;
1531         size_t len;
1532
1533         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1534         map = talloc_size(mem_ctx, len);
1535         CTDB_NO_MEMORY(ctdb, map);
1536
1537         map->generation = vnnmap->generation;
1538         map->size = vnnmap->size;
1539         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1540         
1541         data.dsize = len;
1542         data.dptr  = (uint8_t *)map;
1543
1544         ret = ctdb_control(ctdb, destnode, 0, 
1545                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1546                            NULL, NULL, &res, &timeout, NULL);
1547         if (ret != 0 || res != 0) {
1548                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1549                 return -1;
1550         }
1551
1552         talloc_free(map);
1553
1554         return 0;
1555 }
1556
1557
1558 /*
1559   async send for pull database
1560  */
1561 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1562         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1563         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1564 {
1565         TDB_DATA indata;
1566         struct ctdb_control_pulldb *pull;
1567         struct ctdb_client_control_state *state;
1568
1569         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1570         CTDB_NO_MEMORY_NULL(ctdb, pull);
1571
1572         pull->db_id   = dbid;
1573         pull->lmaster = lmaster;
1574
1575         indata.dsize = sizeof(struct ctdb_control_pulldb);
1576         indata.dptr  = (unsigned char *)pull;
1577
1578         state = ctdb_control_send(ctdb, destnode, 0, 
1579                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1580                                   mem_ctx, &timeout, NULL);
1581         talloc_free(pull);
1582
1583         return state;
1584 }
1585
1586 /*
1587   async recv for pull database
1588  */
1589 int ctdb_ctrl_pulldb_recv(
1590         struct ctdb_context *ctdb, 
1591         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1592         TDB_DATA *outdata)
1593 {
1594         int ret;
1595         int32_t res;
1596
1597         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1598         if ( (ret != 0) || (res != 0) ){
1599                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1600                 return -1;
1601         }
1602
1603         return 0;
1604 }
1605
1606 /*
1607   pull all keys and records for a specific database on a node
1608  */
1609 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1610                 uint32_t dbid, uint32_t lmaster, 
1611                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1612                 TDB_DATA *outdata)
1613 {
1614         struct ctdb_client_control_state *state;
1615
1616         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1617                                       timeout);
1618         
1619         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1620 }
1621
1622
1623 /*
1624   change dmaster for all keys in the database to the new value
1625  */
1626 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1627                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1628 {
1629         int ret;
1630         TDB_DATA indata;
1631         int32_t res;
1632
1633         indata.dsize = 2*sizeof(uint32_t);
1634         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1635
1636         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1637         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1638
1639         ret = ctdb_control(ctdb, destnode, 0, 
1640                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1641                            NULL, NULL, &res, &timeout, NULL);
1642         if (ret != 0 || res != 0) {
1643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1644                 return -1;
1645         }
1646
1647         return 0;
1648 }
1649
1650 /*
1651   ping a node, return number of clients connected
1652  */
1653 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1654 {
1655         int ret;
1656         int32_t res;
1657
1658         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1659                            tdb_null, NULL, NULL, &res, NULL, NULL);
1660         if (ret != 0) {
1661                 return -1;
1662         }
1663         return res;
1664 }
1665
1666 /*
1667   find the real path to a ltdb 
1668  */
1669 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1670                    const char **path)
1671 {
1672         int ret;
1673         int32_t res;
1674         TDB_DATA data;
1675
1676         data.dptr = (uint8_t *)&dbid;
1677         data.dsize = sizeof(dbid);
1678
1679         ret = ctdb_control(ctdb, destnode, 0, 
1680                            CTDB_CONTROL_GETDBPATH, 0, data, 
1681                            mem_ctx, &data, &res, &timeout, NULL);
1682         if (ret != 0 || res != 0) {
1683                 return -1;
1684         }
1685
1686         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1687         if ((*path) == NULL) {
1688                 return -1;
1689         }
1690
1691         talloc_free(data.dptr);
1692
1693         return 0;
1694 }
1695
1696 /*
1697   find the name of a db 
1698  */
1699 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1700                    const char **name)
1701 {
1702         int ret;
1703         int32_t res;
1704         TDB_DATA data;
1705
1706         data.dptr = (uint8_t *)&dbid;
1707         data.dsize = sizeof(dbid);
1708
1709         ret = ctdb_control(ctdb, destnode, 0, 
1710                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1711                            mem_ctx, &data, &res, &timeout, NULL);
1712         if (ret != 0 || res != 0) {
1713                 return -1;
1714         }
1715
1716         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1717         if ((*name) == NULL) {
1718                 return -1;
1719         }
1720
1721         talloc_free(data.dptr);
1722
1723         return 0;
1724 }
1725
1726 /*
1727   get the health status of a db
1728  */
1729 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1730                           struct timeval timeout,
1731                           uint32_t destnode,
1732                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1733                           const char **reason)
1734 {
1735         int ret;
1736         int32_t res;
1737         TDB_DATA data;
1738
1739         data.dptr = (uint8_t *)&dbid;
1740         data.dsize = sizeof(dbid);
1741
1742         ret = ctdb_control(ctdb, destnode, 0,
1743                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1744                            mem_ctx, &data, &res, &timeout, NULL);
1745         if (ret != 0 || res != 0) {
1746                 return -1;
1747         }
1748
1749         if (data.dsize == 0) {
1750                 (*reason) = NULL;
1751                 return 0;
1752         }
1753
1754         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1755         if ((*reason) == NULL) {
1756                 return -1;
1757         }
1758
1759         talloc_free(data.dptr);
1760
1761         return 0;
1762 }
1763
1764 /*
1765   create a database
1766  */
1767 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1768                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1769 {
1770         int ret;
1771         int32_t res;
1772         TDB_DATA data;
1773
1774         data.dptr = discard_const(name);
1775         data.dsize = strlen(name)+1;
1776
1777         ret = ctdb_control(ctdb, destnode, 0, 
1778                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1779                            0, data, 
1780                            mem_ctx, &data, &res, &timeout, NULL);
1781
1782         if (ret != 0 || res != 0) {
1783                 return -1;
1784         }
1785
1786         return 0;
1787 }
1788
1789 /*
1790   get debug level on a node
1791  */
1792 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1793 {
1794         int ret;
1795         int32_t res;
1796         TDB_DATA data;
1797
1798         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1799                            ctdb, &data, &res, NULL, NULL);
1800         if (ret != 0 || res != 0) {
1801                 return -1;
1802         }
1803         if (data.dsize != sizeof(int32_t)) {
1804                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1805                          (unsigned)data.dsize));
1806                 return -1;
1807         }
1808         *level = *(int32_t *)data.dptr;
1809         talloc_free(data.dptr);
1810         return 0;
1811 }
1812
1813 /*
1814   set debug level on a node
1815  */
1816 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1817 {
1818         int ret;
1819         int32_t res;
1820         TDB_DATA data;
1821
1822         data.dptr = (uint8_t *)&level;
1823         data.dsize = sizeof(level);
1824
1825         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1826                            NULL, NULL, &res, NULL, NULL);
1827         if (ret != 0 || res != 0) {
1828                 return -1;
1829         }
1830         return 0;
1831 }
1832
1833
1834 /*
1835   get a list of connected nodes
1836  */
1837 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1838                                 struct timeval timeout,
1839                                 TALLOC_CTX *mem_ctx,
1840                                 uint32_t *num_nodes)
1841 {
1842         struct ctdb_node_map *map=NULL;
1843         int ret, i;
1844         uint32_t *nodes;
1845
1846         *num_nodes = 0;
1847
1848         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1849         if (ret != 0) {
1850                 return NULL;
1851         }
1852
1853         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1854         if (nodes == NULL) {
1855                 return NULL;
1856         }
1857
1858         for (i=0;i<map->num;i++) {
1859                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1860                         nodes[*num_nodes] = map->nodes[i].pnn;
1861                         (*num_nodes)++;
1862                 }
1863         }
1864
1865         return nodes;
1866 }
1867
1868
1869 /*
1870   reset remote status
1871  */
1872 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1873 {
1874         int ret;
1875         int32_t res;
1876
1877         ret = ctdb_control(ctdb, destnode, 0, 
1878                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1879                            NULL, NULL, &res, NULL, NULL);
1880         if (ret != 0 || res != 0) {
1881                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1882                 return -1;
1883         }
1884         return 0;
1885 }
1886
1887 /*
1888   this is the dummy null procedure that all databases support
1889 */
1890 static int ctdb_null_func(struct ctdb_call_info *call)
1891 {
1892         return 0;
1893 }
1894
1895 /*
1896   this is a plain fetch procedure that all databases support
1897 */
1898 static int ctdb_fetch_func(struct ctdb_call_info *call)
1899 {
1900         call->reply_data = &call->record_data;
1901         return 0;
1902 }
1903
1904 /*
1905   this is a plain fetch procedure that all databases support
1906   this returns the full record including the ltdb header
1907 */
1908 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
1909 {
1910         call->reply_data = talloc(call, TDB_DATA);
1911         if (call->reply_data == NULL) {
1912                 return -1;
1913         }
1914         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
1915         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
1916         if (call->reply_data->dptr == NULL) {
1917                 return -1;
1918         }
1919         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
1920         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
1921
1922         return 0;
1923 }
1924
1925 /*
1926   attach to a specific database - client call
1927 */
1928 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1929 {
1930         struct ctdb_db_context *ctdb_db;
1931         TDB_DATA data;
1932         int ret;
1933         int32_t res;
1934
1935         ctdb_db = ctdb_db_handle(ctdb, name);
1936         if (ctdb_db) {
1937                 return ctdb_db;
1938         }
1939
1940         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1941         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1942
1943         ctdb_db->ctdb = ctdb;
1944         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1945         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1946
1947         data.dptr = discard_const(name);
1948         data.dsize = strlen(name)+1;
1949
1950         /* tell ctdb daemon to attach */
1951         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1952                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1953                            0, data, ctdb_db, &data, &res, NULL, NULL);
1954         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1955                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1956                 talloc_free(ctdb_db);
1957                 return NULL;
1958         }
1959         
1960         ctdb_db->db_id = *(uint32_t *)data.dptr;
1961         talloc_free(data.dptr);
1962
1963         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1964         if (ret != 0) {
1965                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1966                 talloc_free(ctdb_db);
1967                 return NULL;
1968         }
1969
1970         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1971         if (ctdb->valgrinding) {
1972                 tdb_flags |= TDB_NOMMAP;
1973         }
1974         tdb_flags |= TDB_DISALLOW_NESTING;
1975
1976         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1977         if (ctdb_db->ltdb == NULL) {
1978                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1979                 talloc_free(ctdb_db);
1980                 return NULL;
1981         }
1982
1983         ctdb_db->persistent = persistent;
1984
1985         DLIST_ADD(ctdb->db_list, ctdb_db);
1986
1987         /* add well known functions */
1988         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1989         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1990         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1991
1992         return ctdb_db;
1993 }
1994
1995
1996 /*
1997   setup a call for a database
1998  */
1999 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2000 {
2001         struct ctdb_registered_call *call;
2002
2003 #if 0
2004         TDB_DATA data;
2005         int32_t status;
2006         struct ctdb_control_set_call c;
2007         int ret;
2008
2009         /* this is no longer valid with the separate daemon architecture */
2010         c.db_id = ctdb_db->db_id;
2011         c.fn    = fn;
2012         c.id    = id;
2013
2014         data.dptr = (uint8_t *)&c;
2015         data.dsize = sizeof(c);
2016
2017         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2018                            data, NULL, NULL, &status, NULL, NULL);
2019         if (ret != 0 || status != 0) {
2020                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2021                 return -1;
2022         }
2023 #endif
2024
2025         /* also register locally */
2026         call = talloc(ctdb_db, struct ctdb_registered_call);
2027         call->fn = fn;
2028         call->id = id;
2029
2030         DLIST_ADD(ctdb_db->calls, call);        
2031         return 0;
2032 }
2033
2034
2035 struct traverse_state {
2036         bool done;
2037         uint32_t count;
2038         ctdb_traverse_func fn;
2039         void *private_data;
2040 };
2041
2042 /*
2043   called on each key during a ctdb_traverse
2044  */
2045 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2046 {
2047         struct traverse_state *state = (struct traverse_state *)p;
2048         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2049         TDB_DATA key;
2050
2051         if (data.dsize < sizeof(uint32_t) ||
2052             d->length != data.dsize) {
2053                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2054                 state->done = True;
2055                 return;
2056         }
2057
2058         key.dsize = d->keylen;
2059         key.dptr  = &d->data[0];
2060         data.dsize = d->datalen;
2061         data.dptr = &d->data[d->keylen];
2062
2063         if (key.dsize == 0 && data.dsize == 0) {
2064                 /* end of traverse */
2065                 state->done = True;
2066                 return;
2067         }
2068
2069         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
2070                 /* empty records are deleted records in ctdb */
2071                 return;
2072         }
2073
2074         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2075                 state->done = True;
2076         }
2077
2078         state->count++;
2079 }
2080
2081
2082 /*
2083   start a cluster wide traverse, calling the supplied fn on each record
2084   return the number of records traversed, or -1 on error
2085  */
2086 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2087 {
2088         TDB_DATA data;
2089         struct ctdb_traverse_start t;
2090         int32_t status;
2091         int ret;
2092         uint64_t srvid = (getpid() | 0xFLL<<60);
2093         struct traverse_state state;
2094
2095         state.done = False;
2096         state.count = 0;
2097         state.private_data = private_data;
2098         state.fn = fn;
2099
2100         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2103                 return -1;
2104         }
2105
2106         t.db_id = ctdb_db->db_id;
2107         t.srvid = srvid;
2108         t.reqid = 0;
2109
2110         data.dptr = (uint8_t *)&t;
2111         data.dsize = sizeof(t);
2112
2113         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
2114                            data, NULL, NULL, &status, NULL, NULL);
2115         if (ret != 0 || status != 0) {
2116                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2117                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2118                 return -1;
2119         }
2120
2121         while (!state.done) {
2122                 event_loop_once(ctdb_db->ctdb->ev);
2123         }
2124
2125         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2126         if (ret != 0) {
2127                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2128                 return -1;
2129         }
2130
2131         return state.count;
2132 }
2133
2134 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2135 /*
2136   called on each key during a catdb
2137  */
2138 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2139 {
2140         int i;
2141         FILE *f = (FILE *)p;
2142         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2143
2144         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2145         for (i=0;i<key.dsize;i++) {
2146                 if (ISASCII(key.dptr[i])) {
2147                         fprintf(f, "%c", key.dptr[i]);
2148                 } else {
2149                         fprintf(f, "\\%02X", key.dptr[i]);
2150                 }
2151         }
2152         fprintf(f, "\"\n");
2153
2154         fprintf(f, "dmaster: %u\n", h->dmaster);
2155         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2156         fprintf(f, "flags: 0x%08x", h->flags);
2157         if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2158         if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2159         if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2160         if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2161         if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2162         if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2163         if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2164         fprintf(f, "\n");
2165
2166         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2167         for (i=sizeof(*h);i<data.dsize;i++) {
2168                 if (ISASCII(data.dptr[i])) {
2169                         fprintf(f, "%c", data.dptr[i]);
2170                 } else {
2171                         fprintf(f, "\\%02X", data.dptr[i]);
2172                 }
2173         }
2174         fprintf(f, "\"\n");
2175
2176         fprintf(f, "\n");
2177
2178         return 0;
2179 }
2180
2181 /*
2182   convenience function to list all keys to stdout
2183  */
2184 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
2185 {
2186         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
2187 }
2188
2189 /*
2190   get the pid of a ctdb daemon
2191  */
2192 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2193 {
2194         int ret;
2195         int32_t res;
2196
2197         ret = ctdb_control(ctdb, destnode, 0, 
2198                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2199                            NULL, NULL, &res, &timeout, NULL);
2200         if (ret != 0) {
2201                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2202                 return -1;
2203         }
2204
2205         *pid = res;
2206
2207         return 0;
2208 }
2209
2210
2211 /*
2212   async freeze send control
2213  */
2214 struct ctdb_client_control_state *
2215 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2216 {
2217         return ctdb_control_send(ctdb, destnode, priority, 
2218                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2219                            mem_ctx, &timeout, NULL);
2220 }
2221
2222 /* 
2223    async freeze recv control
2224 */
2225 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2226 {
2227         int ret;
2228         int32_t res;
2229
2230         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2231         if ( (ret != 0) || (res != 0) ){
2232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2233                 return -1;
2234         }
2235
2236         return 0;
2237 }
2238
2239 /*
2240   freeze databases of a certain priority
2241  */
2242 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2243 {
2244         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2245         struct ctdb_client_control_state *state;
2246         int ret;
2247
2248         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2249         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2250         talloc_free(tmp_ctx);
2251
2252         return ret;
2253 }
2254
2255 /* Freeze all databases */
2256 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2257 {
2258         int i;
2259
2260         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2261                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2262                         return -1;
2263                 }
2264         }
2265         return 0;
2266 }
2267
2268 /*
2269   thaw databases of a certain priority
2270  */
2271 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2272 {
2273         int ret;
2274         int32_t res;
2275
2276         ret = ctdb_control(ctdb, destnode, priority, 
2277                            CTDB_CONTROL_THAW, 0, tdb_null, 
2278                            NULL, NULL, &res, &timeout, NULL);
2279         if (ret != 0 || res != 0) {
2280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2281                 return -1;
2282         }
2283
2284         return 0;
2285 }
2286
2287 /* thaw all databases */
2288 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2289 {
2290         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2291 }
2292
2293 /*
2294   get pnn of a node, or -1
2295  */
2296 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2297 {
2298         int ret;
2299         int32_t res;
2300
2301         ret = ctdb_control(ctdb, destnode, 0, 
2302                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2303                            NULL, NULL, &res, &timeout, NULL);
2304         if (ret != 0) {
2305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2306                 return -1;
2307         }
2308
2309         return res;
2310 }
2311
2312 /*
2313   get the monitoring mode of a remote node
2314  */
2315 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2316 {
2317         int ret;
2318         int32_t res;
2319
2320         ret = ctdb_control(ctdb, destnode, 0, 
2321                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2322                            NULL, NULL, &res, &timeout, NULL);
2323         if (ret != 0) {
2324                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2325                 return -1;
2326         }
2327
2328         *monmode = res;
2329
2330         return 0;
2331 }
2332
2333
2334 /*
2335  set the monitoring mode of a remote node to active
2336  */
2337 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2338 {
2339         int ret;
2340         
2341
2342         ret = ctdb_control(ctdb, destnode, 0, 
2343                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2344                            NULL, NULL,NULL, &timeout, NULL);
2345         if (ret != 0) {
2346                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2347                 return -1;
2348         }
2349
2350         
2351
2352         return 0;
2353 }
2354
2355 /*
2356   set the monitoring mode of a remote node to disable
2357  */
2358 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2359 {
2360         int ret;
2361         
2362
2363         ret = ctdb_control(ctdb, destnode, 0, 
2364                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2365                            NULL, NULL, NULL, &timeout, NULL);
2366         if (ret != 0) {
2367                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2368                 return -1;
2369         }
2370
2371         
2372
2373         return 0;
2374 }
2375
2376
2377
2378 /* 
2379   sent to a node to make it take over an ip address
2380 */
2381 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2382                           uint32_t destnode, struct ctdb_public_ip *ip)
2383 {
2384         TDB_DATA data;
2385         struct ctdb_public_ipv4 ipv4;
2386         int ret;
2387         int32_t res;
2388
2389         if (ip->addr.sa.sa_family == AF_INET) {
2390                 ipv4.pnn = ip->pnn;
2391                 ipv4.sin = ip->addr.ip;
2392
2393                 data.dsize = sizeof(ipv4);
2394                 data.dptr  = (uint8_t *)&ipv4;
2395
2396                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2397                            NULL, &res, &timeout, NULL);
2398         } else {
2399                 data.dsize = sizeof(*ip);
2400                 data.dptr  = (uint8_t *)ip;
2401
2402                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2403                            NULL, &res, &timeout, NULL);
2404         }
2405
2406         if (ret != 0 || res != 0) {
2407                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2408                 return -1;
2409         }
2410
2411         return 0;       
2412 }
2413
2414
2415 /* 
2416   sent to a node to make it release an ip address
2417 */
2418 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2419                          uint32_t destnode, struct ctdb_public_ip *ip)
2420 {
2421         TDB_DATA data;
2422         struct ctdb_public_ipv4 ipv4;
2423         int ret;
2424         int32_t res;
2425
2426         if (ip->addr.sa.sa_family == AF_INET) {
2427                 ipv4.pnn = ip->pnn;
2428                 ipv4.sin = ip->addr.ip;
2429
2430                 data.dsize = sizeof(ipv4);
2431                 data.dptr  = (uint8_t *)&ipv4;
2432
2433                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2434                                    NULL, &res, &timeout, NULL);
2435         } else {
2436                 data.dsize = sizeof(*ip);
2437                 data.dptr  = (uint8_t *)ip;
2438
2439                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2440                                    NULL, &res, &timeout, NULL);
2441         }
2442
2443         if (ret != 0 || res != 0) {
2444                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2445                 return -1;
2446         }
2447
2448         return 0;       
2449 }
2450
2451
2452 /*
2453   get a tunable
2454  */
2455 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2456                           struct timeval timeout, 
2457                           uint32_t destnode,
2458                           const char *name, uint32_t *value)
2459 {
2460         struct ctdb_control_get_tunable *t;
2461         TDB_DATA data, outdata;
2462         int32_t res;
2463         int ret;
2464
2465         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2466         data.dptr  = talloc_size(ctdb, data.dsize);
2467         CTDB_NO_MEMORY(ctdb, data.dptr);
2468
2469         t = (struct ctdb_control_get_tunable *)data.dptr;
2470         t->length = strlen(name)+1;
2471         memcpy(t->name, name, t->length);
2472
2473         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2474                            &outdata, &res, &timeout, NULL);
2475         talloc_free(data.dptr);
2476         if (ret != 0 || res != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2478                 return -1;
2479         }
2480
2481         if (outdata.dsize != sizeof(uint32_t)) {
2482                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2483                 talloc_free(outdata.dptr);
2484                 return -1;
2485         }
2486         
2487         *value = *(uint32_t *)outdata.dptr;
2488         talloc_free(outdata.dptr);
2489
2490         return 0;
2491 }
2492
2493 /*
2494   set a tunable
2495  */
2496 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2497                           struct timeval timeout, 
2498                           uint32_t destnode,
2499                           const char *name, uint32_t value)
2500 {
2501         struct ctdb_control_set_tunable *t;
2502         TDB_DATA data;
2503         int32_t res;
2504         int ret;
2505
2506         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2507         data.dptr  = talloc_size(ctdb, data.dsize);
2508         CTDB_NO_MEMORY(ctdb, data.dptr);
2509
2510         t = (struct ctdb_control_set_tunable *)data.dptr;
2511         t->length = strlen(name)+1;
2512         memcpy(t->name, name, t->length);
2513         t->value = value;
2514
2515         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2516                            NULL, &res, &timeout, NULL);
2517         talloc_free(data.dptr);
2518         if (ret != 0 || res != 0) {
2519                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2520                 return -1;
2521         }
2522
2523         return 0;
2524 }
2525
2526 /*
2527   list tunables
2528  */
2529 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2530                             struct timeval timeout, 
2531                             uint32_t destnode,
2532                             TALLOC_CTX *mem_ctx,
2533                             const char ***list, uint32_t *count)
2534 {
2535         TDB_DATA outdata;
2536         int32_t res;
2537         int ret;
2538         struct ctdb_control_list_tunable *t;
2539         char *p, *s, *ptr;
2540
2541         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2542                            mem_ctx, &outdata, &res, &timeout, NULL);
2543         if (ret != 0 || res != 0) {
2544                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2545                 return -1;
2546         }
2547
2548         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2549         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2550             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2551                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2552                 talloc_free(outdata.dptr);
2553                 return -1;              
2554         }
2555         
2556         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2557         CTDB_NO_MEMORY(ctdb, p);
2558
2559         talloc_free(outdata.dptr);
2560         
2561         (*list) = NULL;
2562         (*count) = 0;
2563
2564         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2565                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2566                 CTDB_NO_MEMORY(ctdb, *list);
2567                 (*list)[*count] = talloc_strdup(*list, s);
2568                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2569                 (*count)++;
2570         }
2571
2572         talloc_free(p);
2573
2574         return 0;
2575 }
2576
2577
2578 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2579                                    struct timeval timeout, uint32_t destnode,
2580                                    TALLOC_CTX *mem_ctx,
2581                                    uint32_t flags,
2582                                    struct ctdb_all_public_ips **ips)
2583 {
2584         int ret;
2585         TDB_DATA outdata;
2586         int32_t res;
2587
2588         ret = ctdb_control(ctdb, destnode, 0, 
2589                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2590                            mem_ctx, &outdata, &res, &timeout, NULL);
2591         if (ret == 0 && res == -1) {
2592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2593                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2594         }
2595         if (ret != 0 || res != 0) {
2596           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2597                 return -1;
2598         }
2599
2600         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2601         talloc_free(outdata.dptr);
2602                     
2603         return 0;
2604 }
2605
2606 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2607                              struct timeval timeout, uint32_t destnode,
2608                              TALLOC_CTX *mem_ctx,
2609                              struct ctdb_all_public_ips **ips)
2610 {
2611         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2612                                               destnode, mem_ctx,
2613                                               0, ips);
2614 }
2615
2616 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2617                         struct timeval timeout, uint32_t destnode, 
2618                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2619 {
2620         int ret, i, len;
2621         TDB_DATA outdata;
2622         int32_t res;
2623         struct ctdb_all_public_ipsv4 *ipsv4;
2624
2625         ret = ctdb_control(ctdb, destnode, 0, 
2626                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2627                            mem_ctx, &outdata, &res, &timeout, NULL);
2628         if (ret != 0 || res != 0) {
2629                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2630                 return -1;
2631         }
2632
2633         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2634         len = offsetof(struct ctdb_all_public_ips, ips) +
2635                 ipsv4->num*sizeof(struct ctdb_public_ip);
2636         *ips = talloc_zero_size(mem_ctx, len);
2637         CTDB_NO_MEMORY(ctdb, *ips);
2638         (*ips)->num = ipsv4->num;
2639         for (i=0; i<ipsv4->num; i++) {
2640                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2641                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2642         }
2643
2644         talloc_free(outdata.dptr);
2645                     
2646         return 0;
2647 }
2648
2649 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2650                                  struct timeval timeout, uint32_t destnode,
2651                                  TALLOC_CTX *mem_ctx,
2652                                  const ctdb_sock_addr *addr,
2653                                  struct ctdb_control_public_ip_info **_info)
2654 {
2655         int ret;
2656         TDB_DATA indata;
2657         TDB_DATA outdata;
2658         int32_t res;
2659         struct ctdb_control_public_ip_info *info;
2660         uint32_t len;
2661         uint32_t i;
2662
2663         indata.dptr = discard_const_p(uint8_t, addr);
2664         indata.dsize = sizeof(*addr);
2665
2666         ret = ctdb_control(ctdb, destnode, 0,
2667                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2668                            mem_ctx, &outdata, &res, &timeout, NULL);
2669         if (ret != 0 || res != 0) {
2670                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2671                                 "failed ret:%d res:%d\n",
2672                                 ret, res));
2673                 return -1;
2674         }
2675
2676         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2677         if (len > outdata.dsize) {
2678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2679                                 "returned invalid data with size %u > %u\n",
2680                                 (unsigned int)outdata.dsize,
2681                                 (unsigned int)len));
2682                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2683                 return -1;
2684         }
2685
2686         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2687         len += info->num*sizeof(struct ctdb_control_iface_info);
2688
2689         if (len > outdata.dsize) {
2690                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2691                                 "returned invalid data with size %u > %u\n",
2692                                 (unsigned int)outdata.dsize,
2693                                 (unsigned int)len));
2694                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2695                 return -1;
2696         }
2697
2698         /* make sure we null terminate the returned strings */
2699         for (i=0; i < info->num; i++) {
2700                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2701         }
2702
2703         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2704                                                                 outdata.dptr,
2705                                                                 outdata.dsize);
2706         talloc_free(outdata.dptr);
2707         if (*_info == NULL) {
2708                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2709                                 "talloc_memdup size %u failed\n",
2710                                 (unsigned int)outdata.dsize));
2711                 return -1;
2712         }
2713
2714         return 0;
2715 }
2716
2717 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2718                          struct timeval timeout, uint32_t destnode,
2719                          TALLOC_CTX *mem_ctx,
2720                          struct ctdb_control_get_ifaces **_ifaces)
2721 {
2722         int ret;
2723         TDB_DATA outdata;
2724         int32_t res;
2725         struct ctdb_control_get_ifaces *ifaces;
2726         uint32_t len;
2727         uint32_t i;
2728
2729         ret = ctdb_control(ctdb, destnode, 0,
2730                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2731                            mem_ctx, &outdata, &res, &timeout, NULL);
2732         if (ret != 0 || res != 0) {
2733                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2734                                 "failed ret:%d res:%d\n",
2735                                 ret, res));
2736                 return -1;
2737         }
2738
2739         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2740         if (len > outdata.dsize) {
2741                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2742                                 "returned invalid data with size %u > %u\n",
2743                                 (unsigned int)outdata.dsize,
2744                                 (unsigned int)len));
2745                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2746                 return -1;
2747         }
2748
2749         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2750         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2751
2752         if (len > outdata.dsize) {
2753                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2754                                 "returned invalid data with size %u > %u\n",
2755                                 (unsigned int)outdata.dsize,
2756                                 (unsigned int)len));
2757                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2758                 return -1;
2759         }
2760
2761         /* make sure we null terminate the returned strings */
2762         for (i=0; i < ifaces->num; i++) {
2763                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2764         }
2765
2766         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2767                                                                   outdata.dptr,
2768                                                                   outdata.dsize);
2769         talloc_free(outdata.dptr);
2770         if (*_ifaces == NULL) {
2771                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2772                                 "talloc_memdup size %u failed\n",
2773                                 (unsigned int)outdata.dsize));
2774                 return -1;
2775         }
2776
2777         return 0;
2778 }
2779
2780 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2781                              struct timeval timeout, uint32_t destnode,
2782                              TALLOC_CTX *mem_ctx,
2783                              const struct ctdb_control_iface_info *info)
2784 {
2785         int ret;
2786         TDB_DATA indata;
2787         int32_t res;
2788
2789         indata.dptr = discard_const_p(uint8_t, info);
2790         indata.dsize = sizeof(*info);
2791
2792         ret = ctdb_control(ctdb, destnode, 0,
2793                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2794                            mem_ctx, NULL, &res, &timeout, NULL);
2795         if (ret != 0 || res != 0) {
2796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2797                                 "failed ret:%d res:%d\n",
2798                                 ret, res));
2799                 return -1;
2800         }
2801
2802         return 0;
2803 }
2804
2805 /*
2806   set/clear the permanent disabled bit on a remote node
2807  */
2808 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2809                        uint32_t set, uint32_t clear)
2810 {
2811         int ret;
2812         TDB_DATA data;
2813         struct ctdb_node_map *nodemap=NULL;
2814         struct ctdb_node_flag_change c;
2815         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2816         uint32_t recmaster;
2817         uint32_t *nodes;
2818
2819
2820         /* find the recovery master */
2821         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2822         if (ret != 0) {
2823                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2824                 talloc_free(tmp_ctx);
2825                 return ret;
2826         }
2827
2828
2829         /* read the node flags from the recmaster */
2830         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2831         if (ret != 0) {
2832                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2833                 talloc_free(tmp_ctx);
2834                 return -1;
2835         }
2836         if (destnode >= nodemap->num) {
2837                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2838                 talloc_free(tmp_ctx);
2839                 return -1;
2840         }
2841
2842         c.pnn       = destnode;
2843         c.old_flags = nodemap->nodes[destnode].flags;
2844         c.new_flags = c.old_flags;
2845         c.new_flags |= set;
2846         c.new_flags &= ~clear;
2847
2848         data.dsize = sizeof(c);
2849         data.dptr = (unsigned char *)&c;
2850
2851         /* send the flags update to all connected nodes */
2852         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2853
2854         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2855                                         nodes, 0,
2856                                         timeout, false, data,
2857                                         NULL, NULL,
2858                                         NULL) != 0) {
2859                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2860
2861                 talloc_free(tmp_ctx);
2862                 return -1;
2863         }
2864
2865         talloc_free(tmp_ctx);
2866         return 0;
2867 }
2868
2869
2870 /*
2871   get all tunables
2872  */
2873 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2874                                struct timeval timeout, 
2875                                uint32_t destnode,
2876                                struct ctdb_tunable *tunables)
2877 {
2878         TDB_DATA outdata;
2879         int ret;
2880         int32_t res;
2881
2882         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2883                            &outdata, &res, &timeout, NULL);
2884         if (ret != 0 || res != 0) {
2885                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2886                 return -1;
2887         }
2888
2889         if (outdata.dsize != sizeof(*tunables)) {
2890                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2891                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2892                 return -1;              
2893         }
2894
2895         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2896         talloc_free(outdata.dptr);
2897         return 0;
2898 }
2899
2900 /*
2901   add a public address to a node
2902  */
2903 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2904                       struct timeval timeout, 
2905                       uint32_t destnode,
2906                       struct ctdb_control_ip_iface *pub)
2907 {
2908         TDB_DATA data;
2909         int32_t res;
2910         int ret;
2911
2912         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2913         data.dptr  = (unsigned char *)pub;
2914
2915         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2916                            NULL, &res, &timeout, NULL);
2917         if (ret != 0 || res != 0) {
2918                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2919                 return -1;
2920         }
2921
2922         return 0;
2923 }
2924
2925 /*
2926   delete a public address from a node
2927  */
2928 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2929                       struct timeval timeout, 
2930                       uint32_t destnode,
2931                       struct ctdb_control_ip_iface *pub)
2932 {
2933         TDB_DATA data;
2934         int32_t res;
2935         int ret;
2936
2937         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2938         data.dptr  = (unsigned char *)pub;
2939
2940         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2941                            NULL, &res, &timeout, NULL);
2942         if (ret != 0 || res != 0) {
2943                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2944                 return -1;
2945         }
2946
2947         return 0;
2948 }
2949
2950 /*
2951   kill a tcp connection
2952  */
2953 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2954                       struct timeval timeout, 
2955                       uint32_t destnode,
2956                       struct ctdb_control_killtcp *killtcp)
2957 {
2958         TDB_DATA data;
2959         int32_t res;
2960         int ret;
2961
2962         data.dsize = sizeof(struct ctdb_control_killtcp);
2963         data.dptr  = (unsigned char *)killtcp;
2964
2965         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2966                            NULL, &res, &timeout, NULL);
2967         if (ret != 0 || res != 0) {
2968                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2969                 return -1;
2970         }
2971
2972         return 0;
2973 }
2974
2975 /*
2976   send a gratious arp
2977  */
2978 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2979                       struct timeval timeout, 
2980                       uint32_t destnode,
2981                       ctdb_sock_addr *addr,
2982                       const char *ifname)
2983 {
2984         TDB_DATA data;
2985         int32_t res;
2986         int ret, len;
2987         struct ctdb_control_gratious_arp *gratious_arp;
2988         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2989
2990
2991         len = strlen(ifname)+1;
2992         gratious_arp = talloc_size(tmp_ctx, 
2993                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2994         CTDB_NO_MEMORY(ctdb, gratious_arp);
2995
2996         gratious_arp->addr = *addr;
2997         gratious_arp->len = len;
2998         memcpy(&gratious_arp->iface[0], ifname, len);
2999
3000
3001         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3002         data.dptr  = (unsigned char *)gratious_arp;
3003
3004         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3005                            NULL, &res, &timeout, NULL);
3006         if (ret != 0 || res != 0) {
3007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3008                 talloc_free(tmp_ctx);
3009                 return -1;
3010         }
3011
3012         talloc_free(tmp_ctx);
3013         return 0;
3014 }
3015
3016 /*
3017   get a list of all tcp tickles that a node knows about for a particular vnn
3018  */
3019 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3020                               struct timeval timeout, uint32_t destnode, 
3021                               TALLOC_CTX *mem_ctx, 
3022                               ctdb_sock_addr *addr,
3023                               struct ctdb_control_tcp_tickle_list **list)
3024 {
3025         int ret;
3026         TDB_DATA data, outdata;
3027         int32_t status;
3028
3029         data.dptr = (uint8_t*)addr;
3030         data.dsize = sizeof(ctdb_sock_addr);
3031
3032         ret = ctdb_control(ctdb, destnode, 0, 
3033                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3034                            mem_ctx, &outdata, &status, NULL, NULL);
3035         if (ret != 0 || status != 0) {
3036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3037                 return -1;
3038         }
3039
3040         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3041
3042         return status;
3043 }
3044
3045 /*
3046   register a server id
3047  */
3048 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3049                       struct timeval timeout, 
3050                       struct ctdb_server_id *id)
3051 {
3052         TDB_DATA data;
3053         int32_t res;
3054         int ret;
3055
3056         data.dsize = sizeof(struct ctdb_server_id);
3057         data.dptr  = (unsigned char *)id;
3058
3059         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3060                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3061                         0, data, NULL,
3062                         NULL, &res, &timeout, NULL);
3063         if (ret != 0 || res != 0) {
3064                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3065                 return -1;
3066         }
3067
3068         return 0;
3069 }
3070
3071 /*
3072   unregister a server id
3073  */
3074 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3075                       struct timeval timeout, 
3076                       struct ctdb_server_id *id)
3077 {
3078         TDB_DATA data;
3079         int32_t res;
3080         int ret;
3081
3082         data.dsize = sizeof(struct ctdb_server_id);
3083         data.dptr  = (unsigned char *)id;
3084
3085         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3086                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3087                         0, data, NULL,
3088                         NULL, &res, &timeout, NULL);
3089         if (ret != 0 || res != 0) {
3090                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3091                 return -1;
3092         }
3093
3094         return 0;
3095 }
3096
3097
3098 /*
3099   check if a server id exists
3100
3101   if a server id does exist, return *status == 1, otherwise *status == 0
3102  */
3103 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3104                       struct timeval timeout, 
3105                       uint32_t destnode,
3106                       struct ctdb_server_id *id,
3107                       uint32_t *status)
3108 {
3109         TDB_DATA data;
3110         int32_t res;
3111         int ret;
3112
3113         data.dsize = sizeof(struct ctdb_server_id);
3114         data.dptr  = (unsigned char *)id;
3115
3116         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3117                         0, data, NULL,
3118                         NULL, &res, &timeout, NULL);
3119         if (ret != 0) {
3120                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3121                 return -1;
3122         }
3123
3124         if (res) {
3125                 *status = 1;
3126         } else {
3127                 *status = 0;
3128         }
3129
3130         return 0;
3131 }
3132
3133 /*
3134    get the list of server ids that are registered on a node
3135 */
3136 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3137                 TALLOC_CTX *mem_ctx,
3138                 struct timeval timeout, uint32_t destnode, 
3139                 struct ctdb_server_id_list **svid_list)
3140 {
3141         int ret;
3142         TDB_DATA outdata;
3143         int32_t res;
3144
3145         ret = ctdb_control(ctdb, destnode, 0, 
3146                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3147                            mem_ctx, &outdata, &res, &timeout, NULL);
3148         if (ret != 0 || res != 0) {
3149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3150                 return -1;
3151         }
3152
3153         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3154                     
3155         return 0;
3156 }
3157
3158 /*
3159   initialise the ctdb daemon for client applications
3160
3161   NOTE: In current code the daemon does not fork. This is for testing purposes only
3162   and to simplify the code.
3163 */
3164 struct ctdb_context *ctdb_init(struct event_context *ev)
3165 {
3166         int ret;
3167         struct ctdb_context *ctdb;
3168
3169         ctdb = talloc_zero(ev, struct ctdb_context);
3170         if (ctdb == NULL) {
3171                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3172                 return NULL;
3173         }
3174         ctdb->ev  = ev;
3175         ctdb->idr = idr_init(ctdb);
3176         /* Wrap early to exercise code. */
3177         ctdb->lastid = INT_MAX-200;
3178         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3179
3180         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3181         if (ret != 0) {
3182                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3183                 talloc_free(ctdb);
3184                 return NULL;
3185         }
3186
3187         ctdb->statistics.statistics_start_time = timeval_current();
3188
3189         return ctdb;
3190 }
3191
3192
3193 /*
3194   set some ctdb flags
3195 */
3196 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3197 {
3198         ctdb->flags |= flags;
3199 }
3200
3201 /*
3202   setup the local socket name
3203 */
3204 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3205 {
3206         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3207         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3208
3209         return 0;
3210 }
3211
3212 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3213 {
3214         return ctdb->daemon.name;
3215 }
3216
3217 /*
3218   return the pnn of this node
3219 */
3220 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3221 {
3222         return ctdb->pnn;
3223 }
3224
3225
3226 /*
3227   get the uptime of a remote node
3228  */
3229 struct ctdb_client_control_state *
3230 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3231 {
3232         return ctdb_control_send(ctdb, destnode, 0, 
3233                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3234                            mem_ctx, &timeout, NULL);
3235 }
3236
3237 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3238 {
3239         int ret;
3240         int32_t res;
3241         TDB_DATA outdata;
3242
3243         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3244         if (ret != 0 || res != 0) {
3245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3246                 return -1;
3247         }
3248
3249         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3250
3251         return 0;
3252 }
3253
3254 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3255 {
3256         struct ctdb_client_control_state *state;
3257
3258         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3259         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3260 }
3261
3262 /*
3263   send a control to execute the "recovered" event script on a node
3264  */
3265 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3266 {
3267         int ret;
3268         int32_t status;
3269
3270         ret = ctdb_control(ctdb, destnode, 0, 
3271                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3272                            NULL, NULL, &status, &timeout, NULL);
3273         if (ret != 0 || status != 0) {
3274                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3275                 return -1;
3276         }
3277
3278         return 0;
3279 }
3280
3281 /* 
3282   callback for the async helpers used when sending the same control
3283   to multiple nodes in parallell.
3284 */
3285 static void async_callback(struct ctdb_client_control_state *state)
3286 {
3287         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3288         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3289         int ret;
3290         TDB_DATA outdata;
3291         int32_t res;
3292         uint32_t destnode = state->c->hdr.destnode;
3293
3294         /* one more node has responded with recmode data */
3295         data->count--;
3296
3297         /* if we failed to push the db, then return an error and let
3298            the main loop try again.
3299         */
3300         if (state->state != CTDB_CONTROL_DONE) {
3301                 if ( !data->dont_log_errors) {
3302                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3303                 }
3304                 data->fail_count++;
3305                 if (data->fail_callback) {
3306                         data->fail_callback(ctdb, destnode, res, outdata,
3307                                         data->callback_data);
3308                 }
3309                 return;
3310         }
3311         
3312         state->async.fn = NULL;
3313
3314         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3315         if ((ret != 0) || (res != 0)) {
3316                 if ( !data->dont_log_errors) {
3317                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3318                 }
3319                 data->fail_count++;
3320                 if (data->fail_callback) {
3321                         data->fail_callback(ctdb, destnode, res, outdata,
3322                                         data->callback_data);
3323                 }
3324         }
3325         if ((ret == 0) && (data->callback != NULL)) {
3326                 data->callback(ctdb, destnode, res, outdata,
3327                                         data->callback_data);
3328         }
3329 }
3330
3331
3332 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3333 {
3334         /* set up the callback functions */
3335         state->async.fn = async_callback;
3336         state->async.private_data = data;
3337         
3338         /* one more control to wait for to complete */
3339         data->count++;
3340 }
3341
3342
3343 /* wait for up to the maximum number of seconds allowed
3344    or until all nodes we expect a response from has replied
3345 */
3346 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3347 {
3348         while (data->count > 0) {
3349                 event_loop_once(ctdb->ev);
3350         }
3351         if (data->fail_count != 0) {
3352                 if (!data->dont_log_errors) {
3353                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3354                                  data->fail_count));
3355                 }
3356                 return -1;
3357         }
3358         return 0;
3359 }
3360
3361
3362 /* 
3363    perform a simple control on the listed nodes
3364    The control cannot return data
3365  */
3366 int ctdb_client_async_control(struct ctdb_context *ctdb,
3367                                 enum ctdb_controls opcode,
3368                                 uint32_t *nodes,
3369                                 uint64_t srvid,
3370                                 struct timeval timeout,
3371                                 bool dont_log_errors,
3372                                 TDB_DATA data,
3373                                 client_async_callback client_callback,
3374                                 client_async_callback fail_callback,
3375                                 void *callback_data)
3376 {
3377         struct client_async_data *async_data;
3378         struct ctdb_client_control_state *state;
3379         int j, num_nodes;
3380
3381         async_data = talloc_zero(ctdb, struct client_async_data);
3382         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3383         async_data->dont_log_errors = dont_log_errors;
3384         async_data->callback = client_callback;
3385         async_data->fail_callback = fail_callback;
3386         async_data->callback_data = callback_data;
3387         async_data->opcode        = opcode;
3388
3389         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3390
3391         /* loop over all nodes and send an async control to each of them */
3392         for (j=0; j<num_nodes; j++) {
3393                 uint32_t pnn = nodes[j];
3394
3395                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3396                                           0, data, async_data, &timeout, NULL);
3397                 if (state == NULL) {
3398                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3399                         talloc_free(async_data);
3400                         return -1;
3401                 }
3402                 
3403                 ctdb_client_async_add(async_data, state);
3404         }
3405
3406         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3407                 talloc_free(async_data);
3408                 return -1;
3409         }
3410
3411         talloc_free(async_data);
3412         return 0;
3413 }
3414
3415 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3416                                 struct ctdb_vnn_map *vnn_map,
3417                                 TALLOC_CTX *mem_ctx,
3418                                 bool include_self)
3419 {
3420         int i, j, num_nodes;
3421         uint32_t *nodes;
3422
3423         for (i=num_nodes=0;i<vnn_map->size;i++) {
3424                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3425                         continue;
3426                 }
3427                 num_nodes++;
3428         } 
3429
3430         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3431         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3432
3433         for (i=j=0;i<vnn_map->size;i++) {
3434                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3435                         continue;
3436                 }
3437                 nodes[j++] = vnn_map->map[i];
3438         } 
3439
3440         return nodes;
3441 }
3442
3443 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3444                                 struct ctdb_node_map *node_map,
3445                                 TALLOC_CTX *mem_ctx,
3446                                 bool include_self)
3447 {
3448         int i, j, num_nodes;
3449         uint32_t *nodes;
3450
3451         for (i=num_nodes=0;i<node_map->num;i++) {
3452                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3453                         continue;
3454                 }
3455                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3456                         continue;
3457                 }
3458                 num_nodes++;
3459         } 
3460
3461         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3462         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3463
3464         for (i=j=0;i<node_map->num;i++) {
3465                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3466                         continue;
3467                 }
3468                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3469                         continue;
3470                 }
3471                 nodes[j++] = node_map->nodes[i].pnn;
3472         } 
3473
3474         return nodes;
3475 }
3476
3477 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3478                                 struct ctdb_node_map *node_map,
3479                                 TALLOC_CTX *mem_ctx,
3480                                 uint32_t pnn)
3481 {
3482         int i, j, num_nodes;
3483         uint32_t *nodes;
3484
3485         for (i=num_nodes=0;i<node_map->num;i++) {
3486                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3487                         continue;
3488                 }
3489                 if (node_map->nodes[i].pnn == pnn) {
3490                         continue;
3491                 }
3492                 num_nodes++;
3493         } 
3494
3495         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3496         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3497
3498         for (i=j=0;i<node_map->num;i++) {
3499                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3500                         continue;
3501                 }
3502                 if (node_map->nodes[i].pnn == pnn) {
3503                         continue;
3504                 }
3505                 nodes[j++] = node_map->nodes[i].pnn;
3506         } 
3507
3508         return nodes;
3509 }
3510
3511 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3512                                 struct ctdb_node_map *node_map,
3513                                 TALLOC_CTX *mem_ctx,
3514                                 bool include_self)
3515 {
3516         int i, j, num_nodes;
3517         uint32_t *nodes;
3518
3519         for (i=num_nodes=0;i<node_map->num;i++) {
3520                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3521                         continue;
3522                 }
3523                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3524                         continue;
3525                 }
3526                 num_nodes++;
3527         } 
3528
3529         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3530         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3531
3532         for (i=j=0;i<node_map->num;i++) {
3533                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3534                         continue;
3535                 }
3536                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3537                         continue;
3538                 }
3539                 nodes[j++] = node_map->nodes[i].pnn;
3540         } 
3541
3542         return nodes;
3543 }
3544
3545 /* 
3546   this is used to test if a pnn lock exists and if it exists will return
3547   the number of connections that pnn has reported or -1 if that recovery
3548   daemon is not running.
3549 */
3550 int
3551 ctdb_read_pnn_lock(int fd, int32_t pnn)
3552 {
3553         struct flock lock;
3554         char c;
3555
3556         lock.l_type = F_WRLCK;
3557         lock.l_whence = SEEK_SET;
3558         lock.l_start = pnn;
3559         lock.l_len = 1;
3560         lock.l_pid = 0;
3561
3562         if (fcntl(fd, F_GETLK, &lock) != 0) {
3563                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3564                 return -1;
3565         }
3566
3567         if (lock.l_type == F_UNLCK) {
3568                 return -1;
3569         }
3570
3571         if (pread(fd, &c, 1, pnn) == -1) {
3572                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3573                 return -1;
3574         }
3575
3576         return c;
3577 }
3578
3579 /*
3580   get capabilities of a remote node
3581  */
3582 struct ctdb_client_control_state *
3583 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3584 {
3585         return ctdb_control_send(ctdb, destnode, 0, 
3586                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3587                            mem_ctx, &timeout, NULL);
3588 }
3589
3590 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3591 {
3592         int ret;
3593         int32_t res;
3594         TDB_DATA outdata;
3595
3596         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3597         if ( (ret != 0) || (res != 0) ) {
3598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3599                 return -1;
3600         }
3601
3602         if (capabilities) {
3603                 *capabilities = *((uint32_t *)outdata.dptr);
3604         }
3605
3606         return 0;
3607 }
3608
3609 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3610 {
3611         struct ctdb_client_control_state *state;
3612         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3613         int ret;
3614
3615         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3616         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3617         talloc_free(tmp_ctx);
3618         return ret;
3619 }
3620
3621 /**
3622  * check whether a transaction is active on a given db on a given node
3623  */
3624 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3625                                      uint32_t destnode,
3626                                      uint32_t db_id)
3627 {
3628         int32_t status;
3629         int ret;
3630         TDB_DATA indata;
3631
3632         indata.dptr = (uint8_t *)&db_id;
3633         indata.dsize = sizeof(db_id);
3634
3635         ret = ctdb_control(ctdb, destnode, 0,
3636                            CTDB_CONTROL_TRANS2_ACTIVE,
3637                            0, indata, NULL, NULL, &status,
3638                            NULL, NULL);
3639
3640         if (ret != 0) {
3641                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3642                 return -1;
3643         }
3644
3645         return status;
3646 }
3647
3648
3649 struct ctdb_transaction_handle {
3650         struct ctdb_db_context *ctdb_db;
3651         bool in_replay;
3652         /*
3653          * we store the reads and writes done under a transaction:
3654          * - one list stores both reads and writes (m_all),
3655          * - the other just writes (m_write)
3656          */
3657         struct ctdb_marshall_buffer *m_all;
3658         struct ctdb_marshall_buffer *m_write;
3659 };
3660
3661 /* start a transaction on a database */
3662 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3663 {
3664         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3665         return 0;
3666 }
3667
3668 /* start a transaction on a database */
3669 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3670 {
3671         struct ctdb_record_handle *rh;
3672         TDB_DATA key;
3673         TDB_DATA data;
3674         struct ctdb_ltdb_header header;
3675         TALLOC_CTX *tmp_ctx;
3676         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3677         int ret;
3678         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3679         pid_t pid;
3680         int32_t status;
3681
3682         key.dptr = discard_const(keyname);
3683         key.dsize = strlen(keyname);
3684
3685         if (!ctdb_db->persistent) {
3686                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3687                 return -1;
3688         }
3689
3690 again:
3691         tmp_ctx = talloc_new(h);
3692
3693         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3694         if (rh == NULL) {
3695                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3696                 talloc_free(tmp_ctx);
3697                 return -1;
3698         }
3699
3700         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3701                                               CTDB_CURRENT_NODE,
3702                                               ctdb_db->db_id);
3703         if (status == 1) {
3704                 unsigned long int usec = (1000 + random()) % 100000;
3705                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3706                                     "on db_id[0x%08x]. waiting for %lu "
3707                                     "microseconds\n",
3708                                     ctdb_db->db_id, usec));
3709                 talloc_free(tmp_ctx);
3710                 usleep(usec);
3711                 goto again;
3712         }
3713
3714         /*
3715          * store the pid in the database:
3716          * it is not enough that the node is dmaster...
3717          */
3718         pid = getpid();
3719         data.dptr = (unsigned char *)&pid;
3720         data.dsize = sizeof(pid_t);
3721         rh->header.rsn++;
3722         rh->header.dmaster = ctdb_db->ctdb->pnn;
3723         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3724         if (ret != 0) {
3725                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3726                                   "transaction record\n"));
3727                 talloc_free(tmp_ctx);
3728                 return -1;
3729         }
3730
3731         talloc_free(rh);
3732
3733         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3734         if (ret != 0) {
3735                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3736                 talloc_free(tmp_ctx);
3737                 return -1;
3738         }
3739
3740         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3741         if (ret != 0) {
3742                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3743                                  "lock record inside transaction\n"));
3744                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3745                 talloc_free(tmp_ctx);
3746                 goto again;
3747         }
3748
3749         if (header.dmaster != ctdb_db->ctdb->pnn) {
3750                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3751                                    "transaction lock record\n"));
3752                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3753                 talloc_free(tmp_ctx);
3754                 goto again;
3755         }
3756
3757         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3758                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3759                                     "the transaction lock record\n"));
3760                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3761                 talloc_free(tmp_ctx);
3762                 goto again;
3763         }
3764
3765         talloc_free(tmp_ctx);
3766
3767         return 0;
3768 }
3769
3770
3771 /* start a transaction on a database */
3772 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3773                                                        TALLOC_CTX *mem_ctx)
3774 {
3775         struct ctdb_transaction_handle *h;
3776         int ret;
3777
3778         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3779         if (h == NULL) {
3780                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3781                 return NULL;
3782         }
3783
3784         h->ctdb_db = ctdb_db;
3785
3786         ret = ctdb_transaction_fetch_start(h);
3787         if (ret != 0) {
3788                 talloc_free(h);
3789                 return NULL;
3790         }
3791
3792         talloc_set_destructor(h, ctdb_transaction_destructor);
3793
3794         return h;
3795 }
3796
3797
3798
3799 /*
3800   fetch a record inside a transaction
3801  */
3802 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3803                            TALLOC_CTX *mem_ctx, 
3804                            TDB_DATA key, TDB_DATA *data)
3805 {
3806         struct ctdb_ltdb_header header;
3807         int ret;
3808
3809         ZERO_STRUCT(header);
3810
3811         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3812         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3813                 /* record doesn't exist yet */
3814                 *data = tdb_null;
3815                 ret = 0;
3816         }
3817         
3818         if (ret != 0) {
3819                 return ret;
3820         }
3821
3822         if (!h->in_replay) {
3823                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3824                 if (h->m_all == NULL) {
3825                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3826                         return -1;
3827                 }
3828         }
3829
3830         return 0;
3831 }
3832
3833 /*
3834   stores a record inside a transaction
3835  */
3836 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3837                            TDB_DATA key, TDB_DATA data)
3838 {
3839         TALLOC_CTX *tmp_ctx = talloc_new(h);
3840         struct ctdb_ltdb_header header;
3841         TDB_DATA olddata;
3842         int ret;
3843
3844         ZERO_STRUCT(header);
3845
3846         /* we need the header so we can update the RSN */
3847         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3848         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3849                 /* the record doesn't exist - create one with us as dmaster.
3850                    This is only safe because we are in a transaction and this
3851                    is a persistent database */
3852                 ZERO_STRUCT(header);
3853         } else if (ret != 0) {
3854                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3855                 talloc_free(tmp_ctx);
3856                 return ret;
3857         }
3858
3859         if (data.dsize == olddata.dsize &&
3860             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3861                 /* save writing the same data */
3862                 talloc_free(tmp_ctx);
3863                 return 0;
3864         }
3865
3866         header.dmaster = h->ctdb_db->ctdb->pnn;
3867         header.rsn++;
3868
3869         if (!h->in_replay) {
3870                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3871                 if (h->m_all == NULL) {
3872                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3873                         talloc_free(tmp_ctx);
3874                         return -1;
3875                 }
3876         }               
3877
3878         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3879         if (h->m_write == NULL) {
3880                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3881                 talloc_free(tmp_ctx);
3882                 return -1;
3883         }
3884         
3885         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3886
3887         talloc_free(tmp_ctx);
3888         
3889         return ret;
3890 }
3891
3892 /*
3893   replay a transaction
3894  */
3895 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3896 {
3897         int ret, i;
3898         struct ctdb_rec_data *rec = NULL;
3899
3900         h->in_replay = true;
3901         talloc_free(h->m_write);
3902         h->m_write = NULL;
3903
3904         ret = ctdb_transaction_fetch_start(h);
3905         if (ret != 0) {
3906                 return ret;
3907         }
3908
3909         for (i=0;i<h->m_all->count;i++) {
3910                 TDB_DATA key, data;
3911
3912                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3913                 if (rec == NULL) {
3914                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3915                         goto failed;
3916                 }
3917
3918                 if (rec->reqid == 0) {
3919                         /* its a store */
3920                         if (ctdb_transaction_store(h, key, data) != 0) {
3921                                 goto failed;
3922                         }
3923                 } else {
3924                         TDB_DATA data2;
3925                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3926
3927                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3928                                 talloc_free(tmp_ctx);
3929                                 goto failed;
3930                         }
3931                         if (data2.dsize != data.dsize ||
3932                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3933                                 /* the record has changed on us - we have to give up */
3934                                 talloc_free(tmp_ctx);
3935                                 goto failed;
3936                         }
3937                         talloc_free(tmp_ctx);
3938                 }
3939         }
3940         
3941         return 0;
3942
3943 failed:
3944         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3945         return -1;
3946 }
3947
3948
3949 /*
3950   commit a transaction
3951  */
3952 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3953 {
3954         int ret, retries=0;
3955         int32_t status;
3956         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3957         struct timeval timeout;
3958         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3959
3960         talloc_set_destructor(h, NULL);
3961
3962         /* our commit strategy is quite complex.
3963
3964            - we first try to commit the changes to all other nodes
3965
3966            - if that works, then we commit locally and we are done
3967
3968            - if a commit on another node fails, then we need to cancel
3969              the transaction, then restart the transaction (thus
3970              opening a window of time for a pending recovery to
3971              complete), then replay the transaction, checking all the
3972              reads and writes (checking that reads give the same data,
3973              and writes succeed). Then we retry the transaction to the
3974              other nodes
3975         */
3976
3977 again:
3978         if (h->m_write == NULL) {
3979                 /* no changes were made */
3980                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3981                 talloc_free(h);
3982                 return 0;
3983         }
3984
3985         /* tell ctdbd to commit to the other nodes */
3986         timeout = timeval_current_ofs(1, 0);
3987         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3988                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3989                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3990                            &timeout, NULL);
3991         if (ret != 0 || status != 0) {
3992                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3993                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3994                                      ", retrying after 1 second...\n",
3995                                      (retries==0)?"":"retry "));
3996                 sleep(1);
3997
3998                 if (ret != 0) {
3999                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4000                 } else {
4001                         /* work out what error code we will give if we 
4002                            have to fail the operation */
4003                         switch ((enum ctdb_trans2_commit_error)status) {
4004                         case CTDB_TRANS2_COMMIT_SUCCESS:
4005                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4006                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4007                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4008                                 break;
4009                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4010                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4011                                 break;
4012                         }
4013                 }
4014
4015                 if (++retries == 100) {
4016                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4017                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4018                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4019                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4020                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4021                         talloc_free(h);
4022                         return -1;
4023                 }               
4024
4025                 if (ctdb_replay_transaction(h) != 0) {
4026                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4027                                           "transaction on db 0x%08x, "
4028                                           "failure control =%u\n",
4029                                           h->ctdb_db->db_id,
4030                                           (unsigned)failure_control));
4031                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4032                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4033                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4034                         talloc_free(h);
4035                         return -1;
4036                 }
4037                 goto again;
4038         } else {
4039                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4040         }
4041
4042         /* do the real commit locally */
4043         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4044         if (ret != 0) {
4045                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4046                                   "on db id 0x%08x locally, "
4047                                   "failure_control=%u\n",
4048                                   h->ctdb_db->db_id,
4049                                   (unsigned)failure_control));
4050                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4051                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4052                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4053                 talloc_free(h);
4054                 return ret;
4055         }
4056
4057         /* tell ctdbd that we are finished with our local commit */
4058         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4059                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4060                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4061         talloc_free(h);
4062         return 0;
4063 }
4064
4065 /*
4066   recovery daemon ping to main daemon
4067  */
4068 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4069 {
4070         int ret;
4071         int32_t res;
4072
4073         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4074                            ctdb, NULL, &res, NULL, NULL);
4075         if (ret != 0 || res != 0) {
4076                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4077                 return -1;
4078         }
4079
4080         return 0;
4081 }
4082
4083 /* when forking the main daemon and the child process needs to connect back
4084  * to the daemon as a client process, this function can be used to change
4085  * the ctdb context from daemon into client mode
4086  */
4087 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4088 {
4089         int ret;
4090         va_list ap;
4091
4092         /* Add extra information so we can identify this in the logs */
4093         va_start(ap, fmt);
4094         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4095         va_end(ap);
4096
4097         /* shutdown the transport */
4098         if (ctdb->methods) {
4099                 ctdb->methods->shutdown(ctdb);
4100         }
4101
4102         /* get a new event context */
4103         talloc_free(ctdb->ev);
4104         ctdb->ev = event_context_init(ctdb);
4105         tevent_loop_allow_nesting(ctdb->ev);
4106
4107         close(ctdb->daemon.sd);
4108         ctdb->daemon.sd = -1;
4109
4110         /* the client does not need to be realtime */
4111         if (ctdb->do_setsched) {
4112                 ctdb_restore_scheduler(ctdb);
4113         }
4114
4115         /* initialise ctdb */
4116         ret = ctdb_socket_connect(ctdb);
4117         if (ret != 0) {
4118                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4119                 return -1;
4120         }
4121
4122          return 0;
4123 }
4124
4125 /*
4126   get the status of running the monitor eventscripts: NULL means never run.
4127  */
4128 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4129                 struct timeval timeout, uint32_t destnode, 
4130                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4131                 struct ctdb_scripts_wire **script_status)
4132 {
4133         int ret;
4134         TDB_DATA outdata, indata;
4135         int32_t res;
4136         uint32_t uinttype = type;
4137
4138         indata.dptr = (uint8_t *)&uinttype;
4139         indata.dsize = sizeof(uinttype);
4140
4141         ret = ctdb_control(ctdb, destnode, 0, 
4142                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4143                            mem_ctx, &outdata, &res, &timeout, NULL);
4144         if (ret != 0 || res != 0) {
4145                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4146                 return -1;
4147         }
4148
4149         if (outdata.dsize == 0) {
4150                 *script_status = NULL;
4151         } else {
4152                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4153                 talloc_free(outdata.dptr);
4154         }
4155                     
4156         return 0;
4157 }
4158
4159 /*
4160   tell the main daemon how long it took to lock the reclock file
4161  */
4162 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4163 {
4164         int ret;
4165         int32_t res;
4166         TDB_DATA data;
4167
4168         data.dptr = (uint8_t *)&latency;
4169         data.dsize = sizeof(latency);
4170
4171         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4172                            ctdb, NULL, &res, NULL, NULL);
4173         if (ret != 0 || res != 0) {
4174                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4175                 return -1;
4176         }
4177
4178         return 0;
4179 }
4180
4181 /*
4182   get the name of the reclock file
4183  */
4184 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4185                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4186                          const char **name)
4187 {
4188         int ret;
4189         int32_t res;
4190         TDB_DATA data;
4191
4192         ret = ctdb_control(ctdb, destnode, 0, 
4193                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4194                            mem_ctx, &data, &res, &timeout, NULL);
4195         if (ret != 0 || res != 0) {
4196                 return -1;
4197         }
4198
4199         if (data.dsize == 0) {
4200                 *name = NULL;
4201         } else {
4202                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4203         }
4204         talloc_free(data.dptr);
4205
4206         return 0;
4207 }
4208
4209 /*
4210   set the reclock filename for a node
4211  */
4212 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4213 {
4214         int ret;
4215         TDB_DATA data;
4216         int32_t res;
4217
4218         if (reclock == NULL) {
4219                 data.dsize = 0;
4220                 data.dptr  = NULL;
4221         } else {
4222                 data.dsize = strlen(reclock) + 1;
4223                 data.dptr  = discard_const(reclock);
4224         }
4225
4226         ret = ctdb_control(ctdb, destnode, 0, 
4227                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4228                            NULL, NULL, &res, &timeout, NULL);
4229         if (ret != 0 || res != 0) {
4230                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4231                 return -1;
4232         }
4233
4234         return 0;
4235 }
4236
4237 /*
4238   stop a node
4239  */
4240 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4241 {
4242         int ret;
4243         int32_t res;
4244
4245         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4246                            ctdb, NULL, &res, &timeout, NULL);
4247         if (ret != 0 || res != 0) {
4248                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4249                 return -1;
4250         }
4251
4252         return 0;
4253 }
4254
4255 /*
4256   continue a node
4257  */
4258 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4259 {
4260         int ret;
4261
4262         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4263                            ctdb, NULL, NULL, &timeout, NULL);
4264         if (ret != 0) {
4265                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4266                 return -1;
4267         }
4268
4269         return 0;
4270 }
4271
4272 /*
4273   set the natgw state for a node
4274  */
4275 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4276 {
4277         int ret;
4278         TDB_DATA data;
4279         int32_t res;
4280
4281         data.dsize = sizeof(natgwstate);
4282         data.dptr  = (uint8_t *)&natgwstate;
4283
4284         ret = ctdb_control(ctdb, destnode, 0, 
4285                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4286                            NULL, NULL, &res, &timeout, NULL);
4287         if (ret != 0 || res != 0) {
4288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4289                 return -1;
4290         }
4291
4292         return 0;
4293 }
4294
4295 /*
4296   set the lmaster role for a node
4297  */
4298 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4299 {
4300         int ret;
4301         TDB_DATA data;
4302         int32_t res;
4303
4304         data.dsize = sizeof(lmasterrole);
4305         data.dptr  = (uint8_t *)&lmasterrole;
4306
4307         ret = ctdb_control(ctdb, destnode, 0, 
4308                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4309                            NULL, NULL, &res, &timeout, NULL);
4310         if (ret != 0 || res != 0) {
4311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4312                 return -1;
4313         }
4314
4315         return 0;
4316 }
4317
4318 /*
4319   set the recmaster role for a node
4320  */
4321 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4322 {
4323         int ret;
4324         TDB_DATA data;
4325         int32_t res;
4326
4327         data.dsize = sizeof(recmasterrole);
4328         data.dptr  = (uint8_t *)&recmasterrole;
4329
4330         ret = ctdb_control(ctdb, destnode, 0, 
4331                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4332                            NULL, NULL, &res, &timeout, NULL);
4333         if (ret != 0 || res != 0) {
4334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4335                 return -1;
4336         }
4337
4338         return 0;
4339 }
4340
4341 /* enable an eventscript
4342  */
4343 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4344 {
4345         int ret;
4346         TDB_DATA data;
4347         int32_t res;
4348
4349         data.dsize = strlen(script) + 1;
4350         data.dptr  = discard_const(script);
4351
4352         ret = ctdb_control(ctdb, destnode, 0, 
4353                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4354                            NULL, NULL, &res, &timeout, NULL);
4355         if (ret != 0 || res != 0) {
4356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4357                 return -1;
4358         }
4359
4360         return 0;
4361 }
4362
4363 /* disable an eventscript
4364  */
4365 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4366 {
4367         int ret;
4368         TDB_DATA data;
4369         int32_t res;
4370
4371         data.dsize = strlen(script) + 1;
4372         data.dptr  = discard_const(script);
4373
4374         ret = ctdb_control(ctdb, destnode, 0, 
4375                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4376                            NULL, NULL, &res, &timeout, NULL);
4377         if (ret != 0 || res != 0) {
4378                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4379                 return -1;
4380         }
4381
4382         return 0;
4383 }
4384
4385
4386 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4387 {
4388         int ret;
4389         TDB_DATA data;
4390         int32_t res;
4391
4392         data.dsize = sizeof(*bantime);
4393         data.dptr  = (uint8_t *)bantime;
4394
4395         ret = ctdb_control(ctdb, destnode, 0, 
4396                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4397                            NULL, NULL, &res, &timeout, NULL);
4398         if (ret != 0 || res != 0) {
4399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4400                 return -1;
4401         }
4402
4403         return 0;
4404 }
4405
4406
4407 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4408 {
4409         int ret;
4410         TDB_DATA outdata;
4411         int32_t res;
4412         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4413
4414         ret = ctdb_control(ctdb, destnode, 0, 
4415                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4416                            tmp_ctx, &outdata, &res, &timeout, NULL);
4417         if (ret != 0 || res != 0) {
4418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4419                 talloc_free(tmp_ctx);
4420                 return -1;
4421         }
4422
4423         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4424         talloc_free(tmp_ctx);
4425
4426         return 0;
4427 }
4428
4429
4430 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4431 {
4432         int ret;
4433         int32_t res;
4434         TDB_DATA data;
4435         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4436
4437         data.dptr = (uint8_t*)db_prio;
4438         data.dsize = sizeof(*db_prio);
4439
4440         ret = ctdb_control(ctdb, destnode, 0, 
4441                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4442                            tmp_ctx, NULL, &res, &timeout, NULL);
4443         if (ret != 0 || res != 0) {
4444                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4445                 talloc_free(tmp_ctx);
4446                 return -1;
4447         }
4448
4449         talloc_free(tmp_ctx);
4450
4451         return 0;
4452 }
4453
4454 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4455 {
4456         int ret;
4457         int32_t res;
4458         TDB_DATA data;
4459         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4460
4461         data.dptr = (uint8_t*)&db_id;
4462         data.dsize = sizeof(db_id);
4463
4464         ret = ctdb_control(ctdb, destnode, 0, 
4465                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4466                            tmp_ctx, NULL, &res, &timeout, NULL);
4467         if (ret != 0 || res < 0) {
4468                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4469                 talloc_free(tmp_ctx);
4470                 return -1;
4471         }
4472
4473         if (priority) {
4474                 *priority = res;
4475         }
4476
4477         talloc_free(tmp_ctx);
4478
4479         return 0;
4480 }
4481
4482 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4483 {
4484         int ret;
4485         TDB_DATA outdata;
4486         int32_t res;
4487
4488         ret = ctdb_control(ctdb, destnode, 0, 
4489                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4490                            mem_ctx, &outdata, &res, &timeout, NULL);
4491         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4493                 return -1;
4494         }
4495
4496         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4497         talloc_free(outdata.dptr);
4498                     
4499         return 0;
4500 }
4501
4502 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4503 {
4504         if (h == NULL) {
4505                 return NULL;
4506         }
4507
4508         return &h->header;
4509 }
4510
4511
4512 struct ctdb_client_control_state *
4513 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4514 {
4515         struct ctdb_client_control_state *handle;
4516         struct ctdb_marshall_buffer *m;
4517         struct ctdb_rec_data *rec;
4518         TDB_DATA outdata;
4519
4520         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4521         if (m == NULL) {
4522                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4523                 return NULL;
4524         }
4525
4526         m->db_id = ctdb_db->db_id;
4527
4528         rec = ctdb_marshall_record(m, 0, key, header, data);
4529         if (rec == NULL) {
4530                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4531                 talloc_free(m);
4532                 return NULL;
4533         }
4534         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4535         if (m == NULL) {
4536                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4537                 talloc_free(m);
4538                 return NULL;
4539         }
4540         m->count++;
4541         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4542
4543
4544         outdata.dptr = (uint8_t *)m;
4545         outdata.dsize = talloc_get_size(m);
4546
4547         handle = ctdb_control_send(ctdb, destnode, 0, 
4548                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4549                            mem_ctx, &timeout, NULL);
4550         talloc_free(m);
4551         return handle;
4552 }
4553
4554 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4555 {
4556         int ret;
4557         int32_t res;
4558
4559         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4560         if ( (ret != 0) || (res != 0) ){
4561                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4562                 return -1;
4563         }
4564
4565         return 0;
4566 }
4567
4568 int
4569 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4570 {
4571         struct ctdb_client_control_state *state;
4572
4573         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4574         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4575 }
4576