fix zero-initialization of header in _ctdbd_allocate_pkt to the correct size
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         hdr->length       = length;
58         hdr->operation    = operation;
59         hdr->ctdb_magic   = CTDB_MAGIC;
60         hdr->ctdb_version = CTDB_VERSION;
61         hdr->srcnode      = ctdb->pnn;
62         if (ctdb->vnn_map) {
63                 hdr->generation = ctdb->vnn_map->generation;
64         }
65
66         return hdr;
67 }
68
69 /*
70   local version of ctdb_call
71 */
72 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
73                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
74                     TDB_DATA *data, bool updatetdb)
75 {
76         struct ctdb_call_info *c;
77         struct ctdb_registered_call *fn;
78         struct ctdb_context *ctdb = ctdb_db->ctdb;
79         
80         c = talloc(ctdb, struct ctdb_call_info);
81         CTDB_NO_MEMORY(ctdb, c);
82
83         c->key = call->key;
84         c->call_data = &call->call_data;
85         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
86         c->record_data.dsize = data->dsize;
87         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
88         c->new_data = NULL;
89         c->reply_data = NULL;
90         c->status = 0;
91         c->header = header;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         /* we need to force the record to be written out if this was a remote access */
109         if (c->new_data == NULL) {
110                 c->new_data = &c->record_data;
111         }
112
113         if (c->new_data && updatetdb) {
114                 /* XXX check that we always have the lock here? */
115                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
117                         talloc_free(c);
118                         return -1;
119                 }
120         }
121
122         if (c->reply_data) {
123                 call->reply_data = *c->reply_data;
124
125                 talloc_steal(call, call->reply_data.dptr);
126                 talloc_set_name_const(call->reply_data.dptr, __location__);
127         } else {
128                 call->reply_data.dptr = NULL;
129                 call->reply_data.dsize = 0;
130         }
131         call->status = c->status;
132
133         talloc_free(c);
134
135         return 0;
136 }
137
138
139 /*
140   queue a packet for sending from client to daemon
141 */
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
143 {
144         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
145 }
146
147
148 /*
149   called when a CTDB_REPLY_CALL packet comes in in the client
150
151   This packet comes in response to a CTDB_REQ_CALL request packet. It
152   contains any reply data from the call
153 */
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
155 {
156         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157         struct ctdb_client_call_state *state;
158
159         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
160         if (state == NULL) {
161                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
162                 return;
163         }
164
165         if (hdr->reqid != state->reqid) {
166                 /* we found a record  but it was the wrong one */
167                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
168                 return;
169         }
170
171         state->call->reply_data.dptr = c->data;
172         state->call->reply_data.dsize = c->datalen;
173         state->call->status = c->status;
174
175         talloc_steal(state, c);
176
177         state->state = CTDB_CALL_DONE;
178
179         if (state->async.fn) {
180                 state->async.fn(state);
181         }
182 }
183
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
185
186 /*
187   this is called in the client, when data comes in from the daemon
188  */
189 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
190 {
191         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
193         TALLOC_CTX *tmp_ctx;
194
195         /* place the packet as a child of a tmp_ctx. We then use
196            talloc_free() below to free it. If any of the calls want
197            to keep it, then they will steal it somewhere else, and the
198            talloc_free() will be a no-op */
199         tmp_ctx = talloc_new(ctdb);
200         talloc_steal(tmp_ctx, hdr);
201
202         if (cnt == 0) {
203                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
204                 exit(0);
205         }
206
207         if (cnt < sizeof(*hdr)) {
208                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
209                 goto done;
210         }
211         if (cnt != hdr->length) {
212                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
213                                (unsigned)hdr->length, (unsigned)cnt);
214                 goto done;
215         }
216
217         if (hdr->ctdb_magic != CTDB_MAGIC) {
218                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
219                 goto done;
220         }
221
222         if (hdr->ctdb_version != CTDB_VERSION) {
223                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
224                 goto done;
225         }
226
227         switch (hdr->operation) {
228         case CTDB_REPLY_CALL:
229                 ctdb_client_reply_call(ctdb, hdr);
230                 break;
231
232         case CTDB_REQ_MESSAGE:
233                 ctdb_request_message(ctdb, hdr);
234                 break;
235
236         case CTDB_REPLY_CONTROL:
237                 ctdb_client_reply_control(ctdb, hdr);
238                 break;
239
240         default:
241                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
242         }
243
244 done:
245         talloc_free(tmp_ctx);
246 }
247
248 /*
249   connect with exponential backoff, thanks Stevens
250 */
251 #define CONNECT_MAXSLEEP 64
252 static int ctdb_connect_retry(struct ctdb_context *ctdb)
253 {
254         struct sockaddr_un addr;
255         int secs;
256         int ret = 0;
257
258         memset(&addr, 0, sizeof(addr));
259         addr.sun_family = AF_UNIX;
260         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
261
262         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
263                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
264                               sizeof(addr));
265                 if ((ret == 0) || (errno != EAGAIN)) {
266                         break;
267                 }
268
269                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
270                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
271                                          strerror(errno), secs));
272                         sleep(secs);
273                 }
274         }
275
276         return ret;
277 }
278
279 /*
280   connect to a unix domain socket
281 */
282 int ctdb_socket_connect(struct ctdb_context *ctdb)
283 {
284         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285         if (ctdb->daemon.sd == -1) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
287                 return -1;
288         }
289
290         set_nonblocking(ctdb->daemon.sd);
291         set_close_on_exec(ctdb->daemon.sd);
292
293         if (ctdb_connect_retry(ctdb) == -1) {
294                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
295                 close(ctdb->daemon.sd);
296                 ctdb->daemon.sd = -1;
297                 return -1;
298         }
299
300         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
301                                               CTDB_DS_ALIGNMENT, 
302                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
303         return 0;
304 }
305
306
307 struct ctdb_record_handle {
308         struct ctdb_db_context *ctdb_db;
309         TDB_DATA key;
310         TDB_DATA *data;
311         struct ctdb_ltdb_header header;
312 };
313
314
315 /*
316   make a recv call to the local ctdb daemon - called from client context
317
318   This is called when the program wants to wait for a ctdb_call to complete and get the 
319   results. This call will block unless the call has already completed.
320 */
321 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
322 {
323         if (state == NULL) {
324                 return -1;
325         }
326
327         while (state->state < CTDB_CALL_DONE) {
328                 event_loop_once(state->ctdb_db->ctdb->ev);
329         }
330         if (state->state != CTDB_CALL_DONE) {
331                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
332                 talloc_free(state);
333                 return -1;
334         }
335
336         if (state->call->reply_data.dsize) {
337                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
338                                                       state->call->reply_data.dptr,
339                                                       state->call->reply_data.dsize);
340                 call->reply_data.dsize = state->call->reply_data.dsize;
341         } else {
342                 call->reply_data.dptr = NULL;
343                 call->reply_data.dsize = 0;
344         }
345         call->status = state->call->status;
346         talloc_free(state);
347
348         return call->status;
349 }
350
351
352
353
354 /*
355   destroy a ctdb_call in client
356 */
357 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
358 {
359         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
360         return 0;
361 }
362
363 /*
364   construct an event driven local ctdb_call
365
366   this is used so that locally processed ctdb_call requests are processed
367   in an event driven manner
368 */
369 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
370                                                                   struct ctdb_call *call,
371                                                                   struct ctdb_ltdb_header *header,
372                                                                   TDB_DATA *data)
373 {
374         struct ctdb_client_call_state *state;
375         struct ctdb_context *ctdb = ctdb_db->ctdb;
376         int ret;
377
378         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
379         CTDB_NO_MEMORY_NULL(ctdb, state);
380         state->call = talloc_zero(state, struct ctdb_call);
381         CTDB_NO_MEMORY_NULL(ctdb, state->call);
382
383         talloc_steal(state, data->dptr);
384
385         state->state   = CTDB_CALL_DONE;
386         *(state->call) = *call;
387         state->ctdb_db = ctdb_db;
388
389         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
390         if (ret != 0) {
391                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
392         }
393
394         return state;
395 }
396
397 /*
398   make a ctdb call to the local daemon - async send. Called from client context.
399
400   This constructs a ctdb_call request and queues it for processing. 
401   This call never blocks.
402 */
403 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
404                                               struct ctdb_call *call)
405 {
406         struct ctdb_client_call_state *state;
407         struct ctdb_context *ctdb = ctdb_db->ctdb;
408         struct ctdb_ltdb_header header;
409         TDB_DATA data;
410         int ret;
411         size_t len;
412         struct ctdb_req_call *c;
413
414         /* if the domain socket is not yet open, open it */
415         if (ctdb->daemon.sd==-1) {
416                 ctdb_socket_connect(ctdb);
417         }
418
419         ret = ctdb_ltdb_lock(ctdb_db, call->key);
420         if (ret != 0) {
421                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
422                 return NULL;
423         }
424
425         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
426
427         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
428                 ret = -1;
429         }
430
431         if (ret == 0 && header.dmaster == ctdb->pnn) {
432                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
433                 talloc_free(data.dptr);
434                 ctdb_ltdb_unlock(ctdb_db, call->key);
435                 return state;
436         }
437
438         ctdb_ltdb_unlock(ctdb_db, call->key);
439         talloc_free(data.dptr);
440
441         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
442         if (state == NULL) {
443                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
444                 return NULL;
445         }
446         state->call = talloc_zero(state, struct ctdb_call);
447         if (state->call == NULL) {
448                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
449                 return NULL;
450         }
451
452         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
453         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
454         if (c == NULL) {
455                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
456                 return NULL;
457         }
458
459         state->reqid     = ctdb_reqid_new(ctdb, state);
460         state->ctdb_db = ctdb_db;
461         talloc_set_destructor(state, ctdb_client_call_destructor);
462
463         c->hdr.reqid     = state->reqid;
464         c->flags         = call->flags;
465         c->db_id         = ctdb_db->db_id;
466         c->callid        = call->call_id;
467         c->hopcount      = 0;
468         c->keylen        = call->key.dsize;
469         c->calldatalen   = call->call_data.dsize;
470         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
471         memcpy(&c->data[call->key.dsize], 
472                call->call_data.dptr, call->call_data.dsize);
473         *(state->call)              = *call;
474         state->call->call_data.dptr = &c->data[call->key.dsize];
475         state->call->key.dptr       = &c->data[0];
476
477         state->state  = CTDB_CALL_WAIT;
478
479
480         ctdb_client_queue_pkt(ctdb, &c->hdr);
481
482         return state;
483 }
484
485
486 /*
487   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
488 */
489 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
490 {
491         struct ctdb_client_call_state *state;
492
493         state = ctdb_call_send(ctdb_db, call);
494         return ctdb_call_recv(state, call);
495 }
496
497
498 /*
499   tell the daemon what messaging srvid we will use, and register the message
500   handler function in the client
501 */
502 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
503                              ctdb_msg_fn_t handler,
504                              void *private_data)
505                                     
506 {
507         int res;
508         int32_t status;
509         
510         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
511                            tdb_null, NULL, NULL, &status, NULL, NULL);
512         if (res != 0 || status != 0) {
513                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
514                 return -1;
515         }
516
517         /* also need to register the handler with our own ctdb structure */
518         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
519 }
520
521 /*
522   tell the daemon we no longer want a srvid
523 */
524 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
525 {
526         int res;
527         int32_t status;
528         
529         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
530                            tdb_null, NULL, NULL, &status, NULL, NULL);
531         if (res != 0 || status != 0) {
532                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
533                 return -1;
534         }
535
536         /* also need to register the handler with our own ctdb structure */
537         ctdb_deregister_message_handler(ctdb, srvid, private_data);
538         return 0;
539 }
540
541
542 /*
543   send a message - from client context
544  */
545 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
546                       uint64_t srvid, TDB_DATA data)
547 {
548         struct ctdb_req_message *r;
549         int len, res;
550
551         len = offsetof(struct ctdb_req_message, data) + data.dsize;
552         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
553                                len, struct ctdb_req_message);
554         CTDB_NO_MEMORY(ctdb, r);
555
556         r->hdr.destnode  = pnn;
557         r->srvid         = srvid;
558         r->datalen       = data.dsize;
559         memcpy(&r->data[0], data.dptr, data.dsize);
560         
561         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
562         if (res != 0) {
563                 return res;
564         }
565
566         talloc_free(r);
567         return 0;
568 }
569
570
571 /*
572   cancel a ctdb_fetch_lock operation, releasing the lock
573  */
574 static int fetch_lock_destructor(struct ctdb_record_handle *h)
575 {
576         ctdb_ltdb_unlock(h->ctdb_db, h->key);
577         return 0;
578 }
579
580 /*
581   force the migration of a record to this node
582  */
583 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
584 {
585         struct ctdb_call call;
586         ZERO_STRUCT(call);
587         call.call_id = CTDB_NULL_FUNC;
588         call.key = key;
589         call.flags = CTDB_IMMEDIATE_MIGRATION;
590         return ctdb_call(ctdb_db, &call);
591 }
592
593 /*
594   try to fetch a readonly copy of a record
595  */
596 static int
597 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
598 {
599         int ret;
600
601         struct ctdb_call call;
602         ZERO_STRUCT(call);
603
604         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
605         call.call_data.dptr = NULL;
606         call.call_data.dsize = 0;
607         call.key = key;
608         call.flags = CTDB_WANT_READONLY;
609         ret = ctdb_call(ctdb_db, &call);
610
611         if (ret != 0) {
612                 return -1;
613         }
614         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
615                 return -1;
616         }
617
618         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
619         if (*hdr == NULL) {
620                 talloc_free(call.reply_data.dptr);
621                 return -1;
622         }
623
624         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
625         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
626         if (data->dptr == NULL) {
627                 talloc_free(call.reply_data.dptr);
628                 talloc_free(hdr);
629                 return -1;
630         }
631
632         return 0;
633 }
634
635 /*
636   get a lock on a record, and return the records data. Blocks until it gets the lock
637  */
638 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
639                                            TDB_DATA key, TDB_DATA *data)
640 {
641         int ret;
642         struct ctdb_record_handle *h;
643
644         /*
645           procedure is as follows:
646
647           1) get the chain lock. 
648           2) check if we are dmaster
649           3) if we are the dmaster then return handle 
650           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
651              reply from ctdbd
652           5) when we get the reply, goto (1)
653          */
654
655         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
656         if (h == NULL) {
657                 return NULL;
658         }
659
660         h->ctdb_db = ctdb_db;
661         h->key     = key;
662         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
663         if (h->key.dptr == NULL) {
664                 talloc_free(h);
665                 return NULL;
666         }
667         h->data    = data;
668
669         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
670                  (const char *)key.dptr));
671
672 again:
673         /* step 1 - get the chain lock */
674         ret = ctdb_ltdb_lock(ctdb_db, key);
675         if (ret != 0) {
676                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
677                 talloc_free(h);
678                 return NULL;
679         }
680
681         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
682
683         talloc_set_destructor(h, fetch_lock_destructor);
684
685         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
686
687         /* when torturing, ensure we test the remote path */
688         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
689             random() % 5 == 0) {
690                 h->header.dmaster = (uint32_t)-1;
691         }
692
693
694         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
695
696         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
697                 ctdb_ltdb_unlock(ctdb_db, key);
698                 ret = ctdb_client_force_migration(ctdb_db, key);
699                 if (ret != 0) {
700                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
701                         talloc_free(h);
702                         return NULL;
703                 }
704                 goto again;
705         }
706
707         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
708         return h;
709 }
710
711 /*
712   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
713  */
714 struct ctdb_record_handle *
715 ctdb_fetch_readonly_lock(
716         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
717         TDB_DATA key, TDB_DATA *data,
718         int read_only)
719 {
720         int ret;
721         struct ctdb_record_handle *h;
722         struct ctdb_ltdb_header *roheader = NULL;
723
724         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
725         if (h == NULL) {
726                 return NULL;
727         }
728
729         h->ctdb_db = ctdb_db;
730         h->key     = key;
731         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
732         if (h->key.dptr == NULL) {
733                 talloc_free(h);
734                 return NULL;
735         }
736         h->data    = data;
737
738         data->dptr = NULL;
739         data->dsize = 0;
740
741
742 again:
743         talloc_free(roheader);
744         roheader = NULL;
745
746         talloc_free(data->dptr);
747         data->dptr = NULL;
748         data->dsize = 0;
749
750         /* Lock the record/chain */
751         ret = ctdb_ltdb_lock(ctdb_db, key);
752         if (ret != 0) {
753                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
754                 talloc_free(h);
755                 return NULL;
756         }
757
758         talloc_set_destructor(h, fetch_lock_destructor);
759
760         /* Check if record exists yet in the TDB */
761         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
762         if (ret != 0) {
763                 ctdb_ltdb_unlock(ctdb_db, key);
764                 ret = ctdb_client_force_migration(ctdb_db, key);
765                 if (ret != 0) {
766                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
767                         talloc_free(h);
768                         return NULL;
769                 }
770                 goto again;
771         }
772
773         /* if this is a request for read/write and we have delegations
774            we have to revoke all delegations first
775         */
776         if ((read_only == 0) 
777         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
778         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
779                 ctdb_ltdb_unlock(ctdb_db, key);
780                 ret = ctdb_client_force_migration(ctdb_db, key);
781                 if (ret != 0) {
782                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
783                         talloc_free(h);
784                         return NULL;
785                 }
786                 goto again;
787         }
788
789         /* if we are dmaster, just return the handle */
790         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
791                 return h;
792         }
793
794         if (read_only != 0) {
795                 TDB_DATA rodata = {NULL, 0};
796
797                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
798                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
799                         return h;
800                 }
801
802                 ctdb_ltdb_unlock(ctdb_db, key);
803                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
804                 if (ret != 0) {
805                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
806                         ret = ctdb_client_force_migration(ctdb_db, key);
807                         if (ret != 0) {
808                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
809                                 talloc_free(h);
810                                 return NULL;
811                         }
812
813                         goto again;
814                 }
815
816                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
817                         ret = ctdb_client_force_migration(ctdb_db, key);
818                         if (ret != 0) {
819                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
820                                 talloc_free(h);
821                                 return NULL;
822                         }
823
824                         goto again;
825                 }
826
827                 ret = ctdb_ltdb_lock(ctdb_db, key);
828                 if (ret != 0) {
829                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
830                         talloc_free(h);
831                         return NULL;
832                 }
833
834                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
835                 if (ret != 0) {
836                         ctdb_ltdb_unlock(ctdb_db, key);
837
838                         ret = ctdb_client_force_migration(ctdb_db, key);
839                         if (ret != 0) {
840                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
841                                 talloc_free(h);
842                                 return NULL;
843                         }
844
845                         goto again;
846                 }
847
848                 return h;
849         }
850
851         /* we are not dmaster and this was not a request for a readonly lock
852          * so unlock the record, migrate it and try again
853          */
854         ctdb_ltdb_unlock(ctdb_db, key);
855         ret = ctdb_client_force_migration(ctdb_db, key);
856         if (ret != 0) {
857                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
858                 talloc_free(h);
859                 return NULL;
860         }
861         goto again;
862 }
863
864 /*
865   store some data to the record that was locked with ctdb_fetch_lock()
866 */
867 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
868 {
869         if (h->ctdb_db->persistent) {
870                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
871                 return -1;
872         }
873
874         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
875 }
876
877 /*
878   non-locking fetch of a record
879  */
880 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
881                TDB_DATA key, TDB_DATA *data)
882 {
883         struct ctdb_call call;
884         int ret;
885
886         call.call_id = CTDB_FETCH_FUNC;
887         call.call_data.dptr = NULL;
888         call.call_data.dsize = 0;
889         call.key = key;
890
891         ret = ctdb_call(ctdb_db, &call);
892
893         if (ret == 0) {
894                 *data = call.reply_data;
895                 talloc_steal(mem_ctx, data->dptr);
896         }
897
898         return ret;
899 }
900
901
902
903 /*
904    called when a control completes or timesout to invoke the callback
905    function the user provided
906 */
907 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
908         struct timeval t, void *private_data)
909 {
910         struct ctdb_client_control_state *state;
911         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
912         int ret;
913
914         state = talloc_get_type(private_data, struct ctdb_client_control_state);
915         talloc_steal(tmp_ctx, state);
916
917         ret = ctdb_control_recv(state->ctdb, state, state,
918                         NULL, 
919                         NULL, 
920                         NULL);
921         if (ret != 0) {
922                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
923         }
924
925         talloc_free(tmp_ctx);
926 }
927
928 /*
929   called when a CTDB_REPLY_CONTROL packet comes in in the client
930
931   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
932   contains any reply data from the control
933 */
934 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
935                                       struct ctdb_req_header *hdr)
936 {
937         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
938         struct ctdb_client_control_state *state;
939
940         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
941         if (state == NULL) {
942                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
943                 return;
944         }
945
946         if (hdr->reqid != state->reqid) {
947                 /* we found a record  but it was the wrong one */
948                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
949                 return;
950         }
951
952         state->outdata.dptr = c->data;
953         state->outdata.dsize = c->datalen;
954         state->status = c->status;
955         if (c->errorlen) {
956                 state->errormsg = talloc_strndup(state, 
957                                                  (char *)&c->data[c->datalen], 
958                                                  c->errorlen);
959         }
960
961         /* state->outdata now uses resources from c so we dont want c
962            to just dissappear from under us while state is still alive
963         */
964         talloc_steal(state, c);
965
966         state->state = CTDB_CONTROL_DONE;
967
968         /* if we had a callback registered for this control, pull the response
969            and call the callback.
970         */
971         if (state->async.fn) {
972                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
973         }
974 }
975
976
977 /*
978   destroy a ctdb_control in client
979 */
980 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
981 {
982         ctdb_reqid_remove(state->ctdb, state->reqid);
983         return 0;
984 }
985
986
987 /* time out handler for ctdb_control */
988 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
989         struct timeval t, void *private_data)
990 {
991         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
992
993         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
994                          "dstnode:%u\n", state->reqid, state->c->opcode,
995                          state->c->hdr.destnode));
996
997         state->state = CTDB_CONTROL_TIMEOUT;
998
999         /* if we had a callback registered for this control, pull the response
1000            and call the callback.
1001         */
1002         if (state->async.fn) {
1003                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1004         }
1005 }
1006
1007 /* async version of send control request */
1008 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1009                 uint32_t destnode, uint64_t srvid, 
1010                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1011                 TALLOC_CTX *mem_ctx,
1012                 struct timeval *timeout,
1013                 char **errormsg)
1014 {
1015         struct ctdb_client_control_state *state;
1016         size_t len;
1017         struct ctdb_req_control *c;
1018         int ret;
1019
1020         if (errormsg) {
1021                 *errormsg = NULL;
1022         }
1023
1024         /* if the domain socket is not yet open, open it */
1025         if (ctdb->daemon.sd==-1) {
1026                 ctdb_socket_connect(ctdb);
1027         }
1028
1029         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1030         CTDB_NO_MEMORY_NULL(ctdb, state);
1031
1032         state->ctdb       = ctdb;
1033         state->reqid      = ctdb_reqid_new(ctdb, state);
1034         state->state      = CTDB_CONTROL_WAIT;
1035         state->errormsg   = NULL;
1036
1037         talloc_set_destructor(state, ctdb_client_control_destructor);
1038
1039         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1040         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1041                                len, struct ctdb_req_control);
1042         state->c            = c;        
1043         CTDB_NO_MEMORY_NULL(ctdb, c);
1044         c->hdr.reqid        = state->reqid;
1045         c->hdr.destnode     = destnode;
1046         c->opcode           = opcode;
1047         c->client_id        = 0;
1048         c->flags            = flags;
1049         c->srvid            = srvid;
1050         c->datalen          = data.dsize;
1051         if (data.dsize) {
1052                 memcpy(&c->data[0], data.dptr, data.dsize);
1053         }
1054
1055         /* timeout */
1056         if (timeout && !timeval_is_zero(timeout)) {
1057                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1058         }
1059
1060         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1061         if (ret != 0) {
1062                 talloc_free(state);
1063                 return NULL;
1064         }
1065
1066         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1067                 talloc_free(state);
1068                 return NULL;
1069         }
1070
1071         return state;
1072 }
1073
1074
1075 /* async version of receive control reply */
1076 int ctdb_control_recv(struct ctdb_context *ctdb, 
1077                 struct ctdb_client_control_state *state, 
1078                 TALLOC_CTX *mem_ctx,
1079                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1080 {
1081         TALLOC_CTX *tmp_ctx;
1082
1083         if (status != NULL) {
1084                 *status = -1;
1085         }
1086         if (errormsg != NULL) {
1087                 *errormsg = NULL;
1088         }
1089
1090         if (state == NULL) {
1091                 return -1;
1092         }
1093
1094         /* prevent double free of state */
1095         tmp_ctx = talloc_new(ctdb);
1096         talloc_steal(tmp_ctx, state);
1097
1098         /* loop one event at a time until we either timeout or the control
1099            completes.
1100         */
1101         while (state->state == CTDB_CONTROL_WAIT) {
1102                 event_loop_once(ctdb->ev);
1103         }
1104
1105         if (state->state != CTDB_CONTROL_DONE) {
1106                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1107                 if (state->async.fn) {
1108                         state->async.fn(state);
1109                 }
1110                 talloc_free(tmp_ctx);
1111                 return -1;
1112         }
1113
1114         if (state->errormsg) {
1115                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1116                 if (errormsg) {
1117                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1118                 }
1119                 if (state->async.fn) {
1120                         state->async.fn(state);
1121                 }
1122                 talloc_free(tmp_ctx);
1123                 return -1;
1124         }
1125
1126         if (outdata) {
1127                 *outdata = state->outdata;
1128                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1129         }
1130
1131         if (status) {
1132                 *status = state->status;
1133         }
1134
1135         if (state->async.fn) {
1136                 state->async.fn(state);
1137         }
1138
1139         talloc_free(tmp_ctx);
1140         return 0;
1141 }
1142
1143
1144
1145 /*
1146   send a ctdb control message
1147   timeout specifies how long we should wait for a reply.
1148   if timeout is NULL we wait indefinitely
1149  */
1150 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1151                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1152                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1153                  struct timeval *timeout,
1154                  char **errormsg)
1155 {
1156         struct ctdb_client_control_state *state;
1157
1158         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1159                         flags, data, mem_ctx,
1160                         timeout, errormsg);
1161         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1162                         errormsg);
1163 }
1164
1165
1166
1167
1168 /*
1169   a process exists call. Returns 0 if process exists, -1 otherwise
1170  */
1171 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1172 {
1173         int ret;
1174         TDB_DATA data;
1175         int32_t status;
1176
1177         data.dptr = (uint8_t*)&pid;
1178         data.dsize = sizeof(pid);
1179
1180         ret = ctdb_control(ctdb, destnode, 0, 
1181                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1182                            NULL, NULL, &status, NULL, NULL);
1183         if (ret != 0) {
1184                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1185                 return -1;
1186         }
1187
1188         return status;
1189 }
1190
1191 /*
1192   get remote statistics
1193  */
1194 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1195 {
1196         int ret;
1197         TDB_DATA data;
1198         int32_t res;
1199
1200         ret = ctdb_control(ctdb, destnode, 0, 
1201                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1202                            ctdb, &data, &res, NULL, NULL);
1203         if (ret != 0 || res != 0) {
1204                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1205                 return -1;
1206         }
1207
1208         if (data.dsize != sizeof(struct ctdb_statistics)) {
1209                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1210                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1211                       return -1;
1212         }
1213
1214         *status = *(struct ctdb_statistics *)data.dptr;
1215         talloc_free(data.dptr);
1216                         
1217         return 0;
1218 }
1219
1220 /*
1221   shutdown a remote ctdb node
1222  */
1223 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1224 {
1225         struct ctdb_client_control_state *state;
1226
1227         state = ctdb_control_send(ctdb, destnode, 0, 
1228                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1229                            NULL, &timeout, NULL);
1230         if (state == NULL) {
1231                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1232                 return -1;
1233         }
1234
1235         return 0;
1236 }
1237
1238 /*
1239   get vnn map from a remote node
1240  */
1241 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1242 {
1243         int ret;
1244         TDB_DATA outdata;
1245         int32_t res;
1246         struct ctdb_vnn_map_wire *map;
1247
1248         ret = ctdb_control(ctdb, destnode, 0, 
1249                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1250                            mem_ctx, &outdata, &res, &timeout, NULL);
1251         if (ret != 0 || res != 0) {
1252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1253                 return -1;
1254         }
1255         
1256         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1257         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1258             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1259                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1260                 return -1;
1261         }
1262
1263         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1264         CTDB_NO_MEMORY(ctdb, *vnnmap);
1265         (*vnnmap)->generation = map->generation;
1266         (*vnnmap)->size       = map->size;
1267         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1268
1269         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1270         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1271         talloc_free(outdata.dptr);
1272                     
1273         return 0;
1274 }
1275
1276
1277 /*
1278   get the recovery mode of a remote node
1279  */
1280 struct ctdb_client_control_state *
1281 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1282 {
1283         return ctdb_control_send(ctdb, destnode, 0, 
1284                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1285                            mem_ctx, &timeout, NULL);
1286 }
1287
1288 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1289 {
1290         int ret;
1291         int32_t res;
1292
1293         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1296                 return -1;
1297         }
1298
1299         if (recmode) {
1300                 *recmode = (uint32_t)res;
1301         }
1302
1303         return 0;
1304 }
1305
1306 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1307 {
1308         struct ctdb_client_control_state *state;
1309
1310         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1311         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1312 }
1313
1314
1315
1316
1317 /*
1318   set the recovery mode of a remote node
1319  */
1320 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1321 {
1322         int ret;
1323         TDB_DATA data;
1324         int32_t res;
1325
1326         data.dsize = sizeof(uint32_t);
1327         data.dptr = (unsigned char *)&recmode;
1328
1329         ret = ctdb_control(ctdb, destnode, 0, 
1330                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1331                            NULL, NULL, &res, &timeout, NULL);
1332         if (ret != 0 || res != 0) {
1333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1334                 return -1;
1335         }
1336
1337         return 0;
1338 }
1339
1340
1341
1342 /*
1343   get the recovery master of a remote node
1344  */
1345 struct ctdb_client_control_state *
1346 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1347                         struct timeval timeout, uint32_t destnode)
1348 {
1349         return ctdb_control_send(ctdb, destnode, 0, 
1350                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1351                            mem_ctx, &timeout, NULL);
1352 }
1353
1354 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1355 {
1356         int ret;
1357         int32_t res;
1358
1359         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1360         if (ret != 0) {
1361                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1362                 return -1;
1363         }
1364
1365         if (recmaster) {
1366                 *recmaster = (uint32_t)res;
1367         }
1368
1369         return 0;
1370 }
1371
1372 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1373 {
1374         struct ctdb_client_control_state *state;
1375
1376         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1377         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1378 }
1379
1380
1381 /*
1382   set the recovery master of a remote node
1383  */
1384 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1385 {
1386         int ret;
1387         TDB_DATA data;
1388         int32_t res;
1389
1390         ZERO_STRUCT(data);
1391         data.dsize = sizeof(uint32_t);
1392         data.dptr = (unsigned char *)&recmaster;
1393
1394         ret = ctdb_control(ctdb, destnode, 0, 
1395                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1396                            NULL, NULL, &res, &timeout, NULL);
1397         if (ret != 0 || res != 0) {
1398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1399                 return -1;
1400         }
1401
1402         return 0;
1403 }
1404
1405
1406 /*
1407   get a list of databases off a remote node
1408  */
1409 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1410                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1411 {
1412         int ret;
1413         TDB_DATA outdata;
1414         int32_t res;
1415
1416         ret = ctdb_control(ctdb, destnode, 0, 
1417                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1418                            mem_ctx, &outdata, &res, &timeout, NULL);
1419         if (ret != 0 || res != 0) {
1420                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1421                 return -1;
1422         }
1423
1424         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1425         talloc_free(outdata.dptr);
1426                     
1427         return 0;
1428 }
1429
1430 /*
1431   get a list of nodes (vnn and flags ) from a remote node
1432  */
1433 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1434                 struct timeval timeout, uint32_t destnode, 
1435                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1436 {
1437         int ret;
1438         TDB_DATA outdata;
1439         int32_t res;
1440
1441         ret = ctdb_control(ctdb, destnode, 0, 
1442                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1443                            mem_ctx, &outdata, &res, &timeout, NULL);
1444         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1446                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1447         }
1448         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1450                 return -1;
1451         }
1452
1453         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1454         talloc_free(outdata.dptr);
1455                     
1456         return 0;
1457 }
1458
1459 /*
1460   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1461  */
1462 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1463                 struct timeval timeout, uint32_t destnode, 
1464                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1465 {
1466         int ret, i, len;
1467         TDB_DATA outdata;
1468         struct ctdb_node_mapv4 *nodemapv4;
1469         int32_t res;
1470
1471         ret = ctdb_control(ctdb, destnode, 0, 
1472                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1473                            mem_ctx, &outdata, &res, &timeout, NULL);
1474         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1475                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1476                 return -1;
1477         }
1478
1479         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1480
1481         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1482         (*nodemap) = talloc_zero_size(mem_ctx, len);
1483         CTDB_NO_MEMORY(ctdb, (*nodemap));
1484
1485         (*nodemap)->num = nodemapv4->num;
1486         for (i=0; i<nodemapv4->num; i++) {
1487                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1488                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1489                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1490                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1491         }
1492                 
1493         talloc_free(outdata.dptr);
1494                     
1495         return 0;
1496 }
1497
1498 /*
1499   drop the transport, reload the nodes file and restart the transport
1500  */
1501 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1502                     struct timeval timeout, uint32_t destnode)
1503 {
1504         int ret;
1505         int32_t res;
1506
1507         ret = ctdb_control(ctdb, destnode, 0, 
1508                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1509                            NULL, NULL, &res, &timeout, NULL);
1510         if (ret != 0 || res != 0) {
1511                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1512                 return -1;
1513         }
1514
1515         return 0;
1516 }
1517
1518
1519 /*
1520   set vnn map on a node
1521  */
1522 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1523                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1524 {
1525         int ret;
1526         TDB_DATA data;
1527         int32_t res;
1528         struct ctdb_vnn_map_wire *map;
1529         size_t len;
1530
1531         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1532         map = talloc_size(mem_ctx, len);
1533         CTDB_NO_MEMORY(ctdb, map);
1534
1535         map->generation = vnnmap->generation;
1536         map->size = vnnmap->size;
1537         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1538         
1539         data.dsize = len;
1540         data.dptr  = (uint8_t *)map;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1544                            NULL, NULL, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1547                 return -1;
1548         }
1549
1550         talloc_free(map);
1551
1552         return 0;
1553 }
1554
1555
1556 /*
1557   async send for pull database
1558  */
1559 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1560         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1561         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1562 {
1563         TDB_DATA indata;
1564         struct ctdb_control_pulldb *pull;
1565         struct ctdb_client_control_state *state;
1566
1567         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1568         CTDB_NO_MEMORY_NULL(ctdb, pull);
1569
1570         pull->db_id   = dbid;
1571         pull->lmaster = lmaster;
1572
1573         indata.dsize = sizeof(struct ctdb_control_pulldb);
1574         indata.dptr  = (unsigned char *)pull;
1575
1576         state = ctdb_control_send(ctdb, destnode, 0, 
1577                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1578                                   mem_ctx, &timeout, NULL);
1579         talloc_free(pull);
1580
1581         return state;
1582 }
1583
1584 /*
1585   async recv for pull database
1586  */
1587 int ctdb_ctrl_pulldb_recv(
1588         struct ctdb_context *ctdb, 
1589         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1590         TDB_DATA *outdata)
1591 {
1592         int ret;
1593         int32_t res;
1594
1595         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1596         if ( (ret != 0) || (res != 0) ){
1597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1598                 return -1;
1599         }
1600
1601         return 0;
1602 }
1603
1604 /*
1605   pull all keys and records for a specific database on a node
1606  */
1607 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1608                 uint32_t dbid, uint32_t lmaster, 
1609                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1610                 TDB_DATA *outdata)
1611 {
1612         struct ctdb_client_control_state *state;
1613
1614         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1615                                       timeout);
1616         
1617         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1618 }
1619
1620
1621 /*
1622   change dmaster for all keys in the database to the new value
1623  */
1624 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1625                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1626 {
1627         int ret;
1628         TDB_DATA indata;
1629         int32_t res;
1630
1631         indata.dsize = 2*sizeof(uint32_t);
1632         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1633
1634         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1635         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1636
1637         ret = ctdb_control(ctdb, destnode, 0, 
1638                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1639                            NULL, NULL, &res, &timeout, NULL);
1640         if (ret != 0 || res != 0) {
1641                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1642                 return -1;
1643         }
1644
1645         return 0;
1646 }
1647
1648 /*
1649   ping a node, return number of clients connected
1650  */
1651 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1652 {
1653         int ret;
1654         int32_t res;
1655
1656         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1657                            tdb_null, NULL, NULL, &res, NULL, NULL);
1658         if (ret != 0) {
1659                 return -1;
1660         }
1661         return res;
1662 }
1663
1664 /*
1665   find the real path to a ltdb 
1666  */
1667 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1668                    const char **path)
1669 {
1670         int ret;
1671         int32_t res;
1672         TDB_DATA data;
1673
1674         data.dptr = (uint8_t *)&dbid;
1675         data.dsize = sizeof(dbid);
1676
1677         ret = ctdb_control(ctdb, destnode, 0, 
1678                            CTDB_CONTROL_GETDBPATH, 0, data, 
1679                            mem_ctx, &data, &res, &timeout, NULL);
1680         if (ret != 0 || res != 0) {
1681                 return -1;
1682         }
1683
1684         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1685         if ((*path) == NULL) {
1686                 return -1;
1687         }
1688
1689         talloc_free(data.dptr);
1690
1691         return 0;
1692 }
1693
1694 /*
1695   find the name of a db 
1696  */
1697 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1698                    const char **name)
1699 {
1700         int ret;
1701         int32_t res;
1702         TDB_DATA data;
1703
1704         data.dptr = (uint8_t *)&dbid;
1705         data.dsize = sizeof(dbid);
1706
1707         ret = ctdb_control(ctdb, destnode, 0, 
1708                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1709                            mem_ctx, &data, &res, &timeout, NULL);
1710         if (ret != 0 || res != 0) {
1711                 return -1;
1712         }
1713
1714         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1715         if ((*name) == NULL) {
1716                 return -1;
1717         }
1718
1719         talloc_free(data.dptr);
1720
1721         return 0;
1722 }
1723
1724 /*
1725   get the health status of a db
1726  */
1727 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1728                           struct timeval timeout,
1729                           uint32_t destnode,
1730                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1731                           const char **reason)
1732 {
1733         int ret;
1734         int32_t res;
1735         TDB_DATA data;
1736
1737         data.dptr = (uint8_t *)&dbid;
1738         data.dsize = sizeof(dbid);
1739
1740         ret = ctdb_control(ctdb, destnode, 0,
1741                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1742                            mem_ctx, &data, &res, &timeout, NULL);
1743         if (ret != 0 || res != 0) {
1744                 return -1;
1745         }
1746
1747         if (data.dsize == 0) {
1748                 (*reason) = NULL;
1749                 return 0;
1750         }
1751
1752         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1753         if ((*reason) == NULL) {
1754                 return -1;
1755         }
1756
1757         talloc_free(data.dptr);
1758
1759         return 0;
1760 }
1761
1762 /*
1763   create a database
1764  */
1765 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1766                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1767 {
1768         int ret;
1769         int32_t res;
1770         TDB_DATA data;
1771
1772         data.dptr = discard_const(name);
1773         data.dsize = strlen(name)+1;
1774
1775         ret = ctdb_control(ctdb, destnode, 0, 
1776                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1777                            0, data, 
1778                            mem_ctx, &data, &res, &timeout, NULL);
1779
1780         if (ret != 0 || res != 0) {
1781                 return -1;
1782         }
1783
1784         return 0;
1785 }
1786
1787 /*
1788   get debug level on a node
1789  */
1790 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1791 {
1792         int ret;
1793         int32_t res;
1794         TDB_DATA data;
1795
1796         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1797                            ctdb, &data, &res, NULL, NULL);
1798         if (ret != 0 || res != 0) {
1799                 return -1;
1800         }
1801         if (data.dsize != sizeof(int32_t)) {
1802                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1803                          (unsigned)data.dsize));
1804                 return -1;
1805         }
1806         *level = *(int32_t *)data.dptr;
1807         talloc_free(data.dptr);
1808         return 0;
1809 }
1810
1811 /*
1812   set debug level on a node
1813  */
1814 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1815 {
1816         int ret;
1817         int32_t res;
1818         TDB_DATA data;
1819
1820         data.dptr = (uint8_t *)&level;
1821         data.dsize = sizeof(level);
1822
1823         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1824                            NULL, NULL, &res, NULL, NULL);
1825         if (ret != 0 || res != 0) {
1826                 return -1;
1827         }
1828         return 0;
1829 }
1830
1831
1832 /*
1833   get a list of connected nodes
1834  */
1835 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1836                                 struct timeval timeout,
1837                                 TALLOC_CTX *mem_ctx,
1838                                 uint32_t *num_nodes)
1839 {
1840         struct ctdb_node_map *map=NULL;
1841         int ret, i;
1842         uint32_t *nodes;
1843
1844         *num_nodes = 0;
1845
1846         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1847         if (ret != 0) {
1848                 return NULL;
1849         }
1850
1851         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1852         if (nodes == NULL) {
1853                 return NULL;
1854         }
1855
1856         for (i=0;i<map->num;i++) {
1857                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1858                         nodes[*num_nodes] = map->nodes[i].pnn;
1859                         (*num_nodes)++;
1860                 }
1861         }
1862
1863         return nodes;
1864 }
1865
1866
1867 /*
1868   reset remote status
1869  */
1870 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1871 {
1872         int ret;
1873         int32_t res;
1874
1875         ret = ctdb_control(ctdb, destnode, 0, 
1876                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1877                            NULL, NULL, &res, NULL, NULL);
1878         if (ret != 0 || res != 0) {
1879                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1880                 return -1;
1881         }
1882         return 0;
1883 }
1884
1885 /*
1886   attach to a specific database - client call
1887 */
1888 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1889                                     struct timeval timeout,
1890                                     const char *name,
1891                                     bool persistent,
1892                                     uint32_t tdb_flags)
1893 {
1894         struct ctdb_db_context *ctdb_db;
1895         TDB_DATA data;
1896         int ret;
1897         int32_t res;
1898
1899         ctdb_db = ctdb_db_handle(ctdb, name);
1900         if (ctdb_db) {
1901                 return ctdb_db;
1902         }
1903
1904         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1905         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1906
1907         ctdb_db->ctdb = ctdb;
1908         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1909         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1910
1911         data.dptr = discard_const(name);
1912         data.dsize = strlen(name)+1;
1913
1914         /* tell ctdb daemon to attach */
1915         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1916                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1917                            0, data, ctdb_db, &data, &res, NULL, NULL);
1918         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1919                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1920                 talloc_free(ctdb_db);
1921                 return NULL;
1922         }
1923         
1924         ctdb_db->db_id = *(uint32_t *)data.dptr;
1925         talloc_free(data.dptr);
1926
1927         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1928         if (ret != 0) {
1929                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1930                 talloc_free(ctdb_db);
1931                 return NULL;
1932         }
1933
1934         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1935         if (ctdb->valgrinding) {
1936                 tdb_flags |= TDB_NOMMAP;
1937         }
1938         tdb_flags |= TDB_DISALLOW_NESTING;
1939
1940         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1941         if (ctdb_db->ltdb == NULL) {
1942                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1943                 talloc_free(ctdb_db);
1944                 return NULL;
1945         }
1946
1947         ctdb_db->persistent = persistent;
1948
1949         DLIST_ADD(ctdb->db_list, ctdb_db);
1950
1951         /* add well known functions */
1952         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1953         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1954         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1955
1956         return ctdb_db;
1957 }
1958
1959
1960 /*
1961   setup a call for a database
1962  */
1963 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1964 {
1965         struct ctdb_registered_call *call;
1966
1967 #if 0
1968         TDB_DATA data;
1969         int32_t status;
1970         struct ctdb_control_set_call c;
1971         int ret;
1972
1973         /* this is no longer valid with the separate daemon architecture */
1974         c.db_id = ctdb_db->db_id;
1975         c.fn    = fn;
1976         c.id    = id;
1977
1978         data.dptr = (uint8_t *)&c;
1979         data.dsize = sizeof(c);
1980
1981         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1982                            data, NULL, NULL, &status, NULL, NULL);
1983         if (ret != 0 || status != 0) {
1984                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1985                 return -1;
1986         }
1987 #endif
1988
1989         /* also register locally */
1990         call = talloc(ctdb_db, struct ctdb_registered_call);
1991         call->fn = fn;
1992         call->id = id;
1993
1994         DLIST_ADD(ctdb_db->calls, call);        
1995         return 0;
1996 }
1997
1998
1999 struct traverse_state {
2000         bool done;
2001         uint32_t count;
2002         ctdb_traverse_func fn;
2003         void *private_data;
2004         bool listemptyrecords;
2005 };
2006
2007 /*
2008   called on each key during a ctdb_traverse
2009  */
2010 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2011 {
2012         struct traverse_state *state = (struct traverse_state *)p;
2013         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2014         TDB_DATA key;
2015
2016         if (data.dsize < sizeof(uint32_t) ||
2017             d->length != data.dsize) {
2018                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2019                 state->done = True;
2020                 return;
2021         }
2022
2023         key.dsize = d->keylen;
2024         key.dptr  = &d->data[0];
2025         data.dsize = d->datalen;
2026         data.dptr = &d->data[d->keylen];
2027
2028         if (key.dsize == 0 && data.dsize == 0) {
2029                 /* end of traverse */
2030                 state->done = True;
2031                 return;
2032         }
2033
2034         if (!state->listemptyrecords &&
2035             data.dsize == sizeof(struct ctdb_ltdb_header))
2036         {
2037                 /* empty records are deleted records in ctdb */
2038                 return;
2039         }
2040
2041         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2042                 state->done = True;
2043         }
2044
2045         state->count++;
2046 }
2047
2048 /**
2049  * start a cluster wide traverse, calling the supplied fn on each record
2050  * return the number of records traversed, or -1 on error
2051  *
2052  * Extendet variant with a flag to signal whether empty records should
2053  * be listed.
2054  */
2055 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2056                              ctdb_traverse_func fn,
2057                              bool withemptyrecords,
2058                              void *private_data)
2059 {
2060         TDB_DATA data;
2061         struct ctdb_traverse_start_ext t;
2062         int32_t status;
2063         int ret;
2064         uint64_t srvid = (getpid() | 0xFLL<<60);
2065         struct traverse_state state;
2066
2067         state.done = False;
2068         state.count = 0;
2069         state.private_data = private_data;
2070         state.fn = fn;
2071         state.listemptyrecords = withemptyrecords;
2072
2073         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2074         if (ret != 0) {
2075                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2076                 return -1;
2077         }
2078
2079         t.db_id = ctdb_db->db_id;
2080         t.srvid = srvid;
2081         t.reqid = 0;
2082         t.withemptyrecords = withemptyrecords;
2083
2084         data.dptr = (uint8_t *)&t;
2085         data.dsize = sizeof(t);
2086
2087         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2088                            data, NULL, NULL, &status, NULL, NULL);
2089         if (ret != 0 || status != 0) {
2090                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2091                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2092                 return -1;
2093         }
2094
2095         while (!state.done) {
2096                 event_loop_once(ctdb_db->ctdb->ev);
2097         }
2098
2099         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2100         if (ret != 0) {
2101                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2102                 return -1;
2103         }
2104
2105         return state.count;
2106 }
2107
2108 /**
2109  * start a cluster wide traverse, calling the supplied fn on each record
2110  * return the number of records traversed, or -1 on error
2111  *
2112  * Standard version which does not list the empty records:
2113  * These are considered deleted.
2114  */
2115 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2116 {
2117         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2118 }
2119
2120 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2121 /*
2122   called on each key during a catdb
2123  */
2124 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2125 {
2126         int i;
2127         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2128         FILE *f = c->f;
2129         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2130
2131         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2132         for (i=0;i<key.dsize;i++) {
2133                 if (ISASCII(key.dptr[i])) {
2134                         fprintf(f, "%c", key.dptr[i]);
2135                 } else {
2136                         fprintf(f, "\\%02X", key.dptr[i]);
2137                 }
2138         }
2139         fprintf(f, "\"\n");
2140
2141         fprintf(f, "dmaster: %u\n", h->dmaster);
2142         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2143
2144         if (c->printlmaster && ctdb->vnn_map != NULL) {
2145                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2146         }
2147
2148         if (c->printhash) {
2149                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2150         }
2151
2152         if (c->printrecordflags) {
2153                 fprintf(f, "flags: 0x%08x", h->flags);
2154                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2155                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2156                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2157                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2158                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2159                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2160                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2161                 fprintf(f, "\n");
2162         }
2163
2164         if (c->printdatasize) {
2165                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2166         } else {
2167                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2168                 for (i=sizeof(*h);i<data.dsize;i++) {
2169                         if (ISASCII(data.dptr[i])) {
2170                                 fprintf(f, "%c", data.dptr[i]);
2171                         } else {
2172                                 fprintf(f, "\\%02X", data.dptr[i]);
2173                         }
2174                 }
2175                 fprintf(f, "\"\n");
2176         }
2177
2178         fprintf(f, "\n");
2179
2180         return 0;
2181 }
2182
2183 /*
2184   convenience function to list all keys to stdout
2185  */
2186 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2187                  struct ctdb_dump_db_context *ctx)
2188 {
2189         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2190                                  ctx->printemptyrecords, ctx);
2191 }
2192
2193 /*
2194   get the pid of a ctdb daemon
2195  */
2196 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2197 {
2198         int ret;
2199         int32_t res;
2200
2201         ret = ctdb_control(ctdb, destnode, 0, 
2202                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2203                            NULL, NULL, &res, &timeout, NULL);
2204         if (ret != 0) {
2205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2206                 return -1;
2207         }
2208
2209         *pid = res;
2210
2211         return 0;
2212 }
2213
2214
2215 /*
2216   async freeze send control
2217  */
2218 struct ctdb_client_control_state *
2219 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2220 {
2221         return ctdb_control_send(ctdb, destnode, priority, 
2222                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2223                            mem_ctx, &timeout, NULL);
2224 }
2225
2226 /* 
2227    async freeze recv control
2228 */
2229 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2230 {
2231         int ret;
2232         int32_t res;
2233
2234         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2235         if ( (ret != 0) || (res != 0) ){
2236                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2237                 return -1;
2238         }
2239
2240         return 0;
2241 }
2242
2243 /*
2244   freeze databases of a certain priority
2245  */
2246 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2247 {
2248         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2249         struct ctdb_client_control_state *state;
2250         int ret;
2251
2252         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2253         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2254         talloc_free(tmp_ctx);
2255
2256         return ret;
2257 }
2258
2259 /* Freeze all databases */
2260 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2261 {
2262         int i;
2263
2264         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2265                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2266                         return -1;
2267                 }
2268         }
2269         return 0;
2270 }
2271
2272 /*
2273   thaw databases of a certain priority
2274  */
2275 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2276 {
2277         int ret;
2278         int32_t res;
2279
2280         ret = ctdb_control(ctdb, destnode, priority, 
2281                            CTDB_CONTROL_THAW, 0, tdb_null, 
2282                            NULL, NULL, &res, &timeout, NULL);
2283         if (ret != 0 || res != 0) {
2284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2285                 return -1;
2286         }
2287
2288         return 0;
2289 }
2290
2291 /* thaw all databases */
2292 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2293 {
2294         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2295 }
2296
2297 /*
2298   get pnn of a node, or -1
2299  */
2300 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2301 {
2302         int ret;
2303         int32_t res;
2304
2305         ret = ctdb_control(ctdb, destnode, 0, 
2306                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2307                            NULL, NULL, &res, &timeout, NULL);
2308         if (ret != 0) {
2309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2310                 return -1;
2311         }
2312
2313         return res;
2314 }
2315
2316 /*
2317   get the monitoring mode of a remote node
2318  */
2319 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2320 {
2321         int ret;
2322         int32_t res;
2323
2324         ret = ctdb_control(ctdb, destnode, 0, 
2325                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2326                            NULL, NULL, &res, &timeout, NULL);
2327         if (ret != 0) {
2328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2329                 return -1;
2330         }
2331
2332         *monmode = res;
2333
2334         return 0;
2335 }
2336
2337
2338 /*
2339  set the monitoring mode of a remote node to active
2340  */
2341 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2342 {
2343         int ret;
2344         
2345
2346         ret = ctdb_control(ctdb, destnode, 0, 
2347                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2348                            NULL, NULL,NULL, &timeout, NULL);
2349         if (ret != 0) {
2350                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2351                 return -1;
2352         }
2353
2354         
2355
2356         return 0;
2357 }
2358
2359 /*
2360   set the monitoring mode of a remote node to disable
2361  */
2362 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2363 {
2364         int ret;
2365         
2366
2367         ret = ctdb_control(ctdb, destnode, 0, 
2368                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2369                            NULL, NULL, NULL, &timeout, NULL);
2370         if (ret != 0) {
2371                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2372                 return -1;
2373         }
2374
2375         
2376
2377         return 0;
2378 }
2379
2380
2381
2382 /* 
2383   sent to a node to make it take over an ip address
2384 */
2385 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2386                           uint32_t destnode, struct ctdb_public_ip *ip)
2387 {
2388         TDB_DATA data;
2389         struct ctdb_public_ipv4 ipv4;
2390         int ret;
2391         int32_t res;
2392
2393         if (ip->addr.sa.sa_family == AF_INET) {
2394                 ipv4.pnn = ip->pnn;
2395                 ipv4.sin = ip->addr.ip;
2396
2397                 data.dsize = sizeof(ipv4);
2398                 data.dptr  = (uint8_t *)&ipv4;
2399
2400                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2401                            NULL, &res, &timeout, NULL);
2402         } else {
2403                 data.dsize = sizeof(*ip);
2404                 data.dptr  = (uint8_t *)ip;
2405
2406                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2407                            NULL, &res, &timeout, NULL);
2408         }
2409
2410         if (ret != 0 || res != 0) {
2411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2412                 return -1;
2413         }
2414
2415         return 0;       
2416 }
2417
2418
2419 /* 
2420   sent to a node to make it release an ip address
2421 */
2422 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2423                          uint32_t destnode, struct ctdb_public_ip *ip)
2424 {
2425         TDB_DATA data;
2426         struct ctdb_public_ipv4 ipv4;
2427         int ret;
2428         int32_t res;
2429
2430         if (ip->addr.sa.sa_family == AF_INET) {
2431                 ipv4.pnn = ip->pnn;
2432                 ipv4.sin = ip->addr.ip;
2433
2434                 data.dsize = sizeof(ipv4);
2435                 data.dptr  = (uint8_t *)&ipv4;
2436
2437                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2438                                    NULL, &res, &timeout, NULL);
2439         } else {
2440                 data.dsize = sizeof(*ip);
2441                 data.dptr  = (uint8_t *)ip;
2442
2443                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2444                                    NULL, &res, &timeout, NULL);
2445         }
2446
2447         if (ret != 0 || res != 0) {
2448                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2449                 return -1;
2450         }
2451
2452         return 0;       
2453 }
2454
2455
2456 /*
2457   get a tunable
2458  */
2459 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2460                           struct timeval timeout, 
2461                           uint32_t destnode,
2462                           const char *name, uint32_t *value)
2463 {
2464         struct ctdb_control_get_tunable *t;
2465         TDB_DATA data, outdata;
2466         int32_t res;
2467         int ret;
2468
2469         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2470         data.dptr  = talloc_size(ctdb, data.dsize);
2471         CTDB_NO_MEMORY(ctdb, data.dptr);
2472
2473         t = (struct ctdb_control_get_tunable *)data.dptr;
2474         t->length = strlen(name)+1;
2475         memcpy(t->name, name, t->length);
2476
2477         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2478                            &outdata, &res, &timeout, NULL);
2479         talloc_free(data.dptr);
2480         if (ret != 0 || res != 0) {
2481                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2482                 return -1;
2483         }
2484
2485         if (outdata.dsize != sizeof(uint32_t)) {
2486                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2487                 talloc_free(outdata.dptr);
2488                 return -1;
2489         }
2490         
2491         *value = *(uint32_t *)outdata.dptr;
2492         talloc_free(outdata.dptr);
2493
2494         return 0;
2495 }
2496
2497 /*
2498   set a tunable
2499  */
2500 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2501                           struct timeval timeout, 
2502                           uint32_t destnode,
2503                           const char *name, uint32_t value)
2504 {
2505         struct ctdb_control_set_tunable *t;
2506         TDB_DATA data;
2507         int32_t res;
2508         int ret;
2509
2510         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2511         data.dptr  = talloc_size(ctdb, data.dsize);
2512         CTDB_NO_MEMORY(ctdb, data.dptr);
2513
2514         t = (struct ctdb_control_set_tunable *)data.dptr;
2515         t->length = strlen(name)+1;
2516         memcpy(t->name, name, t->length);
2517         t->value = value;
2518
2519         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2520                            NULL, &res, &timeout, NULL);
2521         talloc_free(data.dptr);
2522         if (ret != 0 || res != 0) {
2523                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2524                 return -1;
2525         }
2526
2527         return 0;
2528 }
2529
2530 /*
2531   list tunables
2532  */
2533 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2534                             struct timeval timeout, 
2535                             uint32_t destnode,
2536                             TALLOC_CTX *mem_ctx,
2537                             const char ***list, uint32_t *count)
2538 {
2539         TDB_DATA outdata;
2540         int32_t res;
2541         int ret;
2542         struct ctdb_control_list_tunable *t;
2543         char *p, *s, *ptr;
2544
2545         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2546                            mem_ctx, &outdata, &res, &timeout, NULL);
2547         if (ret != 0 || res != 0) {
2548                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2549                 return -1;
2550         }
2551
2552         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2553         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2554             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2555                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2556                 talloc_free(outdata.dptr);
2557                 return -1;              
2558         }
2559         
2560         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2561         CTDB_NO_MEMORY(ctdb, p);
2562
2563         talloc_free(outdata.dptr);
2564         
2565         (*list) = NULL;
2566         (*count) = 0;
2567
2568         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2569                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2570                 CTDB_NO_MEMORY(ctdb, *list);
2571                 (*list)[*count] = talloc_strdup(*list, s);
2572                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2573                 (*count)++;
2574         }
2575
2576         talloc_free(p);
2577
2578         return 0;
2579 }
2580
2581
2582 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2583                                    struct timeval timeout, uint32_t destnode,
2584                                    TALLOC_CTX *mem_ctx,
2585                                    uint32_t flags,
2586                                    struct ctdb_all_public_ips **ips)
2587 {
2588         int ret;
2589         TDB_DATA outdata;
2590         int32_t res;
2591
2592         ret = ctdb_control(ctdb, destnode, 0, 
2593                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2594                            mem_ctx, &outdata, &res, &timeout, NULL);
2595         if (ret == 0 && res == -1) {
2596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2597                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2598         }
2599         if (ret != 0 || res != 0) {
2600           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2601                 return -1;
2602         }
2603
2604         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2605         talloc_free(outdata.dptr);
2606                     
2607         return 0;
2608 }
2609
2610 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2611                              struct timeval timeout, uint32_t destnode,
2612                              TALLOC_CTX *mem_ctx,
2613                              struct ctdb_all_public_ips **ips)
2614 {
2615         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2616                                               destnode, mem_ctx,
2617                                               0, ips);
2618 }
2619
2620 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2621                         struct timeval timeout, uint32_t destnode, 
2622                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2623 {
2624         int ret, i, len;
2625         TDB_DATA outdata;
2626         int32_t res;
2627         struct ctdb_all_public_ipsv4 *ipsv4;
2628
2629         ret = ctdb_control(ctdb, destnode, 0, 
2630                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2631                            mem_ctx, &outdata, &res, &timeout, NULL);
2632         if (ret != 0 || res != 0) {
2633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2634                 return -1;
2635         }
2636
2637         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2638         len = offsetof(struct ctdb_all_public_ips, ips) +
2639                 ipsv4->num*sizeof(struct ctdb_public_ip);
2640         *ips = talloc_zero_size(mem_ctx, len);
2641         CTDB_NO_MEMORY(ctdb, *ips);
2642         (*ips)->num = ipsv4->num;
2643         for (i=0; i<ipsv4->num; i++) {
2644                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2645                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2646         }
2647
2648         talloc_free(outdata.dptr);
2649                     
2650         return 0;
2651 }
2652
2653 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2654                                  struct timeval timeout, uint32_t destnode,
2655                                  TALLOC_CTX *mem_ctx,
2656                                  const ctdb_sock_addr *addr,
2657                                  struct ctdb_control_public_ip_info **_info)
2658 {
2659         int ret;
2660         TDB_DATA indata;
2661         TDB_DATA outdata;
2662         int32_t res;
2663         struct ctdb_control_public_ip_info *info;
2664         uint32_t len;
2665         uint32_t i;
2666
2667         indata.dptr = discard_const_p(uint8_t, addr);
2668         indata.dsize = sizeof(*addr);
2669
2670         ret = ctdb_control(ctdb, destnode, 0,
2671                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2672                            mem_ctx, &outdata, &res, &timeout, NULL);
2673         if (ret != 0 || res != 0) {
2674                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2675                                 "failed ret:%d res:%d\n",
2676                                 ret, res));
2677                 return -1;
2678         }
2679
2680         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2681         if (len > outdata.dsize) {
2682                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2683                                 "returned invalid data with size %u > %u\n",
2684                                 (unsigned int)outdata.dsize,
2685                                 (unsigned int)len));
2686                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2687                 return -1;
2688         }
2689
2690         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2691         len += info->num*sizeof(struct ctdb_control_iface_info);
2692
2693         if (len > outdata.dsize) {
2694                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2695                                 "returned invalid data with size %u > %u\n",
2696                                 (unsigned int)outdata.dsize,
2697                                 (unsigned int)len));
2698                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2699                 return -1;
2700         }
2701
2702         /* make sure we null terminate the returned strings */
2703         for (i=0; i < info->num; i++) {
2704                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2705         }
2706
2707         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2708                                                                 outdata.dptr,
2709                                                                 outdata.dsize);
2710         talloc_free(outdata.dptr);
2711         if (*_info == NULL) {
2712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2713                                 "talloc_memdup size %u failed\n",
2714                                 (unsigned int)outdata.dsize));
2715                 return -1;
2716         }
2717
2718         return 0;
2719 }
2720
2721 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2722                          struct timeval timeout, uint32_t destnode,
2723                          TALLOC_CTX *mem_ctx,
2724                          struct ctdb_control_get_ifaces **_ifaces)
2725 {
2726         int ret;
2727         TDB_DATA outdata;
2728         int32_t res;
2729         struct ctdb_control_get_ifaces *ifaces;
2730         uint32_t len;
2731         uint32_t i;
2732
2733         ret = ctdb_control(ctdb, destnode, 0,
2734                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2735                            mem_ctx, &outdata, &res, &timeout, NULL);
2736         if (ret != 0 || res != 0) {
2737                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2738                                 "failed ret:%d res:%d\n",
2739                                 ret, res));
2740                 return -1;
2741         }
2742
2743         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2744         if (len > outdata.dsize) {
2745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2746                                 "returned invalid data with size %u > %u\n",
2747                                 (unsigned int)outdata.dsize,
2748                                 (unsigned int)len));
2749                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2750                 return -1;
2751         }
2752
2753         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2754         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2755
2756         if (len > outdata.dsize) {
2757                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2758                                 "returned invalid data with size %u > %u\n",
2759                                 (unsigned int)outdata.dsize,
2760                                 (unsigned int)len));
2761                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2762                 return -1;
2763         }
2764
2765         /* make sure we null terminate the returned strings */
2766         for (i=0; i < ifaces->num; i++) {
2767                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2768         }
2769
2770         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2771                                                                   outdata.dptr,
2772                                                                   outdata.dsize);
2773         talloc_free(outdata.dptr);
2774         if (*_ifaces == NULL) {
2775                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2776                                 "talloc_memdup size %u failed\n",
2777                                 (unsigned int)outdata.dsize));
2778                 return -1;
2779         }
2780
2781         return 0;
2782 }
2783
2784 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2785                              struct timeval timeout, uint32_t destnode,
2786                              TALLOC_CTX *mem_ctx,
2787                              const struct ctdb_control_iface_info *info)
2788 {
2789         int ret;
2790         TDB_DATA indata;
2791         int32_t res;
2792
2793         indata.dptr = discard_const_p(uint8_t, info);
2794         indata.dsize = sizeof(*info);
2795
2796         ret = ctdb_control(ctdb, destnode, 0,
2797                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2798                            mem_ctx, NULL, &res, &timeout, NULL);
2799         if (ret != 0 || res != 0) {
2800                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2801                                 "failed ret:%d res:%d\n",
2802                                 ret, res));
2803                 return -1;
2804         }
2805
2806         return 0;
2807 }
2808
2809 /*
2810   set/clear the permanent disabled bit on a remote node
2811  */
2812 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2813                        uint32_t set, uint32_t clear)
2814 {
2815         int ret;
2816         TDB_DATA data;
2817         struct ctdb_node_map *nodemap=NULL;
2818         struct ctdb_node_flag_change c;
2819         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2820         uint32_t recmaster;
2821         uint32_t *nodes;
2822
2823
2824         /* find the recovery master */
2825         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2826         if (ret != 0) {
2827                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2828                 talloc_free(tmp_ctx);
2829                 return ret;
2830         }
2831
2832
2833         /* read the node flags from the recmaster */
2834         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2835         if (ret != 0) {
2836                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2837                 talloc_free(tmp_ctx);
2838                 return -1;
2839         }
2840         if (destnode >= nodemap->num) {
2841                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2842                 talloc_free(tmp_ctx);
2843                 return -1;
2844         }
2845
2846         c.pnn       = destnode;
2847         c.old_flags = nodemap->nodes[destnode].flags;
2848         c.new_flags = c.old_flags;
2849         c.new_flags |= set;
2850         c.new_flags &= ~clear;
2851
2852         data.dsize = sizeof(c);
2853         data.dptr = (unsigned char *)&c;
2854
2855         /* send the flags update to all connected nodes */
2856         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2857
2858         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2859                                         nodes, 0,
2860                                         timeout, false, data,
2861                                         NULL, NULL,
2862                                         NULL) != 0) {
2863                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2864
2865                 talloc_free(tmp_ctx);
2866                 return -1;
2867         }
2868
2869         talloc_free(tmp_ctx);
2870         return 0;
2871 }
2872
2873
2874 /*
2875   get all tunables
2876  */
2877 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2878                                struct timeval timeout, 
2879                                uint32_t destnode,
2880                                struct ctdb_tunable *tunables)
2881 {
2882         TDB_DATA outdata;
2883         int ret;
2884         int32_t res;
2885
2886         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2887                            &outdata, &res, &timeout, NULL);
2888         if (ret != 0 || res != 0) {
2889                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2890                 return -1;
2891         }
2892
2893         if (outdata.dsize != sizeof(*tunables)) {
2894                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2895                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2896                 return -1;              
2897         }
2898
2899         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2900         talloc_free(outdata.dptr);
2901         return 0;
2902 }
2903
2904 /*
2905   add a public address to a node
2906  */
2907 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2908                       struct timeval timeout, 
2909                       uint32_t destnode,
2910                       struct ctdb_control_ip_iface *pub)
2911 {
2912         TDB_DATA data;
2913         int32_t res;
2914         int ret;
2915
2916         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2917         data.dptr  = (unsigned char *)pub;
2918
2919         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2920                            NULL, &res, &timeout, NULL);
2921         if (ret != 0 || res != 0) {
2922                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2923                 return -1;
2924         }
2925
2926         return 0;
2927 }
2928
2929 /*
2930   delete a public address from a node
2931  */
2932 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2933                       struct timeval timeout, 
2934                       uint32_t destnode,
2935                       struct ctdb_control_ip_iface *pub)
2936 {
2937         TDB_DATA data;
2938         int32_t res;
2939         int ret;
2940
2941         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2942         data.dptr  = (unsigned char *)pub;
2943
2944         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2945                            NULL, &res, &timeout, NULL);
2946         if (ret != 0 || res != 0) {
2947                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2948                 return -1;
2949         }
2950
2951         return 0;
2952 }
2953
2954 /*
2955   kill a tcp connection
2956  */
2957 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2958                       struct timeval timeout, 
2959                       uint32_t destnode,
2960                       struct ctdb_control_killtcp *killtcp)
2961 {
2962         TDB_DATA data;
2963         int32_t res;
2964         int ret;
2965
2966         data.dsize = sizeof(struct ctdb_control_killtcp);
2967         data.dptr  = (unsigned char *)killtcp;
2968
2969         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2970                            NULL, &res, &timeout, NULL);
2971         if (ret != 0 || res != 0) {
2972                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2973                 return -1;
2974         }
2975
2976         return 0;
2977 }
2978
2979 /*
2980   send a gratious arp
2981  */
2982 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2983                       struct timeval timeout, 
2984                       uint32_t destnode,
2985                       ctdb_sock_addr *addr,
2986                       const char *ifname)
2987 {
2988         TDB_DATA data;
2989         int32_t res;
2990         int ret, len;
2991         struct ctdb_control_gratious_arp *gratious_arp;
2992         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2993
2994
2995         len = strlen(ifname)+1;
2996         gratious_arp = talloc_size(tmp_ctx, 
2997                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2998         CTDB_NO_MEMORY(ctdb, gratious_arp);
2999
3000         gratious_arp->addr = *addr;
3001         gratious_arp->len = len;
3002         memcpy(&gratious_arp->iface[0], ifname, len);
3003
3004
3005         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3006         data.dptr  = (unsigned char *)gratious_arp;
3007
3008         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3009                            NULL, &res, &timeout, NULL);
3010         if (ret != 0 || res != 0) {
3011                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3012                 talloc_free(tmp_ctx);
3013                 return -1;
3014         }
3015
3016         talloc_free(tmp_ctx);
3017         return 0;
3018 }
3019
3020 /*
3021   get a list of all tcp tickles that a node knows about for a particular vnn
3022  */
3023 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3024                               struct timeval timeout, uint32_t destnode, 
3025                               TALLOC_CTX *mem_ctx, 
3026                               ctdb_sock_addr *addr,
3027                               struct ctdb_control_tcp_tickle_list **list)
3028 {
3029         int ret;
3030         TDB_DATA data, outdata;
3031         int32_t status;
3032
3033         data.dptr = (uint8_t*)addr;
3034         data.dsize = sizeof(ctdb_sock_addr);
3035
3036         ret = ctdb_control(ctdb, destnode, 0, 
3037                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3038                            mem_ctx, &outdata, &status, NULL, NULL);
3039         if (ret != 0 || status != 0) {
3040                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3041                 return -1;
3042         }
3043
3044         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3045
3046         return status;
3047 }
3048
3049 /*
3050   register a server id
3051  */
3052 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3053                       struct timeval timeout, 
3054                       struct ctdb_server_id *id)
3055 {
3056         TDB_DATA data;
3057         int32_t res;
3058         int ret;
3059
3060         data.dsize = sizeof(struct ctdb_server_id);
3061         data.dptr  = (unsigned char *)id;
3062
3063         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3064                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3065                         0, data, NULL,
3066                         NULL, &res, &timeout, NULL);
3067         if (ret != 0 || res != 0) {
3068                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3069                 return -1;
3070         }
3071
3072         return 0;
3073 }
3074
3075 /*
3076   unregister a server id
3077  */
3078 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3079                       struct timeval timeout, 
3080                       struct ctdb_server_id *id)
3081 {
3082         TDB_DATA data;
3083         int32_t res;
3084         int ret;
3085
3086         data.dsize = sizeof(struct ctdb_server_id);
3087         data.dptr  = (unsigned char *)id;
3088
3089         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3090                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3091                         0, data, NULL,
3092                         NULL, &res, &timeout, NULL);
3093         if (ret != 0 || res != 0) {
3094                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3095                 return -1;
3096         }
3097
3098         return 0;
3099 }
3100
3101
3102 /*
3103   check if a server id exists
3104
3105   if a server id does exist, return *status == 1, otherwise *status == 0
3106  */
3107 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3108                       struct timeval timeout, 
3109                       uint32_t destnode,
3110                       struct ctdb_server_id *id,
3111                       uint32_t *status)
3112 {
3113         TDB_DATA data;
3114         int32_t res;
3115         int ret;
3116
3117         data.dsize = sizeof(struct ctdb_server_id);
3118         data.dptr  = (unsigned char *)id;
3119
3120         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3121                         0, data, NULL,
3122                         NULL, &res, &timeout, NULL);
3123         if (ret != 0) {
3124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3125                 return -1;
3126         }
3127
3128         if (res) {
3129                 *status = 1;
3130         } else {
3131                 *status = 0;
3132         }
3133
3134         return 0;
3135 }
3136
3137 /*
3138    get the list of server ids that are registered on a node
3139 */
3140 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3141                 TALLOC_CTX *mem_ctx,
3142                 struct timeval timeout, uint32_t destnode, 
3143                 struct ctdb_server_id_list **svid_list)
3144 {
3145         int ret;
3146         TDB_DATA outdata;
3147         int32_t res;
3148
3149         ret = ctdb_control(ctdb, destnode, 0, 
3150                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3151                            mem_ctx, &outdata, &res, &timeout, NULL);
3152         if (ret != 0 || res != 0) {
3153                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3154                 return -1;
3155         }
3156
3157         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3158                     
3159         return 0;
3160 }
3161
3162 /*
3163   initialise the ctdb daemon for client applications
3164
3165   NOTE: In current code the daemon does not fork. This is for testing purposes only
3166   and to simplify the code.
3167 */
3168 struct ctdb_context *ctdb_init(struct event_context *ev)
3169 {
3170         int ret;
3171         struct ctdb_context *ctdb;
3172
3173         ctdb = talloc_zero(ev, struct ctdb_context);
3174         if (ctdb == NULL) {
3175                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3176                 return NULL;
3177         }
3178         ctdb->ev  = ev;
3179         ctdb->idr = idr_init(ctdb);
3180         /* Wrap early to exercise code. */
3181         ctdb->lastid = INT_MAX-200;
3182         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3183
3184         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3185         if (ret != 0) {
3186                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3187                 talloc_free(ctdb);
3188                 return NULL;
3189         }
3190
3191         ctdb->statistics.statistics_start_time = timeval_current();
3192
3193         return ctdb;
3194 }
3195
3196
3197 /*
3198   set some ctdb flags
3199 */
3200 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3201 {
3202         ctdb->flags |= flags;
3203 }
3204
3205 /*
3206   setup the local socket name
3207 */
3208 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3209 {
3210         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3211         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3212
3213         return 0;
3214 }
3215
3216 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3217 {
3218         return ctdb->daemon.name;
3219 }
3220
3221 /*
3222   return the pnn of this node
3223 */
3224 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3225 {
3226         return ctdb->pnn;
3227 }
3228
3229
3230 /*
3231   get the uptime of a remote node
3232  */
3233 struct ctdb_client_control_state *
3234 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3235 {
3236         return ctdb_control_send(ctdb, destnode, 0, 
3237                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3238                            mem_ctx, &timeout, NULL);
3239 }
3240
3241 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3242 {
3243         int ret;
3244         int32_t res;
3245         TDB_DATA outdata;
3246
3247         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3248         if (ret != 0 || res != 0) {
3249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3250                 return -1;
3251         }
3252
3253         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3254
3255         return 0;
3256 }
3257
3258 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3259 {
3260         struct ctdb_client_control_state *state;
3261
3262         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3263         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3264 }
3265
3266 /*
3267   send a control to execute the "recovered" event script on a node
3268  */
3269 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3270 {
3271         int ret;
3272         int32_t status;
3273
3274         ret = ctdb_control(ctdb, destnode, 0, 
3275                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3276                            NULL, NULL, &status, &timeout, NULL);
3277         if (ret != 0 || status != 0) {
3278                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3279                 return -1;
3280         }
3281
3282         return 0;
3283 }
3284
3285 /* 
3286   callback for the async helpers used when sending the same control
3287   to multiple nodes in parallell.
3288 */
3289 static void async_callback(struct ctdb_client_control_state *state)
3290 {
3291         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3292         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3293         int ret;
3294         TDB_DATA outdata;
3295         int32_t res;
3296         uint32_t destnode = state->c->hdr.destnode;
3297
3298         /* one more node has responded with recmode data */
3299         data->count--;
3300
3301         /* if we failed to push the db, then return an error and let
3302            the main loop try again.
3303         */
3304         if (state->state != CTDB_CONTROL_DONE) {
3305                 if ( !data->dont_log_errors) {
3306                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3307                 }
3308                 data->fail_count++;
3309                 if (data->fail_callback) {
3310                         data->fail_callback(ctdb, destnode, res, outdata,
3311                                         data->callback_data);
3312                 }
3313                 return;
3314         }
3315         
3316         state->async.fn = NULL;
3317
3318         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3319         if ((ret != 0) || (res != 0)) {
3320                 if ( !data->dont_log_errors) {
3321                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3322                 }
3323                 data->fail_count++;
3324                 if (data->fail_callback) {
3325                         data->fail_callback(ctdb, destnode, res, outdata,
3326                                         data->callback_data);
3327                 }
3328         }
3329         if ((ret == 0) && (data->callback != NULL)) {
3330                 data->callback(ctdb, destnode, res, outdata,
3331                                         data->callback_data);
3332         }
3333 }
3334
3335
3336 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3337 {
3338         /* set up the callback functions */
3339         state->async.fn = async_callback;
3340         state->async.private_data = data;
3341         
3342         /* one more control to wait for to complete */
3343         data->count++;
3344 }
3345
3346
3347 /* wait for up to the maximum number of seconds allowed
3348    or until all nodes we expect a response from has replied
3349 */
3350 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3351 {
3352         while (data->count > 0) {
3353                 event_loop_once(ctdb->ev);
3354         }
3355         if (data->fail_count != 0) {
3356                 if (!data->dont_log_errors) {
3357                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3358                                  data->fail_count));
3359                 }
3360                 return -1;
3361         }
3362         return 0;
3363 }
3364
3365
3366 /* 
3367    perform a simple control on the listed nodes
3368    The control cannot return data
3369  */
3370 int ctdb_client_async_control(struct ctdb_context *ctdb,
3371                                 enum ctdb_controls opcode,
3372                                 uint32_t *nodes,
3373                                 uint64_t srvid,
3374                                 struct timeval timeout,
3375                                 bool dont_log_errors,
3376                                 TDB_DATA data,
3377                                 client_async_callback client_callback,
3378                                 client_async_callback fail_callback,
3379                                 void *callback_data)
3380 {
3381         struct client_async_data *async_data;
3382         struct ctdb_client_control_state *state;
3383         int j, num_nodes;
3384
3385         async_data = talloc_zero(ctdb, struct client_async_data);
3386         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3387         async_data->dont_log_errors = dont_log_errors;
3388         async_data->callback = client_callback;
3389         async_data->fail_callback = fail_callback;
3390         async_data->callback_data = callback_data;
3391         async_data->opcode        = opcode;
3392
3393         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3394
3395         /* loop over all nodes and send an async control to each of them */
3396         for (j=0; j<num_nodes; j++) {
3397                 uint32_t pnn = nodes[j];
3398
3399                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3400                                           0, data, async_data, &timeout, NULL);
3401                 if (state == NULL) {
3402                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3403                         talloc_free(async_data);
3404                         return -1;
3405                 }
3406                 
3407                 ctdb_client_async_add(async_data, state);
3408         }
3409
3410         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3411                 talloc_free(async_data);
3412                 return -1;
3413         }
3414
3415         talloc_free(async_data);
3416         return 0;
3417 }
3418
3419 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3420                                 struct ctdb_vnn_map *vnn_map,
3421                                 TALLOC_CTX *mem_ctx,
3422                                 bool include_self)
3423 {
3424         int i, j, num_nodes;
3425         uint32_t *nodes;
3426
3427         for (i=num_nodes=0;i<vnn_map->size;i++) {
3428                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3429                         continue;
3430                 }
3431                 num_nodes++;
3432         } 
3433
3434         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3435         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3436
3437         for (i=j=0;i<vnn_map->size;i++) {
3438                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3439                         continue;
3440                 }
3441                 nodes[j++] = vnn_map->map[i];
3442         } 
3443
3444         return nodes;
3445 }
3446
3447 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3448                                 struct ctdb_node_map *node_map,
3449                                 TALLOC_CTX *mem_ctx,
3450                                 bool include_self)
3451 {
3452         int i, j, num_nodes;
3453         uint32_t *nodes;
3454
3455         for (i=num_nodes=0;i<node_map->num;i++) {
3456                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3457                         continue;
3458                 }
3459                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3460                         continue;
3461                 }
3462                 num_nodes++;
3463         } 
3464
3465         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3466         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3467
3468         for (i=j=0;i<node_map->num;i++) {
3469                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3470                         continue;
3471                 }
3472                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3473                         continue;
3474                 }
3475                 nodes[j++] = node_map->nodes[i].pnn;
3476         } 
3477
3478         return nodes;
3479 }
3480
3481 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3482                                 struct ctdb_node_map *node_map,
3483                                 TALLOC_CTX *mem_ctx,
3484                                 uint32_t pnn)
3485 {
3486         int i, j, num_nodes;
3487         uint32_t *nodes;
3488
3489         for (i=num_nodes=0;i<node_map->num;i++) {
3490                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3491                         continue;
3492                 }
3493                 if (node_map->nodes[i].pnn == pnn) {
3494                         continue;
3495                 }
3496                 num_nodes++;
3497         } 
3498
3499         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3500         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3501
3502         for (i=j=0;i<node_map->num;i++) {
3503                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3504                         continue;
3505                 }
3506                 if (node_map->nodes[i].pnn == pnn) {
3507                         continue;
3508                 }
3509                 nodes[j++] = node_map->nodes[i].pnn;
3510         } 
3511
3512         return nodes;
3513 }
3514
3515 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3516                                 struct ctdb_node_map *node_map,
3517                                 TALLOC_CTX *mem_ctx,
3518                                 bool include_self)
3519 {
3520         int i, j, num_nodes;
3521         uint32_t *nodes;
3522
3523         for (i=num_nodes=0;i<node_map->num;i++) {
3524                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3525                         continue;
3526                 }
3527                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3528                         continue;
3529                 }
3530                 num_nodes++;
3531         } 
3532
3533         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3534         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3535
3536         for (i=j=0;i<node_map->num;i++) {
3537                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3538                         continue;
3539                 }
3540                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3541                         continue;
3542                 }
3543                 nodes[j++] = node_map->nodes[i].pnn;
3544         } 
3545
3546         return nodes;
3547 }
3548
3549 /* 
3550   this is used to test if a pnn lock exists and if it exists will return
3551   the number of connections that pnn has reported or -1 if that recovery
3552   daemon is not running.
3553 */
3554 int
3555 ctdb_read_pnn_lock(int fd, int32_t pnn)
3556 {
3557         struct flock lock;
3558         char c;
3559
3560         lock.l_type = F_WRLCK;
3561         lock.l_whence = SEEK_SET;
3562         lock.l_start = pnn;
3563         lock.l_len = 1;
3564         lock.l_pid = 0;
3565
3566         if (fcntl(fd, F_GETLK, &lock) != 0) {
3567                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3568                 return -1;
3569         }
3570
3571         if (lock.l_type == F_UNLCK) {
3572                 return -1;
3573         }
3574
3575         if (pread(fd, &c, 1, pnn) == -1) {
3576                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3577                 return -1;
3578         }
3579
3580         return c;
3581 }
3582
3583 /*
3584   get capabilities of a remote node
3585  */
3586 struct ctdb_client_control_state *
3587 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3588 {
3589         return ctdb_control_send(ctdb, destnode, 0, 
3590                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3591                            mem_ctx, &timeout, NULL);
3592 }
3593
3594 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3595 {
3596         int ret;
3597         int32_t res;
3598         TDB_DATA outdata;
3599
3600         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3601         if ( (ret != 0) || (res != 0) ) {
3602                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3603                 return -1;
3604         }
3605
3606         if (capabilities) {
3607                 *capabilities = *((uint32_t *)outdata.dptr);
3608         }
3609
3610         return 0;
3611 }
3612
3613 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3614 {
3615         struct ctdb_client_control_state *state;
3616         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3617         int ret;
3618
3619         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3620         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3621         talloc_free(tmp_ctx);
3622         return ret;
3623 }
3624
3625 /**
3626  * check whether a transaction is active on a given db on a given node
3627  */
3628 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3629                                      uint32_t destnode,
3630                                      uint32_t db_id)
3631 {
3632         int32_t status;
3633         int ret;
3634         TDB_DATA indata;
3635
3636         indata.dptr = (uint8_t *)&db_id;
3637         indata.dsize = sizeof(db_id);
3638
3639         ret = ctdb_control(ctdb, destnode, 0,
3640                            CTDB_CONTROL_TRANS2_ACTIVE,
3641                            0, indata, NULL, NULL, &status,
3642                            NULL, NULL);
3643
3644         if (ret != 0) {
3645                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3646                 return -1;
3647         }
3648
3649         return status;
3650 }
3651
3652
3653 struct ctdb_transaction_handle {
3654         struct ctdb_db_context *ctdb_db;
3655         bool in_replay;
3656         /*
3657          * we store the reads and writes done under a transaction:
3658          * - one list stores both reads and writes (m_all),
3659          * - the other just writes (m_write)
3660          */
3661         struct ctdb_marshall_buffer *m_all;
3662         struct ctdb_marshall_buffer *m_write;
3663 };
3664
3665 /* start a transaction on a database */
3666 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3667 {
3668         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3669         return 0;
3670 }
3671
3672 /* start a transaction on a database */
3673 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3674 {
3675         struct ctdb_record_handle *rh;
3676         TDB_DATA key;
3677         TDB_DATA data;
3678         struct ctdb_ltdb_header header;
3679         TALLOC_CTX *tmp_ctx;
3680         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3681         int ret;
3682         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3683         pid_t pid;
3684         int32_t status;
3685
3686         key.dptr = discard_const(keyname);
3687         key.dsize = strlen(keyname);
3688
3689         if (!ctdb_db->persistent) {
3690                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3691                 return -1;
3692         }
3693
3694 again:
3695         tmp_ctx = talloc_new(h);
3696
3697         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3698         if (rh == NULL) {
3699                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3700                 talloc_free(tmp_ctx);
3701                 return -1;
3702         }
3703
3704         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3705                                               CTDB_CURRENT_NODE,
3706                                               ctdb_db->db_id);
3707         if (status == 1) {
3708                 unsigned long int usec = (1000 + random()) % 100000;
3709                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3710                                     "on db_id[0x%08x]. waiting for %lu "
3711                                     "microseconds\n",
3712                                     ctdb_db->db_id, usec));
3713                 talloc_free(tmp_ctx);
3714                 usleep(usec);
3715                 goto again;
3716         }
3717
3718         /*
3719          * store the pid in the database:
3720          * it is not enough that the node is dmaster...
3721          */
3722         pid = getpid();
3723         data.dptr = (unsigned char *)&pid;
3724         data.dsize = sizeof(pid_t);
3725         rh->header.rsn++;
3726         rh->header.dmaster = ctdb_db->ctdb->pnn;
3727         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3728         if (ret != 0) {
3729                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3730                                   "transaction record\n"));
3731                 talloc_free(tmp_ctx);
3732                 return -1;
3733         }
3734
3735         talloc_free(rh);
3736
3737         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3738         if (ret != 0) {
3739                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3740                 talloc_free(tmp_ctx);
3741                 return -1;
3742         }
3743
3744         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3745         if (ret != 0) {
3746                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3747                                  "lock record inside transaction\n"));
3748                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3749                 talloc_free(tmp_ctx);
3750                 goto again;
3751         }
3752
3753         if (header.dmaster != ctdb_db->ctdb->pnn) {
3754                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3755                                    "transaction lock record\n"));
3756                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3757                 talloc_free(tmp_ctx);
3758                 goto again;
3759         }
3760
3761         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3762                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3763                                     "the transaction lock record\n"));
3764                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3765                 talloc_free(tmp_ctx);
3766                 goto again;
3767         }
3768
3769         talloc_free(tmp_ctx);
3770
3771         return 0;
3772 }
3773
3774
3775 /* start a transaction on a database */
3776 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3777                                                        TALLOC_CTX *mem_ctx)
3778 {
3779         struct ctdb_transaction_handle *h;
3780         int ret;
3781
3782         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3783         if (h == NULL) {
3784                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3785                 return NULL;
3786         }
3787
3788         h->ctdb_db = ctdb_db;
3789
3790         ret = ctdb_transaction_fetch_start(h);
3791         if (ret != 0) {
3792                 talloc_free(h);
3793                 return NULL;
3794         }
3795
3796         talloc_set_destructor(h, ctdb_transaction_destructor);
3797
3798         return h;
3799 }
3800
3801
3802
3803 /*
3804   fetch a record inside a transaction
3805  */
3806 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3807                            TALLOC_CTX *mem_ctx, 
3808                            TDB_DATA key, TDB_DATA *data)
3809 {
3810         struct ctdb_ltdb_header header;
3811         int ret;
3812
3813         ZERO_STRUCT(header);
3814
3815         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3816         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3817                 /* record doesn't exist yet */
3818                 *data = tdb_null;
3819                 ret = 0;
3820         }
3821         
3822         if (ret != 0) {
3823                 return ret;
3824         }
3825
3826         if (!h->in_replay) {
3827                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3828                 if (h->m_all == NULL) {
3829                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3830                         return -1;
3831                 }
3832         }
3833
3834         return 0;
3835 }
3836
3837 /*
3838   stores a record inside a transaction
3839  */
3840 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3841                            TDB_DATA key, TDB_DATA data)
3842 {
3843         TALLOC_CTX *tmp_ctx = talloc_new(h);
3844         struct ctdb_ltdb_header header;
3845         TDB_DATA olddata;
3846         int ret;
3847
3848         ZERO_STRUCT(header);
3849
3850         /* we need the header so we can update the RSN */
3851         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3852         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3853                 /* the record doesn't exist - create one with us as dmaster.
3854                    This is only safe because we are in a transaction and this
3855                    is a persistent database */
3856                 ZERO_STRUCT(header);
3857         } else if (ret != 0) {
3858                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3859                 talloc_free(tmp_ctx);
3860                 return ret;
3861         }
3862
3863         if (data.dsize == olddata.dsize &&
3864             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3865                 /* save writing the same data */
3866                 talloc_free(tmp_ctx);
3867                 return 0;
3868         }
3869
3870         header.dmaster = h->ctdb_db->ctdb->pnn;
3871         header.rsn++;
3872
3873         if (!h->in_replay) {
3874                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3875                 if (h->m_all == NULL) {
3876                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3877                         talloc_free(tmp_ctx);
3878                         return -1;
3879                 }
3880         }               
3881
3882         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3883         if (h->m_write == NULL) {
3884                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3885                 talloc_free(tmp_ctx);
3886                 return -1;
3887         }
3888         
3889         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3890
3891         talloc_free(tmp_ctx);
3892         
3893         return ret;
3894 }
3895
3896 /*
3897   replay a transaction
3898  */
3899 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3900 {
3901         int ret, i;
3902         struct ctdb_rec_data *rec = NULL;
3903
3904         h->in_replay = true;
3905         talloc_free(h->m_write);
3906         h->m_write = NULL;
3907
3908         ret = ctdb_transaction_fetch_start(h);
3909         if (ret != 0) {
3910                 return ret;
3911         }
3912
3913         for (i=0;i<h->m_all->count;i++) {
3914                 TDB_DATA key, data;
3915
3916                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3917                 if (rec == NULL) {
3918                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3919                         goto failed;
3920                 }
3921
3922                 if (rec->reqid == 0) {
3923                         /* its a store */
3924                         if (ctdb_transaction_store(h, key, data) != 0) {
3925                                 goto failed;
3926                         }
3927                 } else {
3928                         TDB_DATA data2;
3929                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3930
3931                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3932                                 talloc_free(tmp_ctx);
3933                                 goto failed;
3934                         }
3935                         if (data2.dsize != data.dsize ||
3936                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3937                                 /* the record has changed on us - we have to give up */
3938                                 talloc_free(tmp_ctx);
3939                                 goto failed;
3940                         }
3941                         talloc_free(tmp_ctx);
3942                 }
3943         }
3944         
3945         return 0;
3946
3947 failed:
3948         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3949         return -1;
3950 }
3951
3952
3953 /*
3954   commit a transaction
3955  */
3956 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3957 {
3958         int ret, retries=0;
3959         int32_t status;
3960         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3961         struct timeval timeout;
3962         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3963
3964         talloc_set_destructor(h, NULL);
3965
3966         /* our commit strategy is quite complex.
3967
3968            - we first try to commit the changes to all other nodes
3969
3970            - if that works, then we commit locally and we are done
3971
3972            - if a commit on another node fails, then we need to cancel
3973              the transaction, then restart the transaction (thus
3974              opening a window of time for a pending recovery to
3975              complete), then replay the transaction, checking all the
3976              reads and writes (checking that reads give the same data,
3977              and writes succeed). Then we retry the transaction to the
3978              other nodes
3979         */
3980
3981 again:
3982         if (h->m_write == NULL) {
3983                 /* no changes were made */
3984                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3985                 talloc_free(h);
3986                 return 0;
3987         }
3988
3989         /* tell ctdbd to commit to the other nodes */
3990         timeout = timeval_current_ofs(1, 0);
3991         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3992                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3993                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3994                            &timeout, NULL);
3995         if (ret != 0 || status != 0) {
3996                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3997                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3998                                      ", retrying after 1 second...\n",
3999                                      (retries==0)?"":"retry "));
4000                 sleep(1);
4001
4002                 if (ret != 0) {
4003                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4004                 } else {
4005                         /* work out what error code we will give if we 
4006                            have to fail the operation */
4007                         switch ((enum ctdb_trans2_commit_error)status) {
4008                         case CTDB_TRANS2_COMMIT_SUCCESS:
4009                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4010                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4011                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4012                                 break;
4013                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4014                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4015                                 break;
4016                         }
4017                 }
4018
4019                 if (++retries == 100) {
4020                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4021                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4022                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4023                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4024                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4025                         talloc_free(h);
4026                         return -1;
4027                 }               
4028
4029                 if (ctdb_replay_transaction(h) != 0) {
4030                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4031                                           "transaction on db 0x%08x, "
4032                                           "failure control =%u\n",
4033                                           h->ctdb_db->db_id,
4034                                           (unsigned)failure_control));
4035                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4036                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4037                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4038                         talloc_free(h);
4039                         return -1;
4040                 }
4041                 goto again;
4042         } else {
4043                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4044         }
4045
4046         /* do the real commit locally */
4047         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4048         if (ret != 0) {
4049                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4050                                   "on db id 0x%08x locally, "
4051                                   "failure_control=%u\n",
4052                                   h->ctdb_db->db_id,
4053                                   (unsigned)failure_control));
4054                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4055                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4056                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4057                 talloc_free(h);
4058                 return ret;
4059         }
4060
4061         /* tell ctdbd that we are finished with our local commit */
4062         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4063                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4064                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4065         talloc_free(h);
4066         return 0;
4067 }
4068
4069 /*
4070   recovery daemon ping to main daemon
4071  */
4072 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4073 {
4074         int ret;
4075         int32_t res;
4076
4077         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4078                            ctdb, NULL, &res, NULL, NULL);
4079         if (ret != 0 || res != 0) {
4080                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4081                 return -1;
4082         }
4083
4084         return 0;
4085 }
4086
4087 /* when forking the main daemon and the child process needs to connect back
4088  * to the daemon as a client process, this function can be used to change
4089  * the ctdb context from daemon into client mode
4090  */
4091 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4092 {
4093         int ret;
4094         va_list ap;
4095
4096         /* Add extra information so we can identify this in the logs */
4097         va_start(ap, fmt);
4098         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4099         va_end(ap);
4100
4101         /* shutdown the transport */
4102         if (ctdb->methods) {
4103                 ctdb->methods->shutdown(ctdb);
4104         }
4105
4106         /* get a new event context */
4107         talloc_free(ctdb->ev);
4108         ctdb->ev = event_context_init(ctdb);
4109         tevent_loop_allow_nesting(ctdb->ev);
4110
4111         close(ctdb->daemon.sd);
4112         ctdb->daemon.sd = -1;
4113
4114         /* the client does not need to be realtime */
4115         if (ctdb->do_setsched) {
4116                 ctdb_restore_scheduler(ctdb);
4117         }
4118
4119         /* initialise ctdb */
4120         ret = ctdb_socket_connect(ctdb);
4121         if (ret != 0) {
4122                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4123                 return -1;
4124         }
4125
4126          return 0;
4127 }
4128
4129 /*
4130   get the status of running the monitor eventscripts: NULL means never run.
4131  */
4132 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4133                 struct timeval timeout, uint32_t destnode, 
4134                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4135                 struct ctdb_scripts_wire **scripts)
4136 {
4137         int ret;
4138         TDB_DATA outdata, indata;
4139         int32_t res;
4140         uint32_t uinttype = type;
4141
4142         indata.dptr = (uint8_t *)&uinttype;
4143         indata.dsize = sizeof(uinttype);
4144
4145         ret = ctdb_control(ctdb, destnode, 0, 
4146                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4147                            mem_ctx, &outdata, &res, &timeout, NULL);
4148         if (ret != 0 || res != 0) {
4149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4150                 return -1;
4151         }
4152
4153         if (outdata.dsize == 0) {
4154                 *scripts = NULL;
4155         } else {
4156                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4157                 talloc_free(outdata.dptr);
4158         }
4159                     
4160         return 0;
4161 }
4162
4163 /*
4164   tell the main daemon how long it took to lock the reclock file
4165  */
4166 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4167 {
4168         int ret;
4169         int32_t res;
4170         TDB_DATA data;
4171
4172         data.dptr = (uint8_t *)&latency;
4173         data.dsize = sizeof(latency);
4174
4175         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4176                            ctdb, NULL, &res, NULL, NULL);
4177         if (ret != 0 || res != 0) {
4178                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4179                 return -1;
4180         }
4181
4182         return 0;
4183 }
4184
4185 /*
4186   get the name of the reclock file
4187  */
4188 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4189                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4190                          const char **name)
4191 {
4192         int ret;
4193         int32_t res;
4194         TDB_DATA data;
4195
4196         ret = ctdb_control(ctdb, destnode, 0, 
4197                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4198                            mem_ctx, &data, &res, &timeout, NULL);
4199         if (ret != 0 || res != 0) {
4200                 return -1;
4201         }
4202
4203         if (data.dsize == 0) {
4204                 *name = NULL;
4205         } else {
4206                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4207         }
4208         talloc_free(data.dptr);
4209
4210         return 0;
4211 }
4212
4213 /*
4214   set the reclock filename for a node
4215  */
4216 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4217 {
4218         int ret;
4219         TDB_DATA data;
4220         int32_t res;
4221
4222         if (reclock == NULL) {
4223                 data.dsize = 0;
4224                 data.dptr  = NULL;
4225         } else {
4226                 data.dsize = strlen(reclock) + 1;
4227                 data.dptr  = discard_const(reclock);
4228         }
4229
4230         ret = ctdb_control(ctdb, destnode, 0, 
4231                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4232                            NULL, NULL, &res, &timeout, NULL);
4233         if (ret != 0 || res != 0) {
4234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4235                 return -1;
4236         }
4237
4238         return 0;
4239 }
4240
4241 /*
4242   stop a node
4243  */
4244 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4245 {
4246         int ret;
4247         int32_t res;
4248
4249         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4250                            ctdb, NULL, &res, &timeout, NULL);
4251         if (ret != 0 || res != 0) {
4252                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4253                 return -1;
4254         }
4255
4256         return 0;
4257 }
4258
4259 /*
4260   continue a node
4261  */
4262 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4263 {
4264         int ret;
4265
4266         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4267                            ctdb, NULL, NULL, &timeout, NULL);
4268         if (ret != 0) {
4269                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4270                 return -1;
4271         }
4272
4273         return 0;
4274 }
4275
4276 /*
4277   set the natgw state for a node
4278  */
4279 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4280 {
4281         int ret;
4282         TDB_DATA data;
4283         int32_t res;
4284
4285         data.dsize = sizeof(natgwstate);
4286         data.dptr  = (uint8_t *)&natgwstate;
4287
4288         ret = ctdb_control(ctdb, destnode, 0, 
4289                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4290                            NULL, NULL, &res, &timeout, NULL);
4291         if (ret != 0 || res != 0) {
4292                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4293                 return -1;
4294         }
4295
4296         return 0;
4297 }
4298
4299 /*
4300   set the lmaster role for a node
4301  */
4302 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4303 {
4304         int ret;
4305         TDB_DATA data;
4306         int32_t res;
4307
4308         data.dsize = sizeof(lmasterrole);
4309         data.dptr  = (uint8_t *)&lmasterrole;
4310
4311         ret = ctdb_control(ctdb, destnode, 0, 
4312                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4313                            NULL, NULL, &res, &timeout, NULL);
4314         if (ret != 0 || res != 0) {
4315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4316                 return -1;
4317         }
4318
4319         return 0;
4320 }
4321
4322 /*
4323   set the recmaster role for a node
4324  */
4325 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4326 {
4327         int ret;
4328         TDB_DATA data;
4329         int32_t res;
4330
4331         data.dsize = sizeof(recmasterrole);
4332         data.dptr  = (uint8_t *)&recmasterrole;
4333
4334         ret = ctdb_control(ctdb, destnode, 0, 
4335                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4336                            NULL, NULL, &res, &timeout, NULL);
4337         if (ret != 0 || res != 0) {
4338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4339                 return -1;
4340         }
4341
4342         return 0;
4343 }
4344
4345 /* enable an eventscript
4346  */
4347 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4348 {
4349         int ret;
4350         TDB_DATA data;
4351         int32_t res;
4352
4353         data.dsize = strlen(script) + 1;
4354         data.dptr  = discard_const(script);
4355
4356         ret = ctdb_control(ctdb, destnode, 0, 
4357                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4358                            NULL, NULL, &res, &timeout, NULL);
4359         if (ret != 0 || res != 0) {
4360                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4361                 return -1;
4362         }
4363
4364         return 0;
4365 }
4366
4367 /* disable an eventscript
4368  */
4369 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4370 {
4371         int ret;
4372         TDB_DATA data;
4373         int32_t res;
4374
4375         data.dsize = strlen(script) + 1;
4376         data.dptr  = discard_const(script);
4377
4378         ret = ctdb_control(ctdb, destnode, 0, 
4379                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4380                            NULL, NULL, &res, &timeout, NULL);
4381         if (ret != 0 || res != 0) {
4382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4383                 return -1;
4384         }
4385
4386         return 0;
4387 }
4388
4389
4390 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4391 {
4392         int ret;
4393         TDB_DATA data;
4394         int32_t res;
4395
4396         data.dsize = sizeof(*bantime);
4397         data.dptr  = (uint8_t *)bantime;
4398
4399         ret = ctdb_control(ctdb, destnode, 0, 
4400                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4401                            NULL, NULL, &res, &timeout, NULL);
4402         if (ret != 0 || res != 0) {
4403                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4404                 return -1;
4405         }
4406
4407         return 0;
4408 }
4409
4410
4411 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4412 {
4413         int ret;
4414         TDB_DATA outdata;
4415         int32_t res;
4416         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4417
4418         ret = ctdb_control(ctdb, destnode, 0, 
4419                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4420                            tmp_ctx, &outdata, &res, &timeout, NULL);
4421         if (ret != 0 || res != 0) {
4422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4423                 talloc_free(tmp_ctx);
4424                 return -1;
4425         }
4426
4427         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4428         talloc_free(tmp_ctx);
4429
4430         return 0;
4431 }
4432
4433
4434 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4435 {
4436         int ret;
4437         int32_t res;
4438         TDB_DATA data;
4439         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4440
4441         data.dptr = (uint8_t*)db_prio;
4442         data.dsize = sizeof(*db_prio);
4443
4444         ret = ctdb_control(ctdb, destnode, 0, 
4445                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4446                            tmp_ctx, NULL, &res, &timeout, NULL);
4447         if (ret != 0 || res != 0) {
4448                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4449                 talloc_free(tmp_ctx);
4450                 return -1;
4451         }
4452
4453         talloc_free(tmp_ctx);
4454
4455         return 0;
4456 }
4457
4458 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4459 {
4460         int ret;
4461         int32_t res;
4462         TDB_DATA data;
4463         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4464
4465         data.dptr = (uint8_t*)&db_id;
4466         data.dsize = sizeof(db_id);
4467
4468         ret = ctdb_control(ctdb, destnode, 0, 
4469                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4470                            tmp_ctx, NULL, &res, &timeout, NULL);
4471         if (ret != 0 || res < 0) {
4472                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4473                 talloc_free(tmp_ctx);
4474                 return -1;
4475         }
4476
4477         if (priority) {
4478                 *priority = res;
4479         }
4480
4481         talloc_free(tmp_ctx);
4482
4483         return 0;
4484 }
4485
4486 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4487 {
4488         int ret;
4489         TDB_DATA outdata;
4490         int32_t res;
4491
4492         ret = ctdb_control(ctdb, destnode, 0, 
4493                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4494                            mem_ctx, &outdata, &res, &timeout, NULL);
4495         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4497                 return -1;
4498         }
4499
4500         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4501         talloc_free(outdata.dptr);
4502                     
4503         return 0;
4504 }
4505
4506 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4507 {
4508         if (h == NULL) {
4509                 return NULL;
4510         }
4511
4512         return &h->header;
4513 }
4514
4515
4516 struct ctdb_client_control_state *
4517 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4518 {
4519         struct ctdb_client_control_state *handle;
4520         struct ctdb_marshall_buffer *m;
4521         struct ctdb_rec_data *rec;
4522         TDB_DATA outdata;
4523
4524         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4525         if (m == NULL) {
4526                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4527                 return NULL;
4528         }
4529
4530         m->db_id = ctdb_db->db_id;
4531
4532         rec = ctdb_marshall_record(m, 0, key, header, data);
4533         if (rec == NULL) {
4534                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4535                 talloc_free(m);
4536                 return NULL;
4537         }
4538         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4539         if (m == NULL) {
4540                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4541                 talloc_free(m);
4542                 return NULL;
4543         }
4544         m->count++;
4545         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4546
4547
4548         outdata.dptr = (uint8_t *)m;
4549         outdata.dsize = talloc_get_size(m);
4550
4551         handle = ctdb_control_send(ctdb, destnode, 0, 
4552                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4553                            mem_ctx, &timeout, NULL);
4554         talloc_free(m);
4555         return handle;
4556 }
4557
4558 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4559 {
4560         int ret;
4561         int32_t res;
4562
4563         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4564         if ( (ret != 0) || (res != 0) ){
4565                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4566                 return -1;
4567         }
4568
4569         return 0;
4570 }
4571
4572 int
4573 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4574 {
4575         struct ctdb_client_control_state *state;
4576
4577         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4578         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4579 }
4580
4581
4582
4583
4584
4585
4586 /*
4587   set a database to be readonly
4588  */
4589 struct ctdb_client_control_state *
4590 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4591 {
4592         TDB_DATA data;
4593
4594         data.dptr = (uint8_t *)&dbid;
4595         data.dsize = sizeof(dbid);
4596
4597         return ctdb_control_send(ctdb, destnode, 0, 
4598                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4599                            ctdb, NULL, NULL);
4600 }
4601
4602 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4603 {
4604         int ret;
4605         int32_t res;
4606
4607         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4608         if (ret != 0 || res != 0) {
4609           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4610                 return -1;
4611         }
4612
4613         return 0;
4614 }
4615
4616 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4617 {
4618         struct ctdb_client_control_state *state;
4619
4620         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4621         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4622 }