Merge remote branch 'amitay/tdb-sync'
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data && updatetdb) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect with exponential backoff, thanks Stevens
257 */
258 #define CONNECT_MAXSLEEP 64
259 static int ctdb_connect_retry(struct ctdb_context *ctdb)
260 {
261         struct sockaddr_un addr;
262         int secs;
263         int ret = 0;
264
265         memset(&addr, 0, sizeof(addr));
266         addr.sun_family = AF_UNIX;
267         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
268
269         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
270                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
271                               sizeof(addr));
272                 if ((ret == 0) || (errno != EAGAIN)) {
273                         break;
274                 }
275
276                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
277                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
278                                          strerror(errno), secs));
279                         sleep(secs);
280                 }
281         }
282
283         return ret;
284 }
285
286 /*
287   connect to a unix domain socket
288 */
289 int ctdb_socket_connect(struct ctdb_context *ctdb)
290 {
291         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
292         if (ctdb->daemon.sd == -1) {
293                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
294                 return -1;
295         }
296
297         set_nonblocking(ctdb->daemon.sd);
298         set_close_on_exec(ctdb->daemon.sd);
299
300         if (ctdb_connect_retry(ctdb) == -1) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
302                 close(ctdb->daemon.sd);
303                 ctdb->daemon.sd = -1;
304                 return -1;
305         }
306
307         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
308                                               CTDB_DS_ALIGNMENT, 
309                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
310         return 0;
311 }
312
313
314 struct ctdb_record_handle {
315         struct ctdb_db_context *ctdb_db;
316         TDB_DATA key;
317         TDB_DATA *data;
318         struct ctdb_ltdb_header header;
319 };
320
321
322 /*
323   make a recv call to the local ctdb daemon - called from client context
324
325   This is called when the program wants to wait for a ctdb_call to complete and get the 
326   results. This call will block unless the call has already completed.
327 */
328 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
329 {
330         if (state == NULL) {
331                 return -1;
332         }
333
334         while (state->state < CTDB_CALL_DONE) {
335                 event_loop_once(state->ctdb_db->ctdb->ev);
336         }
337         if (state->state != CTDB_CALL_DONE) {
338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
339                 talloc_free(state);
340                 return -1;
341         }
342
343         if (state->call->reply_data.dsize) {
344                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
345                                                       state->call->reply_data.dptr,
346                                                       state->call->reply_data.dsize);
347                 call->reply_data.dsize = state->call->reply_data.dsize;
348         } else {
349                 call->reply_data.dptr = NULL;
350                 call->reply_data.dsize = 0;
351         }
352         call->status = state->call->status;
353         talloc_free(state);
354
355         return call->status;
356 }
357
358
359
360
361 /*
362   destroy a ctdb_call in client
363 */
364 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
365 {
366         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
367         return 0;
368 }
369
370 /*
371   construct an event driven local ctdb_call
372
373   this is used so that locally processed ctdb_call requests are processed
374   in an event driven manner
375 */
376 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
377                                                                   struct ctdb_call *call,
378                                                                   struct ctdb_ltdb_header *header,
379                                                                   TDB_DATA *data)
380 {
381         struct ctdb_client_call_state *state;
382         struct ctdb_context *ctdb = ctdb_db->ctdb;
383         int ret;
384
385         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
386         CTDB_NO_MEMORY_NULL(ctdb, state);
387         state->call = talloc_zero(state, struct ctdb_call);
388         CTDB_NO_MEMORY_NULL(ctdb, state->call);
389
390         talloc_steal(state, data->dptr);
391
392         state->state   = CTDB_CALL_DONE;
393         *(state->call) = *call;
394         state->ctdb_db = ctdb_db;
395
396         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
397         if (ret != 0) {
398                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
399         }
400
401         return state;
402 }
403
404 /*
405   make a ctdb call to the local daemon - async send. Called from client context.
406
407   This constructs a ctdb_call request and queues it for processing. 
408   This call never blocks.
409 */
410 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
411                                               struct ctdb_call *call)
412 {
413         struct ctdb_client_call_state *state;
414         struct ctdb_context *ctdb = ctdb_db->ctdb;
415         struct ctdb_ltdb_header header;
416         TDB_DATA data;
417         int ret;
418         size_t len;
419         struct ctdb_req_call *c;
420
421         /* if the domain socket is not yet open, open it */
422         if (ctdb->daemon.sd==-1) {
423                 ctdb_socket_connect(ctdb);
424         }
425
426         ret = ctdb_ltdb_lock(ctdb_db, call->key);
427         if (ret != 0) {
428                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
429                 return NULL;
430         }
431
432         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
433
434         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
435                 ret = -1;
436         }
437
438         if (ret == 0 && header.dmaster == ctdb->pnn) {
439                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
440                 talloc_free(data.dptr);
441                 ctdb_ltdb_unlock(ctdb_db, call->key);
442                 return state;
443         }
444
445         ctdb_ltdb_unlock(ctdb_db, call->key);
446         talloc_free(data.dptr);
447
448         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
449         if (state == NULL) {
450                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
451                 return NULL;
452         }
453         state->call = talloc_zero(state, struct ctdb_call);
454         if (state->call == NULL) {
455                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
456                 return NULL;
457         }
458
459         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
460         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
461         if (c == NULL) {
462                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
463                 return NULL;
464         }
465
466         state->reqid     = ctdb_reqid_new(ctdb, state);
467         state->ctdb_db = ctdb_db;
468         talloc_set_destructor(state, ctdb_client_call_destructor);
469
470         c->hdr.reqid     = state->reqid;
471         c->flags         = call->flags;
472         c->db_id         = ctdb_db->db_id;
473         c->callid        = call->call_id;
474         c->hopcount      = 0;
475         c->keylen        = call->key.dsize;
476         c->calldatalen   = call->call_data.dsize;
477         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
478         memcpy(&c->data[call->key.dsize], 
479                call->call_data.dptr, call->call_data.dsize);
480         *(state->call)              = *call;
481         state->call->call_data.dptr = &c->data[call->key.dsize];
482         state->call->key.dptr       = &c->data[0];
483
484         state->state  = CTDB_CALL_WAIT;
485
486
487         ctdb_client_queue_pkt(ctdb, &c->hdr);
488
489         return state;
490 }
491
492
493 /*
494   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
495 */
496 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
497 {
498         struct ctdb_client_call_state *state;
499
500         state = ctdb_call_send(ctdb_db, call);
501         return ctdb_call_recv(state, call);
502 }
503
504
505 /*
506   tell the daemon what messaging srvid we will use, and register the message
507   handler function in the client
508 */
509 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
510                              ctdb_msg_fn_t handler,
511                              void *private_data)
512                                     
513 {
514         int res;
515         int32_t status;
516         
517         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
518                            tdb_null, NULL, NULL, &status, NULL, NULL);
519         if (res != 0 || status != 0) {
520                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
521                 return -1;
522         }
523
524         /* also need to register the handler with our own ctdb structure */
525         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
526 }
527
528 /*
529   tell the daemon we no longer want a srvid
530 */
531 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
532 {
533         int res;
534         int32_t status;
535         
536         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
537                            tdb_null, NULL, NULL, &status, NULL, NULL);
538         if (res != 0 || status != 0) {
539                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
540                 return -1;
541         }
542
543         /* also need to register the handler with our own ctdb structure */
544         ctdb_deregister_message_handler(ctdb, srvid, private_data);
545         return 0;
546 }
547
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         if (res != 0) {
570                 return res;
571         }
572
573         talloc_free(r);
574         return 0;
575 }
576
577
578 /*
579   cancel a ctdb_fetch_lock operation, releasing the lock
580  */
581 static int fetch_lock_destructor(struct ctdb_record_handle *h)
582 {
583         ctdb_ltdb_unlock(h->ctdb_db, h->key);
584         return 0;
585 }
586
587 /*
588   force the migration of a record to this node
589  */
590 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
591 {
592         struct ctdb_call call;
593         ZERO_STRUCT(call);
594         call.call_id = CTDB_NULL_FUNC;
595         call.key = key;
596         call.flags = CTDB_IMMEDIATE_MIGRATION;
597         return ctdb_call(ctdb_db, &call);
598 }
599
600 /*
601   try to fetch a readonly copy of a record
602  */
603 static int
604 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
605 {
606         int ret;
607
608         struct ctdb_call call;
609         ZERO_STRUCT(call);
610
611         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
612         call.call_data.dptr = NULL;
613         call.call_data.dsize = 0;
614         call.key = key;
615         call.flags = CTDB_WANT_READONLY;
616         ret = ctdb_call(ctdb_db, &call);
617
618         if (ret != 0) {
619                 return -1;
620         }
621         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
622                 return -1;
623         }
624
625         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
626         if (*hdr == NULL) {
627                 talloc_free(call.reply_data.dptr);
628                 return -1;
629         }
630
631         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
632         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
633         if (data->dptr == NULL) {
634                 talloc_free(call.reply_data.dptr);
635                 talloc_free(hdr);
636                 return -1;
637         }
638
639         return 0;
640 }
641
642 /*
643   get a lock on a record, and return the records data. Blocks until it gets the lock
644  */
645 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
646                                            TDB_DATA key, TDB_DATA *data)
647 {
648         int ret;
649         struct ctdb_record_handle *h;
650
651         /*
652           procedure is as follows:
653
654           1) get the chain lock. 
655           2) check if we are dmaster
656           3) if we are the dmaster then return handle 
657           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
658              reply from ctdbd
659           5) when we get the reply, goto (1)
660          */
661
662         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
663         if (h == NULL) {
664                 return NULL;
665         }
666
667         h->ctdb_db = ctdb_db;
668         h->key     = key;
669         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
670         if (h->key.dptr == NULL) {
671                 talloc_free(h);
672                 return NULL;
673         }
674         h->data    = data;
675
676         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
677                  (const char *)key.dptr));
678
679 again:
680         /* step 1 - get the chain lock */
681         ret = ctdb_ltdb_lock(ctdb_db, key);
682         if (ret != 0) {
683                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
684                 talloc_free(h);
685                 return NULL;
686         }
687
688         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
689
690         talloc_set_destructor(h, fetch_lock_destructor);
691
692         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
693
694         /* when torturing, ensure we test the remote path */
695         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
696             random() % 5 == 0) {
697                 h->header.dmaster = (uint32_t)-1;
698         }
699
700
701         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
702
703         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
704                 ctdb_ltdb_unlock(ctdb_db, key);
705                 ret = ctdb_client_force_migration(ctdb_db, key);
706                 if (ret != 0) {
707                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
708                         talloc_free(h);
709                         return NULL;
710                 }
711                 goto again;
712         }
713
714         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
715         return h;
716 }
717
718 /*
719   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
720  */
721 struct ctdb_record_handle *
722 ctdb_fetch_readonly_lock(
723         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
724         TDB_DATA key, TDB_DATA *data,
725         int read_only)
726 {
727         int ret;
728         struct ctdb_record_handle *h;
729         struct ctdb_ltdb_header *roheader = NULL;
730
731         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
732         if (h == NULL) {
733                 return NULL;
734         }
735
736         h->ctdb_db = ctdb_db;
737         h->key     = key;
738         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
739         if (h->key.dptr == NULL) {
740                 talloc_free(h);
741                 return NULL;
742         }
743         h->data    = data;
744
745         data->dptr = NULL;
746         data->dsize = 0;
747
748
749 again:
750         talloc_free(roheader);
751         roheader = NULL;
752
753         talloc_free(data->dptr);
754         data->dptr = NULL;
755         data->dsize = 0;
756
757         /* Lock the record/chain */
758         ret = ctdb_ltdb_lock(ctdb_db, key);
759         if (ret != 0) {
760                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
761                 talloc_free(h);
762                 return NULL;
763         }
764
765         talloc_set_destructor(h, fetch_lock_destructor);
766
767         /* Check if record exists yet in the TDB */
768         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
769         if (ret != 0) {
770                 ctdb_ltdb_unlock(ctdb_db, key);
771                 ret = ctdb_client_force_migration(ctdb_db, key);
772                 if (ret != 0) {
773                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
774                         talloc_free(h);
775                         return NULL;
776                 }
777                 goto again;
778         }
779
780         /* if this is a request for read/write and we have delegations
781            we have to revoke all delegations first
782         */
783         if ((read_only == 0) 
784         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
785         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
786                 ctdb_ltdb_unlock(ctdb_db, key);
787                 ret = ctdb_client_force_migration(ctdb_db, key);
788                 if (ret != 0) {
789                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
790                         talloc_free(h);
791                         return NULL;
792                 }
793                 goto again;
794         }
795
796         /* if we are dmaster, just return the handle */
797         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
798                 return h;
799         }
800
801         if (read_only != 0) {
802                 TDB_DATA rodata = {NULL, 0};
803
804                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
805                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
806                         return h;
807                 }
808
809                 ctdb_ltdb_unlock(ctdb_db, key);
810                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
811                 if (ret != 0) {
812                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
813                         ret = ctdb_client_force_migration(ctdb_db, key);
814                         if (ret != 0) {
815                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
816                                 talloc_free(h);
817                                 return NULL;
818                         }
819
820                         goto again;
821                 }
822
823                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 ret = ctdb_ltdb_lock(ctdb_db, key);
835                 if (ret != 0) {
836                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
837                         talloc_free(h);
838                         return NULL;
839                 }
840
841                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
842                 if (ret != 0) {
843                         ctdb_ltdb_unlock(ctdb_db, key);
844
845                         ret = ctdb_client_force_migration(ctdb_db, key);
846                         if (ret != 0) {
847                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
848                                 talloc_free(h);
849                                 return NULL;
850                         }
851
852                         goto again;
853                 }
854
855                 return h;
856         }
857
858         /* we are not dmaster and this was not a request for a readonly lock
859          * so unlock the record, migrate it and try again
860          */
861         ctdb_ltdb_unlock(ctdb_db, key);
862         ret = ctdb_client_force_migration(ctdb_db, key);
863         if (ret != 0) {
864                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
865                 talloc_free(h);
866                 return NULL;
867         }
868         goto again;
869 }
870
871 /*
872   store some data to the record that was locked with ctdb_fetch_lock()
873 */
874 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
875 {
876         if (h->ctdb_db->persistent) {
877                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
878                 return -1;
879         }
880
881         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
882 }
883
884 /*
885   non-locking fetch of a record
886  */
887 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
888                TDB_DATA key, TDB_DATA *data)
889 {
890         struct ctdb_call call;
891         int ret;
892
893         call.call_id = CTDB_FETCH_FUNC;
894         call.call_data.dptr = NULL;
895         call.call_data.dsize = 0;
896         call.key = key;
897
898         ret = ctdb_call(ctdb_db, &call);
899
900         if (ret == 0) {
901                 *data = call.reply_data;
902                 talloc_steal(mem_ctx, data->dptr);
903         }
904
905         return ret;
906 }
907
908
909
910 /*
911    called when a control completes or timesout to invoke the callback
912    function the user provided
913 */
914 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
915         struct timeval t, void *private_data)
916 {
917         struct ctdb_client_control_state *state;
918         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
919         int ret;
920
921         state = talloc_get_type(private_data, struct ctdb_client_control_state);
922         talloc_steal(tmp_ctx, state);
923
924         ret = ctdb_control_recv(state->ctdb, state, state,
925                         NULL, 
926                         NULL, 
927                         NULL);
928         if (ret != 0) {
929                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
930         }
931
932         talloc_free(tmp_ctx);
933 }
934
935 /*
936   called when a CTDB_REPLY_CONTROL packet comes in in the client
937
938   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
939   contains any reply data from the control
940 */
941 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
942                                       struct ctdb_req_header *hdr)
943 {
944         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
945         struct ctdb_client_control_state *state;
946
947         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
948         if (state == NULL) {
949                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
950                 return;
951         }
952
953         if (hdr->reqid != state->reqid) {
954                 /* we found a record  but it was the wrong one */
955                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
956                 return;
957         }
958
959         state->outdata.dptr = c->data;
960         state->outdata.dsize = c->datalen;
961         state->status = c->status;
962         if (c->errorlen) {
963                 state->errormsg = talloc_strndup(state, 
964                                                  (char *)&c->data[c->datalen], 
965                                                  c->errorlen);
966         }
967
968         /* state->outdata now uses resources from c so we dont want c
969            to just dissappear from under us while state is still alive
970         */
971         talloc_steal(state, c);
972
973         state->state = CTDB_CONTROL_DONE;
974
975         /* if we had a callback registered for this control, pull the response
976            and call the callback.
977         */
978         if (state->async.fn) {
979                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
980         }
981 }
982
983
984 /*
985   destroy a ctdb_control in client
986 */
987 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
988 {
989         ctdb_reqid_remove(state->ctdb, state->reqid);
990         return 0;
991 }
992
993
994 /* time out handler for ctdb_control */
995 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
996         struct timeval t, void *private_data)
997 {
998         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
999
1000         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1001                          "dstnode:%u\n", state->reqid, state->c->opcode,
1002                          state->c->hdr.destnode));
1003
1004         state->state = CTDB_CONTROL_TIMEOUT;
1005
1006         /* if we had a callback registered for this control, pull the response
1007            and call the callback.
1008         */
1009         if (state->async.fn) {
1010                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1011         }
1012 }
1013
1014 /* async version of send control request */
1015 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1016                 uint32_t destnode, uint64_t srvid, 
1017                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1018                 TALLOC_CTX *mem_ctx,
1019                 struct timeval *timeout,
1020                 char **errormsg)
1021 {
1022         struct ctdb_client_control_state *state;
1023         size_t len;
1024         struct ctdb_req_control *c;
1025         int ret;
1026
1027         if (errormsg) {
1028                 *errormsg = NULL;
1029         }
1030
1031         /* if the domain socket is not yet open, open it */
1032         if (ctdb->daemon.sd==-1) {
1033                 ctdb_socket_connect(ctdb);
1034         }
1035
1036         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1037         CTDB_NO_MEMORY_NULL(ctdb, state);
1038
1039         state->ctdb       = ctdb;
1040         state->reqid      = ctdb_reqid_new(ctdb, state);
1041         state->state      = CTDB_CONTROL_WAIT;
1042         state->errormsg   = NULL;
1043
1044         talloc_set_destructor(state, ctdb_client_control_destructor);
1045
1046         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1047         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1048                                len, struct ctdb_req_control);
1049         state->c            = c;        
1050         CTDB_NO_MEMORY_NULL(ctdb, c);
1051         c->hdr.reqid        = state->reqid;
1052         c->hdr.destnode     = destnode;
1053         c->opcode           = opcode;
1054         c->client_id        = 0;
1055         c->flags            = flags;
1056         c->srvid            = srvid;
1057         c->datalen          = data.dsize;
1058         if (data.dsize) {
1059                 memcpy(&c->data[0], data.dptr, data.dsize);
1060         }
1061
1062         /* timeout */
1063         if (timeout && !timeval_is_zero(timeout)) {
1064                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1065         }
1066
1067         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1068         if (ret != 0) {
1069                 talloc_free(state);
1070                 return NULL;
1071         }
1072
1073         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1074                 talloc_free(state);
1075                 return NULL;
1076         }
1077
1078         return state;
1079 }
1080
1081
1082 /* async version of receive control reply */
1083 int ctdb_control_recv(struct ctdb_context *ctdb, 
1084                 struct ctdb_client_control_state *state, 
1085                 TALLOC_CTX *mem_ctx,
1086                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1087 {
1088         TALLOC_CTX *tmp_ctx;
1089
1090         if (status != NULL) {
1091                 *status = -1;
1092         }
1093         if (errormsg != NULL) {
1094                 *errormsg = NULL;
1095         }
1096
1097         if (state == NULL) {
1098                 return -1;
1099         }
1100
1101         /* prevent double free of state */
1102         tmp_ctx = talloc_new(ctdb);
1103         talloc_steal(tmp_ctx, state);
1104
1105         /* loop one event at a time until we either timeout or the control
1106            completes.
1107         */
1108         while (state->state == CTDB_CONTROL_WAIT) {
1109                 event_loop_once(ctdb->ev);
1110         }
1111
1112         if (state->state != CTDB_CONTROL_DONE) {
1113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1114                 if (state->async.fn) {
1115                         state->async.fn(state);
1116                 }
1117                 talloc_free(tmp_ctx);
1118                 return -1;
1119         }
1120
1121         if (state->errormsg) {
1122                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1123                 if (errormsg) {
1124                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1125                 }
1126                 if (state->async.fn) {
1127                         state->async.fn(state);
1128                 }
1129                 talloc_free(tmp_ctx);
1130                 return -1;
1131         }
1132
1133         if (outdata) {
1134                 *outdata = state->outdata;
1135                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1136         }
1137
1138         if (status) {
1139                 *status = state->status;
1140         }
1141
1142         if (state->async.fn) {
1143                 state->async.fn(state);
1144         }
1145
1146         talloc_free(tmp_ctx);
1147         return 0;
1148 }
1149
1150
1151
1152 /*
1153   send a ctdb control message
1154   timeout specifies how long we should wait for a reply.
1155   if timeout is NULL we wait indefinitely
1156  */
1157 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1158                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1159                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1160                  struct timeval *timeout,
1161                  char **errormsg)
1162 {
1163         struct ctdb_client_control_state *state;
1164
1165         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1166                         flags, data, mem_ctx,
1167                         timeout, errormsg);
1168         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1169                         errormsg);
1170 }
1171
1172
1173
1174
1175 /*
1176   a process exists call. Returns 0 if process exists, -1 otherwise
1177  */
1178 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1179 {
1180         int ret;
1181         TDB_DATA data;
1182         int32_t status;
1183
1184         data.dptr = (uint8_t*)&pid;
1185         data.dsize = sizeof(pid);
1186
1187         ret = ctdb_control(ctdb, destnode, 0, 
1188                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1189                            NULL, NULL, &status, NULL, NULL);
1190         if (ret != 0) {
1191                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1192                 return -1;
1193         }
1194
1195         return status;
1196 }
1197
1198 /*
1199   get remote statistics
1200  */
1201 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1202 {
1203         int ret;
1204         TDB_DATA data;
1205         int32_t res;
1206
1207         ret = ctdb_control(ctdb, destnode, 0, 
1208                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1209                            ctdb, &data, &res, NULL, NULL);
1210         if (ret != 0 || res != 0) {
1211                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1212                 return -1;
1213         }
1214
1215         if (data.dsize != sizeof(struct ctdb_statistics)) {
1216                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1217                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1218                       return -1;
1219         }
1220
1221         *status = *(struct ctdb_statistics *)data.dptr;
1222         talloc_free(data.dptr);
1223                         
1224         return 0;
1225 }
1226
1227 /*
1228   shutdown a remote ctdb node
1229  */
1230 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1231 {
1232         struct ctdb_client_control_state *state;
1233
1234         state = ctdb_control_send(ctdb, destnode, 0, 
1235                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1236                            NULL, &timeout, NULL);
1237         if (state == NULL) {
1238                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1239                 return -1;
1240         }
1241
1242         return 0;
1243 }
1244
1245 /*
1246   get vnn map from a remote node
1247  */
1248 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1249 {
1250         int ret;
1251         TDB_DATA outdata;
1252         int32_t res;
1253         struct ctdb_vnn_map_wire *map;
1254
1255         ret = ctdb_control(ctdb, destnode, 0, 
1256                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1257                            mem_ctx, &outdata, &res, &timeout, NULL);
1258         if (ret != 0 || res != 0) {
1259                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1260                 return -1;
1261         }
1262         
1263         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1264         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1265             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1266                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1267                 return -1;
1268         }
1269
1270         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1271         CTDB_NO_MEMORY(ctdb, *vnnmap);
1272         (*vnnmap)->generation = map->generation;
1273         (*vnnmap)->size       = map->size;
1274         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1275
1276         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1277         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1278         talloc_free(outdata.dptr);
1279                     
1280         return 0;
1281 }
1282
1283
1284 /*
1285   get the recovery mode of a remote node
1286  */
1287 struct ctdb_client_control_state *
1288 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1289 {
1290         return ctdb_control_send(ctdb, destnode, 0, 
1291                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1292                            mem_ctx, &timeout, NULL);
1293 }
1294
1295 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1296 {
1297         int ret;
1298         int32_t res;
1299
1300         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1301         if (ret != 0) {
1302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1303                 return -1;
1304         }
1305
1306         if (recmode) {
1307                 *recmode = (uint32_t)res;
1308         }
1309
1310         return 0;
1311 }
1312
1313 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1314 {
1315         struct ctdb_client_control_state *state;
1316
1317         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1318         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1319 }
1320
1321
1322
1323
1324 /*
1325   set the recovery mode of a remote node
1326  */
1327 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1328 {
1329         int ret;
1330         TDB_DATA data;
1331         int32_t res;
1332
1333         data.dsize = sizeof(uint32_t);
1334         data.dptr = (unsigned char *)&recmode;
1335
1336         ret = ctdb_control(ctdb, destnode, 0, 
1337                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1338                            NULL, NULL, &res, &timeout, NULL);
1339         if (ret != 0 || res != 0) {
1340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1341                 return -1;
1342         }
1343
1344         return 0;
1345 }
1346
1347
1348
1349 /*
1350   get the recovery master of a remote node
1351  */
1352 struct ctdb_client_control_state *
1353 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1354                         struct timeval timeout, uint32_t destnode)
1355 {
1356         return ctdb_control_send(ctdb, destnode, 0, 
1357                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1358                            mem_ctx, &timeout, NULL);
1359 }
1360
1361 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1362 {
1363         int ret;
1364         int32_t res;
1365
1366         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1367         if (ret != 0) {
1368                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1369                 return -1;
1370         }
1371
1372         if (recmaster) {
1373                 *recmaster = (uint32_t)res;
1374         }
1375
1376         return 0;
1377 }
1378
1379 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1380 {
1381         struct ctdb_client_control_state *state;
1382
1383         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1384         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1385 }
1386
1387
1388 /*
1389   set the recovery master of a remote node
1390  */
1391 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1392 {
1393         int ret;
1394         TDB_DATA data;
1395         int32_t res;
1396
1397         ZERO_STRUCT(data);
1398         data.dsize = sizeof(uint32_t);
1399         data.dptr = (unsigned char *)&recmaster;
1400
1401         ret = ctdb_control(ctdb, destnode, 0, 
1402                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1403                            NULL, NULL, &res, &timeout, NULL);
1404         if (ret != 0 || res != 0) {
1405                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1406                 return -1;
1407         }
1408
1409         return 0;
1410 }
1411
1412
1413 /*
1414   get a list of databases off a remote node
1415  */
1416 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1417                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1418 {
1419         int ret;
1420         TDB_DATA outdata;
1421         int32_t res;
1422
1423         ret = ctdb_control(ctdb, destnode, 0, 
1424                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1425                            mem_ctx, &outdata, &res, &timeout, NULL);
1426         if (ret != 0 || res != 0) {
1427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1428                 return -1;
1429         }
1430
1431         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1432         talloc_free(outdata.dptr);
1433                     
1434         return 0;
1435 }
1436
1437 /*
1438   get a list of nodes (vnn and flags ) from a remote node
1439  */
1440 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1441                 struct timeval timeout, uint32_t destnode, 
1442                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1443 {
1444         int ret;
1445         TDB_DATA outdata;
1446         int32_t res;
1447
1448         ret = ctdb_control(ctdb, destnode, 0, 
1449                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1450                            mem_ctx, &outdata, &res, &timeout, NULL);
1451         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1453                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1454         }
1455         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1457                 return -1;
1458         }
1459
1460         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1461         talloc_free(outdata.dptr);
1462                     
1463         return 0;
1464 }
1465
1466 /*
1467   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1468  */
1469 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1470                 struct timeval timeout, uint32_t destnode, 
1471                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1472 {
1473         int ret, i, len;
1474         TDB_DATA outdata;
1475         struct ctdb_node_mapv4 *nodemapv4;
1476         int32_t res;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1480                            mem_ctx, &outdata, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1483                 return -1;
1484         }
1485
1486         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1487
1488         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1489         (*nodemap) = talloc_zero_size(mem_ctx, len);
1490         CTDB_NO_MEMORY(ctdb, (*nodemap));
1491
1492         (*nodemap)->num = nodemapv4->num;
1493         for (i=0; i<nodemapv4->num; i++) {
1494                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1495                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1496                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1497                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1498         }
1499                 
1500         talloc_free(outdata.dptr);
1501                     
1502         return 0;
1503 }
1504
1505 /*
1506   drop the transport, reload the nodes file and restart the transport
1507  */
1508 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1509                     struct timeval timeout, uint32_t destnode)
1510 {
1511         int ret;
1512         int32_t res;
1513
1514         ret = ctdb_control(ctdb, destnode, 0, 
1515                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1516                            NULL, NULL, &res, &timeout, NULL);
1517         if (ret != 0 || res != 0) {
1518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1519                 return -1;
1520         }
1521
1522         return 0;
1523 }
1524
1525
1526 /*
1527   set vnn map on a node
1528  */
1529 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1530                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1531 {
1532         int ret;
1533         TDB_DATA data;
1534         int32_t res;
1535         struct ctdb_vnn_map_wire *map;
1536         size_t len;
1537
1538         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1539         map = talloc_size(mem_ctx, len);
1540         CTDB_NO_MEMORY(ctdb, map);
1541
1542         map->generation = vnnmap->generation;
1543         map->size = vnnmap->size;
1544         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1545         
1546         data.dsize = len;
1547         data.dptr  = (uint8_t *)map;
1548
1549         ret = ctdb_control(ctdb, destnode, 0, 
1550                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1551                            NULL, NULL, &res, &timeout, NULL);
1552         if (ret != 0 || res != 0) {
1553                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1554                 return -1;
1555         }
1556
1557         talloc_free(map);
1558
1559         return 0;
1560 }
1561
1562
1563 /*
1564   async send for pull database
1565  */
1566 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1567         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1568         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1569 {
1570         TDB_DATA indata;
1571         struct ctdb_control_pulldb *pull;
1572         struct ctdb_client_control_state *state;
1573
1574         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1575         CTDB_NO_MEMORY_NULL(ctdb, pull);
1576
1577         pull->db_id   = dbid;
1578         pull->lmaster = lmaster;
1579
1580         indata.dsize = sizeof(struct ctdb_control_pulldb);
1581         indata.dptr  = (unsigned char *)pull;
1582
1583         state = ctdb_control_send(ctdb, destnode, 0, 
1584                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1585                                   mem_ctx, &timeout, NULL);
1586         talloc_free(pull);
1587
1588         return state;
1589 }
1590
1591 /*
1592   async recv for pull database
1593  */
1594 int ctdb_ctrl_pulldb_recv(
1595         struct ctdb_context *ctdb, 
1596         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1597         TDB_DATA *outdata)
1598 {
1599         int ret;
1600         int32_t res;
1601
1602         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1603         if ( (ret != 0) || (res != 0) ){
1604                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1605                 return -1;
1606         }
1607
1608         return 0;
1609 }
1610
1611 /*
1612   pull all keys and records for a specific database on a node
1613  */
1614 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1615                 uint32_t dbid, uint32_t lmaster, 
1616                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1617                 TDB_DATA *outdata)
1618 {
1619         struct ctdb_client_control_state *state;
1620
1621         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1622                                       timeout);
1623         
1624         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1625 }
1626
1627
1628 /*
1629   change dmaster for all keys in the database to the new value
1630  */
1631 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1632                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1633 {
1634         int ret;
1635         TDB_DATA indata;
1636         int32_t res;
1637
1638         indata.dsize = 2*sizeof(uint32_t);
1639         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1640
1641         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1642         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1643
1644         ret = ctdb_control(ctdb, destnode, 0, 
1645                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1646                            NULL, NULL, &res, &timeout, NULL);
1647         if (ret != 0 || res != 0) {
1648                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1649                 return -1;
1650         }
1651
1652         return 0;
1653 }
1654
1655 /*
1656   ping a node, return number of clients connected
1657  */
1658 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1659 {
1660         int ret;
1661         int32_t res;
1662
1663         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1664                            tdb_null, NULL, NULL, &res, NULL, NULL);
1665         if (ret != 0) {
1666                 return -1;
1667         }
1668         return res;
1669 }
1670
1671 /*
1672   find the real path to a ltdb 
1673  */
1674 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1675                    const char **path)
1676 {
1677         int ret;
1678         int32_t res;
1679         TDB_DATA data;
1680
1681         data.dptr = (uint8_t *)&dbid;
1682         data.dsize = sizeof(dbid);
1683
1684         ret = ctdb_control(ctdb, destnode, 0, 
1685                            CTDB_CONTROL_GETDBPATH, 0, data, 
1686                            mem_ctx, &data, &res, &timeout, NULL);
1687         if (ret != 0 || res != 0) {
1688                 return -1;
1689         }
1690
1691         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1692         if ((*path) == NULL) {
1693                 return -1;
1694         }
1695
1696         talloc_free(data.dptr);
1697
1698         return 0;
1699 }
1700
1701 /*
1702   find the name of a db 
1703  */
1704 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1705                    const char **name)
1706 {
1707         int ret;
1708         int32_t res;
1709         TDB_DATA data;
1710
1711         data.dptr = (uint8_t *)&dbid;
1712         data.dsize = sizeof(dbid);
1713
1714         ret = ctdb_control(ctdb, destnode, 0, 
1715                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1716                            mem_ctx, &data, &res, &timeout, NULL);
1717         if (ret != 0 || res != 0) {
1718                 return -1;
1719         }
1720
1721         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1722         if ((*name) == NULL) {
1723                 return -1;
1724         }
1725
1726         talloc_free(data.dptr);
1727
1728         return 0;
1729 }
1730
1731 /*
1732   get the health status of a db
1733  */
1734 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1735                           struct timeval timeout,
1736                           uint32_t destnode,
1737                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1738                           const char **reason)
1739 {
1740         int ret;
1741         int32_t res;
1742         TDB_DATA data;
1743
1744         data.dptr = (uint8_t *)&dbid;
1745         data.dsize = sizeof(dbid);
1746
1747         ret = ctdb_control(ctdb, destnode, 0,
1748                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1749                            mem_ctx, &data, &res, &timeout, NULL);
1750         if (ret != 0 || res != 0) {
1751                 return -1;
1752         }
1753
1754         if (data.dsize == 0) {
1755                 (*reason) = NULL;
1756                 return 0;
1757         }
1758
1759         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1760         if ((*reason) == NULL) {
1761                 return -1;
1762         }
1763
1764         talloc_free(data.dptr);
1765
1766         return 0;
1767 }
1768
1769 /*
1770   create a database
1771  */
1772 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1773                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1774 {
1775         int ret;
1776         int32_t res;
1777         TDB_DATA data;
1778
1779         data.dptr = discard_const(name);
1780         data.dsize = strlen(name)+1;
1781
1782         ret = ctdb_control(ctdb, destnode, 0, 
1783                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1784                            0, data, 
1785                            mem_ctx, &data, &res, &timeout, NULL);
1786
1787         if (ret != 0 || res != 0) {
1788                 return -1;
1789         }
1790
1791         return 0;
1792 }
1793
1794 /*
1795   get debug level on a node
1796  */
1797 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1798 {
1799         int ret;
1800         int32_t res;
1801         TDB_DATA data;
1802
1803         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1804                            ctdb, &data, &res, NULL, NULL);
1805         if (ret != 0 || res != 0) {
1806                 return -1;
1807         }
1808         if (data.dsize != sizeof(int32_t)) {
1809                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1810                          (unsigned)data.dsize));
1811                 return -1;
1812         }
1813         *level = *(int32_t *)data.dptr;
1814         talloc_free(data.dptr);
1815         return 0;
1816 }
1817
1818 /*
1819   set debug level on a node
1820  */
1821 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1822 {
1823         int ret;
1824         int32_t res;
1825         TDB_DATA data;
1826
1827         data.dptr = (uint8_t *)&level;
1828         data.dsize = sizeof(level);
1829
1830         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1831                            NULL, NULL, &res, NULL, NULL);
1832         if (ret != 0 || res != 0) {
1833                 return -1;
1834         }
1835         return 0;
1836 }
1837
1838
1839 /*
1840   get a list of connected nodes
1841  */
1842 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1843                                 struct timeval timeout,
1844                                 TALLOC_CTX *mem_ctx,
1845                                 uint32_t *num_nodes)
1846 {
1847         struct ctdb_node_map *map=NULL;
1848         int ret, i;
1849         uint32_t *nodes;
1850
1851         *num_nodes = 0;
1852
1853         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1854         if (ret != 0) {
1855                 return NULL;
1856         }
1857
1858         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1859         if (nodes == NULL) {
1860                 return NULL;
1861         }
1862
1863         for (i=0;i<map->num;i++) {
1864                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1865                         nodes[*num_nodes] = map->nodes[i].pnn;
1866                         (*num_nodes)++;
1867                 }
1868         }
1869
1870         return nodes;
1871 }
1872
1873
1874 /*
1875   reset remote status
1876  */
1877 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1878 {
1879         int ret;
1880         int32_t res;
1881
1882         ret = ctdb_control(ctdb, destnode, 0, 
1883                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1884                            NULL, NULL, &res, NULL, NULL);
1885         if (ret != 0 || res != 0) {
1886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1887                 return -1;
1888         }
1889         return 0;
1890 }
1891
1892 /*
1893   attach to a specific database - client call
1894 */
1895 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1896                                     struct timeval timeout,
1897                                     const char *name,
1898                                     bool persistent,
1899                                     uint32_t tdb_flags)
1900 {
1901         struct ctdb_db_context *ctdb_db;
1902         TDB_DATA data;
1903         int ret;
1904         int32_t res;
1905
1906         ctdb_db = ctdb_db_handle(ctdb, name);
1907         if (ctdb_db) {
1908                 return ctdb_db;
1909         }
1910
1911         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1912         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1913
1914         ctdb_db->ctdb = ctdb;
1915         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1916         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1917
1918         data.dptr = discard_const(name);
1919         data.dsize = strlen(name)+1;
1920
1921         /* tell ctdb daemon to attach */
1922         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1923                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1924                            0, data, ctdb_db, &data, &res, NULL, NULL);
1925         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1926                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1927                 talloc_free(ctdb_db);
1928                 return NULL;
1929         }
1930         
1931         ctdb_db->db_id = *(uint32_t *)data.dptr;
1932         talloc_free(data.dptr);
1933
1934         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1935         if (ret != 0) {
1936                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1937                 talloc_free(ctdb_db);
1938                 return NULL;
1939         }
1940
1941         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1942         if (ctdb->valgrinding) {
1943                 tdb_flags |= TDB_NOMMAP;
1944         }
1945         tdb_flags |= TDB_DISALLOW_NESTING;
1946
1947         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1948         if (ctdb_db->ltdb == NULL) {
1949                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1950                 talloc_free(ctdb_db);
1951                 return NULL;
1952         }
1953
1954         ctdb_db->persistent = persistent;
1955
1956         DLIST_ADD(ctdb->db_list, ctdb_db);
1957
1958         /* add well known functions */
1959         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1960         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1961         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1962
1963         return ctdb_db;
1964 }
1965
1966
1967 /*
1968   setup a call for a database
1969  */
1970 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1971 {
1972         struct ctdb_registered_call *call;
1973
1974 #if 0
1975         TDB_DATA data;
1976         int32_t status;
1977         struct ctdb_control_set_call c;
1978         int ret;
1979
1980         /* this is no longer valid with the separate daemon architecture */
1981         c.db_id = ctdb_db->db_id;
1982         c.fn    = fn;
1983         c.id    = id;
1984
1985         data.dptr = (uint8_t *)&c;
1986         data.dsize = sizeof(c);
1987
1988         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1989                            data, NULL, NULL, &status, NULL, NULL);
1990         if (ret != 0 || status != 0) {
1991                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1992                 return -1;
1993         }
1994 #endif
1995
1996         /* also register locally */
1997         call = talloc(ctdb_db, struct ctdb_registered_call);
1998         call->fn = fn;
1999         call->id = id;
2000
2001         DLIST_ADD(ctdb_db->calls, call);        
2002         return 0;
2003 }
2004
2005
2006 struct traverse_state {
2007         bool done;
2008         uint32_t count;
2009         ctdb_traverse_func fn;
2010         void *private_data;
2011         bool listemptyrecords;
2012 };
2013
2014 /*
2015   called on each key during a ctdb_traverse
2016  */
2017 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2018 {
2019         struct traverse_state *state = (struct traverse_state *)p;
2020         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2021         TDB_DATA key;
2022
2023         if (data.dsize < sizeof(uint32_t) ||
2024             d->length != data.dsize) {
2025                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2026                 state->done = True;
2027                 return;
2028         }
2029
2030         key.dsize = d->keylen;
2031         key.dptr  = &d->data[0];
2032         data.dsize = d->datalen;
2033         data.dptr = &d->data[d->keylen];
2034
2035         if (key.dsize == 0 && data.dsize == 0) {
2036                 /* end of traverse */
2037                 state->done = True;
2038                 return;
2039         }
2040
2041         if (!state->listemptyrecords &&
2042             data.dsize == sizeof(struct ctdb_ltdb_header))
2043         {
2044                 /* empty records are deleted records in ctdb */
2045                 return;
2046         }
2047
2048         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2049                 state->done = True;
2050         }
2051
2052         state->count++;
2053 }
2054
2055 /**
2056  * start a cluster wide traverse, calling the supplied fn on each record
2057  * return the number of records traversed, or -1 on error
2058  *
2059  * Extendet variant with a flag to signal whether empty records should
2060  * be listed.
2061  */
2062 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2063                              ctdb_traverse_func fn,
2064                              bool withemptyrecords,
2065                              void *private_data)
2066 {
2067         TDB_DATA data;
2068         struct ctdb_traverse_start_ext t;
2069         int32_t status;
2070         int ret;
2071         uint64_t srvid = (getpid() | 0xFLL<<60);
2072         struct traverse_state state;
2073
2074         state.done = False;
2075         state.count = 0;
2076         state.private_data = private_data;
2077         state.fn = fn;
2078         state.listemptyrecords = withemptyrecords;
2079
2080         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2083                 return -1;
2084         }
2085
2086         t.db_id = ctdb_db->db_id;
2087         t.srvid = srvid;
2088         t.reqid = 0;
2089         t.withemptyrecords = withemptyrecords;
2090
2091         data.dptr = (uint8_t *)&t;
2092         data.dsize = sizeof(t);
2093
2094         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2095                            data, NULL, NULL, &status, NULL, NULL);
2096         if (ret != 0 || status != 0) {
2097                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2098                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2099                 return -1;
2100         }
2101
2102         while (!state.done) {
2103                 event_loop_once(ctdb_db->ctdb->ev);
2104         }
2105
2106         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2109                 return -1;
2110         }
2111
2112         return state.count;
2113 }
2114
2115 /**
2116  * start a cluster wide traverse, calling the supplied fn on each record
2117  * return the number of records traversed, or -1 on error
2118  *
2119  * Standard version which does not list the empty records:
2120  * These are considered deleted.
2121  */
2122 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2123 {
2124         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2125 }
2126
2127 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2128 /*
2129   called on each key during a catdb
2130  */
2131 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2132 {
2133         int i;
2134         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2135         FILE *f = c->f;
2136         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2137
2138         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2139         for (i=0;i<key.dsize;i++) {
2140                 if (ISASCII(key.dptr[i])) {
2141                         fprintf(f, "%c", key.dptr[i]);
2142                 } else {
2143                         fprintf(f, "\\%02X", key.dptr[i]);
2144                 }
2145         }
2146         fprintf(f, "\"\n");
2147
2148         fprintf(f, "dmaster: %u\n", h->dmaster);
2149         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2150
2151         if (c->printlmaster && ctdb->vnn_map != NULL) {
2152                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2153         }
2154
2155         if (c->printhash) {
2156                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2157         }
2158
2159         if (c->printrecordflags) {
2160                 fprintf(f, "flags: 0x%08x", h->flags);
2161                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2162                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2163                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2164                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2165                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2166                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2167                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2168                 fprintf(f, "\n");
2169         }
2170
2171         if (c->printdatasize) {
2172                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2173         } else {
2174                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2175                 for (i=sizeof(*h);i<data.dsize;i++) {
2176                         if (ISASCII(data.dptr[i])) {
2177                                 fprintf(f, "%c", data.dptr[i]);
2178                         } else {
2179                                 fprintf(f, "\\%02X", data.dptr[i]);
2180                         }
2181                 }
2182                 fprintf(f, "\"\n");
2183         }
2184
2185         fprintf(f, "\n");
2186
2187         return 0;
2188 }
2189
2190 /*
2191   convenience function to list all keys to stdout
2192  */
2193 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2194                  struct ctdb_dump_db_context *ctx)
2195 {
2196         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2197                                  ctx->printemptyrecords, ctx);
2198 }
2199
2200 /*
2201   get the pid of a ctdb daemon
2202  */
2203 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2204 {
2205         int ret;
2206         int32_t res;
2207
2208         ret = ctdb_control(ctdb, destnode, 0, 
2209                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2210                            NULL, NULL, &res, &timeout, NULL);
2211         if (ret != 0) {
2212                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2213                 return -1;
2214         }
2215
2216         *pid = res;
2217
2218         return 0;
2219 }
2220
2221
2222 /*
2223   async freeze send control
2224  */
2225 struct ctdb_client_control_state *
2226 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2227 {
2228         return ctdb_control_send(ctdb, destnode, priority, 
2229                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2230                            mem_ctx, &timeout, NULL);
2231 }
2232
2233 /* 
2234    async freeze recv control
2235 */
2236 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2237 {
2238         int ret;
2239         int32_t res;
2240
2241         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2242         if ( (ret != 0) || (res != 0) ){
2243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2244                 return -1;
2245         }
2246
2247         return 0;
2248 }
2249
2250 /*
2251   freeze databases of a certain priority
2252  */
2253 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2254 {
2255         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2256         struct ctdb_client_control_state *state;
2257         int ret;
2258
2259         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2260         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2261         talloc_free(tmp_ctx);
2262
2263         return ret;
2264 }
2265
2266 /* Freeze all databases */
2267 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2268 {
2269         int i;
2270
2271         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2272                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2273                         return -1;
2274                 }
2275         }
2276         return 0;
2277 }
2278
2279 /*
2280   thaw databases of a certain priority
2281  */
2282 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2283 {
2284         int ret;
2285         int32_t res;
2286
2287         ret = ctdb_control(ctdb, destnode, priority, 
2288                            CTDB_CONTROL_THAW, 0, tdb_null, 
2289                            NULL, NULL, &res, &timeout, NULL);
2290         if (ret != 0 || res != 0) {
2291                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2292                 return -1;
2293         }
2294
2295         return 0;
2296 }
2297
2298 /* thaw all databases */
2299 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2300 {
2301         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2302 }
2303
2304 /*
2305   get pnn of a node, or -1
2306  */
2307 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2308 {
2309         int ret;
2310         int32_t res;
2311
2312         ret = ctdb_control(ctdb, destnode, 0, 
2313                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2314                            NULL, NULL, &res, &timeout, NULL);
2315         if (ret != 0) {
2316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2317                 return -1;
2318         }
2319
2320         return res;
2321 }
2322
2323 /*
2324   get the monitoring mode of a remote node
2325  */
2326 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2327 {
2328         int ret;
2329         int32_t res;
2330
2331         ret = ctdb_control(ctdb, destnode, 0, 
2332                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2333                            NULL, NULL, &res, &timeout, NULL);
2334         if (ret != 0) {
2335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2336                 return -1;
2337         }
2338
2339         *monmode = res;
2340
2341         return 0;
2342 }
2343
2344
2345 /*
2346  set the monitoring mode of a remote node to active
2347  */
2348 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2349 {
2350         int ret;
2351         
2352
2353         ret = ctdb_control(ctdb, destnode, 0, 
2354                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2355                            NULL, NULL,NULL, &timeout, NULL);
2356         if (ret != 0) {
2357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2358                 return -1;
2359         }
2360
2361         
2362
2363         return 0;
2364 }
2365
2366 /*
2367   set the monitoring mode of a remote node to disable
2368  */
2369 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2370 {
2371         int ret;
2372         
2373
2374         ret = ctdb_control(ctdb, destnode, 0, 
2375                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2376                            NULL, NULL, NULL, &timeout, NULL);
2377         if (ret != 0) {
2378                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2379                 return -1;
2380         }
2381
2382         
2383
2384         return 0;
2385 }
2386
2387
2388
2389 /* 
2390   sent to a node to make it take over an ip address
2391 */
2392 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2393                           uint32_t destnode, struct ctdb_public_ip *ip)
2394 {
2395         TDB_DATA data;
2396         struct ctdb_public_ipv4 ipv4;
2397         int ret;
2398         int32_t res;
2399
2400         if (ip->addr.sa.sa_family == AF_INET) {
2401                 ipv4.pnn = ip->pnn;
2402                 ipv4.sin = ip->addr.ip;
2403
2404                 data.dsize = sizeof(ipv4);
2405                 data.dptr  = (uint8_t *)&ipv4;
2406
2407                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2408                            NULL, &res, &timeout, NULL);
2409         } else {
2410                 data.dsize = sizeof(*ip);
2411                 data.dptr  = (uint8_t *)ip;
2412
2413                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2414                            NULL, &res, &timeout, NULL);
2415         }
2416
2417         if (ret != 0 || res != 0) {
2418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2419                 return -1;
2420         }
2421
2422         return 0;       
2423 }
2424
2425
2426 /* 
2427   sent to a node to make it release an ip address
2428 */
2429 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2430                          uint32_t destnode, struct ctdb_public_ip *ip)
2431 {
2432         TDB_DATA data;
2433         struct ctdb_public_ipv4 ipv4;
2434         int ret;
2435         int32_t res;
2436
2437         if (ip->addr.sa.sa_family == AF_INET) {
2438                 ipv4.pnn = ip->pnn;
2439                 ipv4.sin = ip->addr.ip;
2440
2441                 data.dsize = sizeof(ipv4);
2442                 data.dptr  = (uint8_t *)&ipv4;
2443
2444                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2445                                    NULL, &res, &timeout, NULL);
2446         } else {
2447                 data.dsize = sizeof(*ip);
2448                 data.dptr  = (uint8_t *)ip;
2449
2450                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2451                                    NULL, &res, &timeout, NULL);
2452         }
2453
2454         if (ret != 0 || res != 0) {
2455                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2456                 return -1;
2457         }
2458
2459         return 0;       
2460 }
2461
2462
2463 /*
2464   get a tunable
2465  */
2466 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2467                           struct timeval timeout, 
2468                           uint32_t destnode,
2469                           const char *name, uint32_t *value)
2470 {
2471         struct ctdb_control_get_tunable *t;
2472         TDB_DATA data, outdata;
2473         int32_t res;
2474         int ret;
2475
2476         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2477         data.dptr  = talloc_size(ctdb, data.dsize);
2478         CTDB_NO_MEMORY(ctdb, data.dptr);
2479
2480         t = (struct ctdb_control_get_tunable *)data.dptr;
2481         t->length = strlen(name)+1;
2482         memcpy(t->name, name, t->length);
2483
2484         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2485                            &outdata, &res, &timeout, NULL);
2486         talloc_free(data.dptr);
2487         if (ret != 0 || res != 0) {
2488                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2489                 return -1;
2490         }
2491
2492         if (outdata.dsize != sizeof(uint32_t)) {
2493                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2494                 talloc_free(outdata.dptr);
2495                 return -1;
2496         }
2497         
2498         *value = *(uint32_t *)outdata.dptr;
2499         talloc_free(outdata.dptr);
2500
2501         return 0;
2502 }
2503
2504 /*
2505   set a tunable
2506  */
2507 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2508                           struct timeval timeout, 
2509                           uint32_t destnode,
2510                           const char *name, uint32_t value)
2511 {
2512         struct ctdb_control_set_tunable *t;
2513         TDB_DATA data;
2514         int32_t res;
2515         int ret;
2516
2517         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2518         data.dptr  = talloc_size(ctdb, data.dsize);
2519         CTDB_NO_MEMORY(ctdb, data.dptr);
2520
2521         t = (struct ctdb_control_set_tunable *)data.dptr;
2522         t->length = strlen(name)+1;
2523         memcpy(t->name, name, t->length);
2524         t->value = value;
2525
2526         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2527                            NULL, &res, &timeout, NULL);
2528         talloc_free(data.dptr);
2529         if (ret != 0 || res != 0) {
2530                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2531                 return -1;
2532         }
2533
2534         return 0;
2535 }
2536
2537 /*
2538   list tunables
2539  */
2540 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2541                             struct timeval timeout, 
2542                             uint32_t destnode,
2543                             TALLOC_CTX *mem_ctx,
2544                             const char ***list, uint32_t *count)
2545 {
2546         TDB_DATA outdata;
2547         int32_t res;
2548         int ret;
2549         struct ctdb_control_list_tunable *t;
2550         char *p, *s, *ptr;
2551
2552         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2553                            mem_ctx, &outdata, &res, &timeout, NULL);
2554         if (ret != 0 || res != 0) {
2555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2556                 return -1;
2557         }
2558
2559         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2560         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2561             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2562                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2563                 talloc_free(outdata.dptr);
2564                 return -1;              
2565         }
2566         
2567         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2568         CTDB_NO_MEMORY(ctdb, p);
2569
2570         talloc_free(outdata.dptr);
2571         
2572         (*list) = NULL;
2573         (*count) = 0;
2574
2575         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2576                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2577                 CTDB_NO_MEMORY(ctdb, *list);
2578                 (*list)[*count] = talloc_strdup(*list, s);
2579                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2580                 (*count)++;
2581         }
2582
2583         talloc_free(p);
2584
2585         return 0;
2586 }
2587
2588
2589 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2590                                    struct timeval timeout, uint32_t destnode,
2591                                    TALLOC_CTX *mem_ctx,
2592                                    uint32_t flags,
2593                                    struct ctdb_all_public_ips **ips)
2594 {
2595         int ret;
2596         TDB_DATA outdata;
2597         int32_t res;
2598
2599         ret = ctdb_control(ctdb, destnode, 0, 
2600                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2601                            mem_ctx, &outdata, &res, &timeout, NULL);
2602         if (ret == 0 && res == -1) {
2603                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2604                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2605         }
2606         if (ret != 0 || res != 0) {
2607           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2608                 return -1;
2609         }
2610
2611         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2612         talloc_free(outdata.dptr);
2613                     
2614         return 0;
2615 }
2616
2617 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2618                              struct timeval timeout, uint32_t destnode,
2619                              TALLOC_CTX *mem_ctx,
2620                              struct ctdb_all_public_ips **ips)
2621 {
2622         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2623                                               destnode, mem_ctx,
2624                                               0, ips);
2625 }
2626
2627 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2628                         struct timeval timeout, uint32_t destnode, 
2629                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2630 {
2631         int ret, i, len;
2632         TDB_DATA outdata;
2633         int32_t res;
2634         struct ctdb_all_public_ipsv4 *ipsv4;
2635
2636         ret = ctdb_control(ctdb, destnode, 0, 
2637                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2638                            mem_ctx, &outdata, &res, &timeout, NULL);
2639         if (ret != 0 || res != 0) {
2640                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2641                 return -1;
2642         }
2643
2644         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2645         len = offsetof(struct ctdb_all_public_ips, ips) +
2646                 ipsv4->num*sizeof(struct ctdb_public_ip);
2647         *ips = talloc_zero_size(mem_ctx, len);
2648         CTDB_NO_MEMORY(ctdb, *ips);
2649         (*ips)->num = ipsv4->num;
2650         for (i=0; i<ipsv4->num; i++) {
2651                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2652                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2653         }
2654
2655         talloc_free(outdata.dptr);
2656                     
2657         return 0;
2658 }
2659
2660 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2661                                  struct timeval timeout, uint32_t destnode,
2662                                  TALLOC_CTX *mem_ctx,
2663                                  const ctdb_sock_addr *addr,
2664                                  struct ctdb_control_public_ip_info **_info)
2665 {
2666         int ret;
2667         TDB_DATA indata;
2668         TDB_DATA outdata;
2669         int32_t res;
2670         struct ctdb_control_public_ip_info *info;
2671         uint32_t len;
2672         uint32_t i;
2673
2674         indata.dptr = discard_const_p(uint8_t, addr);
2675         indata.dsize = sizeof(*addr);
2676
2677         ret = ctdb_control(ctdb, destnode, 0,
2678                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2679                            mem_ctx, &outdata, &res, &timeout, NULL);
2680         if (ret != 0 || res != 0) {
2681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2682                                 "failed ret:%d res:%d\n",
2683                                 ret, res));
2684                 return -1;
2685         }
2686
2687         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2688         if (len > outdata.dsize) {
2689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2690                                 "returned invalid data with size %u > %u\n",
2691                                 (unsigned int)outdata.dsize,
2692                                 (unsigned int)len));
2693                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2694                 return -1;
2695         }
2696
2697         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2698         len += info->num*sizeof(struct ctdb_control_iface_info);
2699
2700         if (len > outdata.dsize) {
2701                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2702                                 "returned invalid data with size %u > %u\n",
2703                                 (unsigned int)outdata.dsize,
2704                                 (unsigned int)len));
2705                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2706                 return -1;
2707         }
2708
2709         /* make sure we null terminate the returned strings */
2710         for (i=0; i < info->num; i++) {
2711                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2712         }
2713
2714         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2715                                                                 outdata.dptr,
2716                                                                 outdata.dsize);
2717         talloc_free(outdata.dptr);
2718         if (*_info == NULL) {
2719                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2720                                 "talloc_memdup size %u failed\n",
2721                                 (unsigned int)outdata.dsize));
2722                 return -1;
2723         }
2724
2725         return 0;
2726 }
2727
2728 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2729                          struct timeval timeout, uint32_t destnode,
2730                          TALLOC_CTX *mem_ctx,
2731                          struct ctdb_control_get_ifaces **_ifaces)
2732 {
2733         int ret;
2734         TDB_DATA outdata;
2735         int32_t res;
2736         struct ctdb_control_get_ifaces *ifaces;
2737         uint32_t len;
2738         uint32_t i;
2739
2740         ret = ctdb_control(ctdb, destnode, 0,
2741                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2742                            mem_ctx, &outdata, &res, &timeout, NULL);
2743         if (ret != 0 || res != 0) {
2744                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2745                                 "failed ret:%d res:%d\n",
2746                                 ret, res));
2747                 return -1;
2748         }
2749
2750         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2751         if (len > outdata.dsize) {
2752                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2753                                 "returned invalid data with size %u > %u\n",
2754                                 (unsigned int)outdata.dsize,
2755                                 (unsigned int)len));
2756                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2757                 return -1;
2758         }
2759
2760         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2761         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2762
2763         if (len > outdata.dsize) {
2764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2765                                 "returned invalid data with size %u > %u\n",
2766                                 (unsigned int)outdata.dsize,
2767                                 (unsigned int)len));
2768                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2769                 return -1;
2770         }
2771
2772         /* make sure we null terminate the returned strings */
2773         for (i=0; i < ifaces->num; i++) {
2774                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2775         }
2776
2777         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2778                                                                   outdata.dptr,
2779                                                                   outdata.dsize);
2780         talloc_free(outdata.dptr);
2781         if (*_ifaces == NULL) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2783                                 "talloc_memdup size %u failed\n",
2784                                 (unsigned int)outdata.dsize));
2785                 return -1;
2786         }
2787
2788         return 0;
2789 }
2790
2791 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2792                              struct timeval timeout, uint32_t destnode,
2793                              TALLOC_CTX *mem_ctx,
2794                              const struct ctdb_control_iface_info *info)
2795 {
2796         int ret;
2797         TDB_DATA indata;
2798         int32_t res;
2799
2800         indata.dptr = discard_const_p(uint8_t, info);
2801         indata.dsize = sizeof(*info);
2802
2803         ret = ctdb_control(ctdb, destnode, 0,
2804                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2805                            mem_ctx, NULL, &res, &timeout, NULL);
2806         if (ret != 0 || res != 0) {
2807                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2808                                 "failed ret:%d res:%d\n",
2809                                 ret, res));
2810                 return -1;
2811         }
2812
2813         return 0;
2814 }
2815
2816 /*
2817   set/clear the permanent disabled bit on a remote node
2818  */
2819 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2820                        uint32_t set, uint32_t clear)
2821 {
2822         int ret;
2823         TDB_DATA data;
2824         struct ctdb_node_map *nodemap=NULL;
2825         struct ctdb_node_flag_change c;
2826         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2827         uint32_t recmaster;
2828         uint32_t *nodes;
2829
2830
2831         /* find the recovery master */
2832         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2833         if (ret != 0) {
2834                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2835                 talloc_free(tmp_ctx);
2836                 return ret;
2837         }
2838
2839
2840         /* read the node flags from the recmaster */
2841         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2842         if (ret != 0) {
2843                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2844                 talloc_free(tmp_ctx);
2845                 return -1;
2846         }
2847         if (destnode >= nodemap->num) {
2848                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2849                 talloc_free(tmp_ctx);
2850                 return -1;
2851         }
2852
2853         c.pnn       = destnode;
2854         c.old_flags = nodemap->nodes[destnode].flags;
2855         c.new_flags = c.old_flags;
2856         c.new_flags |= set;
2857         c.new_flags &= ~clear;
2858
2859         data.dsize = sizeof(c);
2860         data.dptr = (unsigned char *)&c;
2861
2862         /* send the flags update to all connected nodes */
2863         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2864
2865         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2866                                         nodes, 0,
2867                                         timeout, false, data,
2868                                         NULL, NULL,
2869                                         NULL) != 0) {
2870                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2871
2872                 talloc_free(tmp_ctx);
2873                 return -1;
2874         }
2875
2876         talloc_free(tmp_ctx);
2877         return 0;
2878 }
2879
2880
2881 /*
2882   get all tunables
2883  */
2884 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2885                                struct timeval timeout, 
2886                                uint32_t destnode,
2887                                struct ctdb_tunable *tunables)
2888 {
2889         TDB_DATA outdata;
2890         int ret;
2891         int32_t res;
2892
2893         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2894                            &outdata, &res, &timeout, NULL);
2895         if (ret != 0 || res != 0) {
2896                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2897                 return -1;
2898         }
2899
2900         if (outdata.dsize != sizeof(*tunables)) {
2901                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2902                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2903                 return -1;              
2904         }
2905
2906         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2907         talloc_free(outdata.dptr);
2908         return 0;
2909 }
2910
2911 /*
2912   add a public address to a node
2913  */
2914 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2915                       struct timeval timeout, 
2916                       uint32_t destnode,
2917                       struct ctdb_control_ip_iface *pub)
2918 {
2919         TDB_DATA data;
2920         int32_t res;
2921         int ret;
2922
2923         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2924         data.dptr  = (unsigned char *)pub;
2925
2926         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2927                            NULL, &res, &timeout, NULL);
2928         if (ret != 0 || res != 0) {
2929                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2930                 return -1;
2931         }
2932
2933         return 0;
2934 }
2935
2936 /*
2937   delete a public address from a node
2938  */
2939 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2940                       struct timeval timeout, 
2941                       uint32_t destnode,
2942                       struct ctdb_control_ip_iface *pub)
2943 {
2944         TDB_DATA data;
2945         int32_t res;
2946         int ret;
2947
2948         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2949         data.dptr  = (unsigned char *)pub;
2950
2951         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2952                            NULL, &res, &timeout, NULL);
2953         if (ret != 0 || res != 0) {
2954                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2955                 return -1;
2956         }
2957
2958         return 0;
2959 }
2960
2961 /*
2962   kill a tcp connection
2963  */
2964 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2965                       struct timeval timeout, 
2966                       uint32_t destnode,
2967                       struct ctdb_control_killtcp *killtcp)
2968 {
2969         TDB_DATA data;
2970         int32_t res;
2971         int ret;
2972
2973         data.dsize = sizeof(struct ctdb_control_killtcp);
2974         data.dptr  = (unsigned char *)killtcp;
2975
2976         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2977                            NULL, &res, &timeout, NULL);
2978         if (ret != 0 || res != 0) {
2979                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2980                 return -1;
2981         }
2982
2983         return 0;
2984 }
2985
2986 /*
2987   send a gratious arp
2988  */
2989 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2990                       struct timeval timeout, 
2991                       uint32_t destnode,
2992                       ctdb_sock_addr *addr,
2993                       const char *ifname)
2994 {
2995         TDB_DATA data;
2996         int32_t res;
2997         int ret, len;
2998         struct ctdb_control_gratious_arp *gratious_arp;
2999         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3000
3001
3002         len = strlen(ifname)+1;
3003         gratious_arp = talloc_size(tmp_ctx, 
3004                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3005         CTDB_NO_MEMORY(ctdb, gratious_arp);
3006
3007         gratious_arp->addr = *addr;
3008         gratious_arp->len = len;
3009         memcpy(&gratious_arp->iface[0], ifname, len);
3010
3011
3012         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3013         data.dptr  = (unsigned char *)gratious_arp;
3014
3015         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3016                            NULL, &res, &timeout, NULL);
3017         if (ret != 0 || res != 0) {
3018                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3019                 talloc_free(tmp_ctx);
3020                 return -1;
3021         }
3022
3023         talloc_free(tmp_ctx);
3024         return 0;
3025 }
3026
3027 /*
3028   get a list of all tcp tickles that a node knows about for a particular vnn
3029  */
3030 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3031                               struct timeval timeout, uint32_t destnode, 
3032                               TALLOC_CTX *mem_ctx, 
3033                               ctdb_sock_addr *addr,
3034                               struct ctdb_control_tcp_tickle_list **list)
3035 {
3036         int ret;
3037         TDB_DATA data, outdata;
3038         int32_t status;
3039
3040         data.dptr = (uint8_t*)addr;
3041         data.dsize = sizeof(ctdb_sock_addr);
3042
3043         ret = ctdb_control(ctdb, destnode, 0, 
3044                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3045                            mem_ctx, &outdata, &status, NULL, NULL);
3046         if (ret != 0 || status != 0) {
3047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3048                 return -1;
3049         }
3050
3051         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3052
3053         return status;
3054 }
3055
3056 /*
3057   register a server id
3058  */
3059 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3060                       struct timeval timeout, 
3061                       struct ctdb_server_id *id)
3062 {
3063         TDB_DATA data;
3064         int32_t res;
3065         int ret;
3066
3067         data.dsize = sizeof(struct ctdb_server_id);
3068         data.dptr  = (unsigned char *)id;
3069
3070         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3071                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3072                         0, data, NULL,
3073                         NULL, &res, &timeout, NULL);
3074         if (ret != 0 || res != 0) {
3075                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3076                 return -1;
3077         }
3078
3079         return 0;
3080 }
3081
3082 /*
3083   unregister a server id
3084  */
3085 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3086                       struct timeval timeout, 
3087                       struct ctdb_server_id *id)
3088 {
3089         TDB_DATA data;
3090         int32_t res;
3091         int ret;
3092
3093         data.dsize = sizeof(struct ctdb_server_id);
3094         data.dptr  = (unsigned char *)id;
3095
3096         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3097                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3098                         0, data, NULL,
3099                         NULL, &res, &timeout, NULL);
3100         if (ret != 0 || res != 0) {
3101                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3102                 return -1;
3103         }
3104
3105         return 0;
3106 }
3107
3108
3109 /*
3110   check if a server id exists
3111
3112   if a server id does exist, return *status == 1, otherwise *status == 0
3113  */
3114 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3115                       struct timeval timeout, 
3116                       uint32_t destnode,
3117                       struct ctdb_server_id *id,
3118                       uint32_t *status)
3119 {
3120         TDB_DATA data;
3121         int32_t res;
3122         int ret;
3123
3124         data.dsize = sizeof(struct ctdb_server_id);
3125         data.dptr  = (unsigned char *)id;
3126
3127         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3128                         0, data, NULL,
3129                         NULL, &res, &timeout, NULL);
3130         if (ret != 0) {
3131                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3132                 return -1;
3133         }
3134
3135         if (res) {
3136                 *status = 1;
3137         } else {
3138                 *status = 0;
3139         }
3140
3141         return 0;
3142 }
3143
3144 /*
3145    get the list of server ids that are registered on a node
3146 */
3147 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3148                 TALLOC_CTX *mem_ctx,
3149                 struct timeval timeout, uint32_t destnode, 
3150                 struct ctdb_server_id_list **svid_list)
3151 {
3152         int ret;
3153         TDB_DATA outdata;
3154         int32_t res;
3155
3156         ret = ctdb_control(ctdb, destnode, 0, 
3157                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3158                            mem_ctx, &outdata, &res, &timeout, NULL);
3159         if (ret != 0 || res != 0) {
3160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3161                 return -1;
3162         }
3163
3164         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3165                     
3166         return 0;
3167 }
3168
3169 /*
3170   initialise the ctdb daemon for client applications
3171
3172   NOTE: In current code the daemon does not fork. This is for testing purposes only
3173   and to simplify the code.
3174 */
3175 struct ctdb_context *ctdb_init(struct event_context *ev)
3176 {
3177         int ret;
3178         struct ctdb_context *ctdb;
3179
3180         ctdb = talloc_zero(ev, struct ctdb_context);
3181         if (ctdb == NULL) {
3182                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3183                 return NULL;
3184         }
3185         ctdb->ev  = ev;
3186         ctdb->idr = idr_init(ctdb);
3187         /* Wrap early to exercise code. */
3188         ctdb->lastid = INT_MAX-200;
3189         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3190
3191         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3192         if (ret != 0) {
3193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3194                 talloc_free(ctdb);
3195                 return NULL;
3196         }
3197
3198         ctdb->statistics.statistics_start_time = timeval_current();
3199
3200         return ctdb;
3201 }
3202
3203
3204 /*
3205   set some ctdb flags
3206 */
3207 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3208 {
3209         ctdb->flags |= flags;
3210 }
3211
3212 /*
3213   setup the local socket name
3214 */
3215 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3216 {
3217         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3218         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3219
3220         return 0;
3221 }
3222
3223 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3224 {
3225         return ctdb->daemon.name;
3226 }
3227
3228 /*
3229   return the pnn of this node
3230 */
3231 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3232 {
3233         return ctdb->pnn;
3234 }
3235
3236
3237 /*
3238   get the uptime of a remote node
3239  */
3240 struct ctdb_client_control_state *
3241 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3242 {
3243         return ctdb_control_send(ctdb, destnode, 0, 
3244                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3245                            mem_ctx, &timeout, NULL);
3246 }
3247
3248 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3249 {
3250         int ret;
3251         int32_t res;
3252         TDB_DATA outdata;
3253
3254         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3255         if (ret != 0 || res != 0) {
3256                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3257                 return -1;
3258         }
3259
3260         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3261
3262         return 0;
3263 }
3264
3265 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3266 {
3267         struct ctdb_client_control_state *state;
3268
3269         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3270         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3271 }
3272
3273 /*
3274   send a control to execute the "recovered" event script on a node
3275  */
3276 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3277 {
3278         int ret;
3279         int32_t status;
3280
3281         ret = ctdb_control(ctdb, destnode, 0, 
3282                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3283                            NULL, NULL, &status, &timeout, NULL);
3284         if (ret != 0 || status != 0) {
3285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3286                 return -1;
3287         }
3288
3289         return 0;
3290 }
3291
3292 /* 
3293   callback for the async helpers used when sending the same control
3294   to multiple nodes in parallell.
3295 */
3296 static void async_callback(struct ctdb_client_control_state *state)
3297 {
3298         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3299         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3300         int ret;
3301         TDB_DATA outdata;
3302         int32_t res;
3303         uint32_t destnode = state->c->hdr.destnode;
3304
3305         /* one more node has responded with recmode data */
3306         data->count--;
3307
3308         /* if we failed to push the db, then return an error and let
3309            the main loop try again.
3310         */
3311         if (state->state != CTDB_CONTROL_DONE) {
3312                 if ( !data->dont_log_errors) {
3313                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3314                 }
3315                 data->fail_count++;
3316                 if (data->fail_callback) {
3317                         data->fail_callback(ctdb, destnode, res, outdata,
3318                                         data->callback_data);
3319                 }
3320                 return;
3321         }
3322         
3323         state->async.fn = NULL;
3324
3325         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3326         if ((ret != 0) || (res != 0)) {
3327                 if ( !data->dont_log_errors) {
3328                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3329                 }
3330                 data->fail_count++;
3331                 if (data->fail_callback) {
3332                         data->fail_callback(ctdb, destnode, res, outdata,
3333                                         data->callback_data);
3334                 }
3335         }
3336         if ((ret == 0) && (data->callback != NULL)) {
3337                 data->callback(ctdb, destnode, res, outdata,
3338                                         data->callback_data);
3339         }
3340 }
3341
3342
3343 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3344 {
3345         /* set up the callback functions */
3346         state->async.fn = async_callback;
3347         state->async.private_data = data;
3348         
3349         /* one more control to wait for to complete */
3350         data->count++;
3351 }
3352
3353
3354 /* wait for up to the maximum number of seconds allowed
3355    or until all nodes we expect a response from has replied
3356 */
3357 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3358 {
3359         while (data->count > 0) {
3360                 event_loop_once(ctdb->ev);
3361         }
3362         if (data->fail_count != 0) {
3363                 if (!data->dont_log_errors) {
3364                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3365                                  data->fail_count));
3366                 }
3367                 return -1;
3368         }
3369         return 0;
3370 }
3371
3372
3373 /* 
3374    perform a simple control on the listed nodes
3375    The control cannot return data
3376  */
3377 int ctdb_client_async_control(struct ctdb_context *ctdb,
3378                                 enum ctdb_controls opcode,
3379                                 uint32_t *nodes,
3380                                 uint64_t srvid,
3381                                 struct timeval timeout,
3382                                 bool dont_log_errors,
3383                                 TDB_DATA data,
3384                                 client_async_callback client_callback,
3385                                 client_async_callback fail_callback,
3386                                 void *callback_data)
3387 {
3388         struct client_async_data *async_data;
3389         struct ctdb_client_control_state *state;
3390         int j, num_nodes;
3391
3392         async_data = talloc_zero(ctdb, struct client_async_data);
3393         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3394         async_data->dont_log_errors = dont_log_errors;
3395         async_data->callback = client_callback;
3396         async_data->fail_callback = fail_callback;
3397         async_data->callback_data = callback_data;
3398         async_data->opcode        = opcode;
3399
3400         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3401
3402         /* loop over all nodes and send an async control to each of them */
3403         for (j=0; j<num_nodes; j++) {
3404                 uint32_t pnn = nodes[j];
3405
3406                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3407                                           0, data, async_data, &timeout, NULL);
3408                 if (state == NULL) {
3409                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3410                         talloc_free(async_data);
3411                         return -1;
3412                 }
3413                 
3414                 ctdb_client_async_add(async_data, state);
3415         }
3416
3417         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3418                 talloc_free(async_data);
3419                 return -1;
3420         }
3421
3422         talloc_free(async_data);
3423         return 0;
3424 }
3425
3426 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3427                                 struct ctdb_vnn_map *vnn_map,
3428                                 TALLOC_CTX *mem_ctx,
3429                                 bool include_self)
3430 {
3431         int i, j, num_nodes;
3432         uint32_t *nodes;
3433
3434         for (i=num_nodes=0;i<vnn_map->size;i++) {
3435                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3436                         continue;
3437                 }
3438                 num_nodes++;
3439         } 
3440
3441         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3442         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3443
3444         for (i=j=0;i<vnn_map->size;i++) {
3445                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3446                         continue;
3447                 }
3448                 nodes[j++] = vnn_map->map[i];
3449         } 
3450
3451         return nodes;
3452 }
3453
3454 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3455                                 struct ctdb_node_map *node_map,
3456                                 TALLOC_CTX *mem_ctx,
3457                                 bool include_self)
3458 {
3459         int i, j, num_nodes;
3460         uint32_t *nodes;
3461
3462         for (i=num_nodes=0;i<node_map->num;i++) {
3463                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3464                         continue;
3465                 }
3466                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3467                         continue;
3468                 }
3469                 num_nodes++;
3470         } 
3471
3472         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3473         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3474
3475         for (i=j=0;i<node_map->num;i++) {
3476                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3477                         continue;
3478                 }
3479                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3480                         continue;
3481                 }
3482                 nodes[j++] = node_map->nodes[i].pnn;
3483         } 
3484
3485         return nodes;
3486 }
3487
3488 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3489                                 struct ctdb_node_map *node_map,
3490                                 TALLOC_CTX *mem_ctx,
3491                                 uint32_t pnn)
3492 {
3493         int i, j, num_nodes;
3494         uint32_t *nodes;
3495
3496         for (i=num_nodes=0;i<node_map->num;i++) {
3497                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3498                         continue;
3499                 }
3500                 if (node_map->nodes[i].pnn == pnn) {
3501                         continue;
3502                 }
3503                 num_nodes++;
3504         } 
3505
3506         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3507         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3508
3509         for (i=j=0;i<node_map->num;i++) {
3510                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3511                         continue;
3512                 }
3513                 if (node_map->nodes[i].pnn == pnn) {
3514                         continue;
3515                 }
3516                 nodes[j++] = node_map->nodes[i].pnn;
3517         } 
3518
3519         return nodes;
3520 }
3521
3522 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3523                                 struct ctdb_node_map *node_map,
3524                                 TALLOC_CTX *mem_ctx,
3525                                 bool include_self)
3526 {
3527         int i, j, num_nodes;
3528         uint32_t *nodes;
3529
3530         for (i=num_nodes=0;i<node_map->num;i++) {
3531                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3532                         continue;
3533                 }
3534                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3535                         continue;
3536                 }
3537                 num_nodes++;
3538         } 
3539
3540         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3541         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3542
3543         for (i=j=0;i<node_map->num;i++) {
3544                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3545                         continue;
3546                 }
3547                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3548                         continue;
3549                 }
3550                 nodes[j++] = node_map->nodes[i].pnn;
3551         } 
3552
3553         return nodes;
3554 }
3555
3556 /* 
3557   this is used to test if a pnn lock exists and if it exists will return
3558   the number of connections that pnn has reported or -1 if that recovery
3559   daemon is not running.
3560 */
3561 int
3562 ctdb_read_pnn_lock(int fd, int32_t pnn)
3563 {
3564         struct flock lock;
3565         char c;
3566
3567         lock.l_type = F_WRLCK;
3568         lock.l_whence = SEEK_SET;
3569         lock.l_start = pnn;
3570         lock.l_len = 1;
3571         lock.l_pid = 0;
3572
3573         if (fcntl(fd, F_GETLK, &lock) != 0) {
3574                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3575                 return -1;
3576         }
3577
3578         if (lock.l_type == F_UNLCK) {
3579                 return -1;
3580         }
3581
3582         if (pread(fd, &c, 1, pnn) == -1) {
3583                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3584                 return -1;
3585         }
3586
3587         return c;
3588 }
3589
3590 /*
3591   get capabilities of a remote node
3592  */
3593 struct ctdb_client_control_state *
3594 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3595 {
3596         return ctdb_control_send(ctdb, destnode, 0, 
3597                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3598                            mem_ctx, &timeout, NULL);
3599 }
3600
3601 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3602 {
3603         int ret;
3604         int32_t res;
3605         TDB_DATA outdata;
3606
3607         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3608         if ( (ret != 0) || (res != 0) ) {
3609                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3610                 return -1;
3611         }
3612
3613         if (capabilities) {
3614                 *capabilities = *((uint32_t *)outdata.dptr);
3615         }
3616
3617         return 0;
3618 }
3619
3620 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3621 {
3622         struct ctdb_client_control_state *state;
3623         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3624         int ret;
3625
3626         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3627         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3628         talloc_free(tmp_ctx);
3629         return ret;
3630 }
3631
3632 /**
3633  * check whether a transaction is active on a given db on a given node
3634  */
3635 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3636                                      uint32_t destnode,
3637                                      uint32_t db_id)
3638 {
3639         int32_t status;
3640         int ret;
3641         TDB_DATA indata;
3642
3643         indata.dptr = (uint8_t *)&db_id;
3644         indata.dsize = sizeof(db_id);
3645
3646         ret = ctdb_control(ctdb, destnode, 0,
3647                            CTDB_CONTROL_TRANS2_ACTIVE,
3648                            0, indata, NULL, NULL, &status,
3649                            NULL, NULL);
3650
3651         if (ret != 0) {
3652                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3653                 return -1;
3654         }
3655
3656         return status;
3657 }
3658
3659
3660 struct ctdb_transaction_handle {
3661         struct ctdb_db_context *ctdb_db;
3662         bool in_replay;
3663         /*
3664          * we store the reads and writes done under a transaction:
3665          * - one list stores both reads and writes (m_all),
3666          * - the other just writes (m_write)
3667          */
3668         struct ctdb_marshall_buffer *m_all;
3669         struct ctdb_marshall_buffer *m_write;
3670 };
3671
3672 /* start a transaction on a database */
3673 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3674 {
3675         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3676         return 0;
3677 }
3678
3679 /* start a transaction on a database */
3680 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3681 {
3682         struct ctdb_record_handle *rh;
3683         TDB_DATA key;
3684         TDB_DATA data;
3685         struct ctdb_ltdb_header header;
3686         TALLOC_CTX *tmp_ctx;
3687         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3688         int ret;
3689         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3690         pid_t pid;
3691         int32_t status;
3692
3693         key.dptr = discard_const(keyname);
3694         key.dsize = strlen(keyname);
3695
3696         if (!ctdb_db->persistent) {
3697                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3698                 return -1;
3699         }
3700
3701 again:
3702         tmp_ctx = talloc_new(h);
3703
3704         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3705         if (rh == NULL) {
3706                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3707                 talloc_free(tmp_ctx);
3708                 return -1;
3709         }
3710
3711         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3712                                               CTDB_CURRENT_NODE,
3713                                               ctdb_db->db_id);
3714         if (status == 1) {
3715                 unsigned long int usec = (1000 + random()) % 100000;
3716                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3717                                     "on db_id[0x%08x]. waiting for %lu "
3718                                     "microseconds\n",
3719                                     ctdb_db->db_id, usec));
3720                 talloc_free(tmp_ctx);
3721                 usleep(usec);
3722                 goto again;
3723         }
3724
3725         /*
3726          * store the pid in the database:
3727          * it is not enough that the node is dmaster...
3728          */
3729         pid = getpid();
3730         data.dptr = (unsigned char *)&pid;
3731         data.dsize = sizeof(pid_t);
3732         rh->header.rsn++;
3733         rh->header.dmaster = ctdb_db->ctdb->pnn;
3734         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3735         if (ret != 0) {
3736                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3737                                   "transaction record\n"));
3738                 talloc_free(tmp_ctx);
3739                 return -1;
3740         }
3741
3742         talloc_free(rh);
3743
3744         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3745         if (ret != 0) {
3746                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3747                 talloc_free(tmp_ctx);
3748                 return -1;
3749         }
3750
3751         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3752         if (ret != 0) {
3753                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3754                                  "lock record inside transaction\n"));
3755                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3756                 talloc_free(tmp_ctx);
3757                 goto again;
3758         }
3759
3760         if (header.dmaster != ctdb_db->ctdb->pnn) {
3761                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3762                                    "transaction lock record\n"));
3763                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3764                 talloc_free(tmp_ctx);
3765                 goto again;
3766         }
3767
3768         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3769                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3770                                     "the transaction lock record\n"));
3771                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3772                 talloc_free(tmp_ctx);
3773                 goto again;
3774         }
3775
3776         talloc_free(tmp_ctx);
3777
3778         return 0;
3779 }
3780
3781
3782 /* start a transaction on a database */
3783 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3784                                                        TALLOC_CTX *mem_ctx)
3785 {
3786         struct ctdb_transaction_handle *h;
3787         int ret;
3788
3789         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3790         if (h == NULL) {
3791                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3792                 return NULL;
3793         }
3794
3795         h->ctdb_db = ctdb_db;
3796
3797         ret = ctdb_transaction_fetch_start(h);
3798         if (ret != 0) {
3799                 talloc_free(h);
3800                 return NULL;
3801         }
3802
3803         talloc_set_destructor(h, ctdb_transaction_destructor);
3804
3805         return h;
3806 }
3807
3808
3809
3810 /*
3811   fetch a record inside a transaction
3812  */
3813 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3814                            TALLOC_CTX *mem_ctx, 
3815                            TDB_DATA key, TDB_DATA *data)
3816 {
3817         struct ctdb_ltdb_header header;
3818         int ret;
3819
3820         ZERO_STRUCT(header);
3821
3822         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3823         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3824                 /* record doesn't exist yet */
3825                 *data = tdb_null;
3826                 ret = 0;
3827         }
3828         
3829         if (ret != 0) {
3830                 return ret;
3831         }
3832
3833         if (!h->in_replay) {
3834                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3835                 if (h->m_all == NULL) {
3836                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3837                         return -1;
3838                 }
3839         }
3840
3841         return 0;
3842 }
3843
3844 /*
3845   stores a record inside a transaction
3846  */
3847 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3848                            TDB_DATA key, TDB_DATA data)
3849 {
3850         TALLOC_CTX *tmp_ctx = talloc_new(h);
3851         struct ctdb_ltdb_header header;
3852         TDB_DATA olddata;
3853         int ret;
3854
3855         ZERO_STRUCT(header);
3856
3857         /* we need the header so we can update the RSN */
3858         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3859         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3860                 /* the record doesn't exist - create one with us as dmaster.
3861                    This is only safe because we are in a transaction and this
3862                    is a persistent database */
3863                 ZERO_STRUCT(header);
3864         } else if (ret != 0) {
3865                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3866                 talloc_free(tmp_ctx);
3867                 return ret;
3868         }
3869
3870         if (data.dsize == olddata.dsize &&
3871             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3872                 /* save writing the same data */
3873                 talloc_free(tmp_ctx);
3874                 return 0;
3875         }
3876
3877         header.dmaster = h->ctdb_db->ctdb->pnn;
3878         header.rsn++;
3879
3880         if (!h->in_replay) {
3881                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3882                 if (h->m_all == NULL) {
3883                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3884                         talloc_free(tmp_ctx);
3885                         return -1;
3886                 }
3887         }               
3888
3889         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3890         if (h->m_write == NULL) {
3891                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3892                 talloc_free(tmp_ctx);
3893                 return -1;
3894         }
3895         
3896         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3897
3898         talloc_free(tmp_ctx);
3899         
3900         return ret;
3901 }
3902
3903 /*
3904   replay a transaction
3905  */
3906 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3907 {
3908         int ret, i;
3909         struct ctdb_rec_data *rec = NULL;
3910
3911         h->in_replay = true;
3912         talloc_free(h->m_write);
3913         h->m_write = NULL;
3914
3915         ret = ctdb_transaction_fetch_start(h);
3916         if (ret != 0) {
3917                 return ret;
3918         }
3919
3920         for (i=0;i<h->m_all->count;i++) {
3921                 TDB_DATA key, data;
3922
3923                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3924                 if (rec == NULL) {
3925                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3926                         goto failed;
3927                 }
3928
3929                 if (rec->reqid == 0) {
3930                         /* its a store */
3931                         if (ctdb_transaction_store(h, key, data) != 0) {
3932                                 goto failed;
3933                         }
3934                 } else {
3935                         TDB_DATA data2;
3936                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3937
3938                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3939                                 talloc_free(tmp_ctx);
3940                                 goto failed;
3941                         }
3942                         if (data2.dsize != data.dsize ||
3943                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3944                                 /* the record has changed on us - we have to give up */
3945                                 talloc_free(tmp_ctx);
3946                                 goto failed;
3947                         }
3948                         talloc_free(tmp_ctx);
3949                 }
3950         }
3951         
3952         return 0;
3953
3954 failed:
3955         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3956         return -1;
3957 }
3958
3959
3960 /*
3961   commit a transaction
3962  */
3963 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3964 {
3965         int ret, retries=0;
3966         int32_t status;
3967         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3968         struct timeval timeout;
3969         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3970
3971         talloc_set_destructor(h, NULL);
3972
3973         /* our commit strategy is quite complex.
3974
3975            - we first try to commit the changes to all other nodes
3976
3977            - if that works, then we commit locally and we are done
3978
3979            - if a commit on another node fails, then we need to cancel
3980              the transaction, then restart the transaction (thus
3981              opening a window of time for a pending recovery to
3982              complete), then replay the transaction, checking all the
3983              reads and writes (checking that reads give the same data,
3984              and writes succeed). Then we retry the transaction to the
3985              other nodes
3986         */
3987
3988 again:
3989         if (h->m_write == NULL) {
3990                 /* no changes were made */
3991                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3992                 talloc_free(h);
3993                 return 0;
3994         }
3995
3996         /* tell ctdbd to commit to the other nodes */
3997         timeout = timeval_current_ofs(1, 0);
3998         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3999                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
4000                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
4001                            &timeout, NULL);
4002         if (ret != 0 || status != 0) {
4003                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
4004                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
4005                                      ", retrying after 1 second...\n",
4006                                      (retries==0)?"":"retry "));
4007                 sleep(1);
4008
4009                 if (ret != 0) {
4010                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
4011                 } else {
4012                         /* work out what error code we will give if we 
4013                            have to fail the operation */
4014                         switch ((enum ctdb_trans2_commit_error)status) {
4015                         case CTDB_TRANS2_COMMIT_SUCCESS:
4016                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4017                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4018                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4019                                 break;
4020                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4021                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4022                                 break;
4023                         }
4024                 }
4025
4026                 if (++retries == 100) {
4027                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4028                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4029                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4030                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4031                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4032                         talloc_free(h);
4033                         return -1;
4034                 }               
4035
4036                 if (ctdb_replay_transaction(h) != 0) {
4037                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4038                                           "transaction on db 0x%08x, "
4039                                           "failure control =%u\n",
4040                                           h->ctdb_db->db_id,
4041                                           (unsigned)failure_control));
4042                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4043                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4044                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4045                         talloc_free(h);
4046                         return -1;
4047                 }
4048                 goto again;
4049         } else {
4050                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4051         }
4052
4053         /* do the real commit locally */
4054         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4055         if (ret != 0) {
4056                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4057                                   "on db id 0x%08x locally, "
4058                                   "failure_control=%u\n",
4059                                   h->ctdb_db->db_id,
4060                                   (unsigned)failure_control));
4061                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4062                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4063                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4064                 talloc_free(h);
4065                 return ret;
4066         }
4067
4068         /* tell ctdbd that we are finished with our local commit */
4069         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4070                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4071                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4072         talloc_free(h);
4073         return 0;
4074 }
4075
4076 /*
4077   recovery daemon ping to main daemon
4078  */
4079 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4080 {
4081         int ret;
4082         int32_t res;
4083
4084         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4085                            ctdb, NULL, &res, NULL, NULL);
4086         if (ret != 0 || res != 0) {
4087                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4088                 return -1;
4089         }
4090
4091         return 0;
4092 }
4093
4094 /* when forking the main daemon and the child process needs to connect back
4095  * to the daemon as a client process, this function can be used to change
4096  * the ctdb context from daemon into client mode
4097  */
4098 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4099 {
4100         int ret;
4101         va_list ap;
4102
4103         /* Add extra information so we can identify this in the logs */
4104         va_start(ap, fmt);
4105         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4106         va_end(ap);
4107
4108         /* shutdown the transport */
4109         if (ctdb->methods) {
4110                 ctdb->methods->shutdown(ctdb);
4111         }
4112
4113         /* get a new event context */
4114         talloc_free(ctdb->ev);
4115         ctdb->ev = event_context_init(ctdb);
4116         tevent_loop_allow_nesting(ctdb->ev);
4117
4118         close(ctdb->daemon.sd);
4119         ctdb->daemon.sd = -1;
4120
4121         /* the client does not need to be realtime */
4122         if (ctdb->do_setsched) {
4123                 ctdb_restore_scheduler(ctdb);
4124         }
4125
4126         /* initialise ctdb */
4127         ret = ctdb_socket_connect(ctdb);
4128         if (ret != 0) {
4129                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4130                 return -1;
4131         }
4132
4133         ctdb->can_send_controls = true;
4134
4135         return 0;
4136 }
4137
4138 /*
4139   get the status of running the monitor eventscripts: NULL means never run.
4140  */
4141 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4142                 struct timeval timeout, uint32_t destnode, 
4143                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4144                 struct ctdb_scripts_wire **scripts)
4145 {
4146         int ret;
4147         TDB_DATA outdata, indata;
4148         int32_t res;
4149         uint32_t uinttype = type;
4150
4151         indata.dptr = (uint8_t *)&uinttype;
4152         indata.dsize = sizeof(uinttype);
4153
4154         ret = ctdb_control(ctdb, destnode, 0, 
4155                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4156                            mem_ctx, &outdata, &res, &timeout, NULL);
4157         if (ret != 0 || res != 0) {
4158                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4159                 return -1;
4160         }
4161
4162         if (outdata.dsize == 0) {
4163                 *scripts = NULL;
4164         } else {
4165                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4166                 talloc_free(outdata.dptr);
4167         }
4168                     
4169         return 0;
4170 }
4171
4172 /*
4173   tell the main daemon how long it took to lock the reclock file
4174  */
4175 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4176 {
4177         int ret;
4178         int32_t res;
4179         TDB_DATA data;
4180
4181         data.dptr = (uint8_t *)&latency;
4182         data.dsize = sizeof(latency);
4183
4184         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4185                            ctdb, NULL, &res, NULL, NULL);
4186         if (ret != 0 || res != 0) {
4187                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4188                 return -1;
4189         }
4190
4191         return 0;
4192 }
4193
4194 /*
4195   get the name of the reclock file
4196  */
4197 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4198                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4199                          const char **name)
4200 {
4201         int ret;
4202         int32_t res;
4203         TDB_DATA data;
4204
4205         ret = ctdb_control(ctdb, destnode, 0, 
4206                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4207                            mem_ctx, &data, &res, &timeout, NULL);
4208         if (ret != 0 || res != 0) {
4209                 return -1;
4210         }
4211
4212         if (data.dsize == 0) {
4213                 *name = NULL;
4214         } else {
4215                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4216         }
4217         talloc_free(data.dptr);
4218
4219         return 0;
4220 }
4221
4222 /*
4223   set the reclock filename for a node
4224  */
4225 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4226 {
4227         int ret;
4228         TDB_DATA data;
4229         int32_t res;
4230
4231         if (reclock == NULL) {
4232                 data.dsize = 0;
4233                 data.dptr  = NULL;
4234         } else {
4235                 data.dsize = strlen(reclock) + 1;
4236                 data.dptr  = discard_const(reclock);
4237         }
4238
4239         ret = ctdb_control(ctdb, destnode, 0, 
4240                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4241                            NULL, NULL, &res, &timeout, NULL);
4242         if (ret != 0 || res != 0) {
4243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4244                 return -1;
4245         }
4246
4247         return 0;
4248 }
4249
4250 /*
4251   stop a node
4252  */
4253 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4254 {
4255         int ret;
4256         int32_t res;
4257
4258         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4259                            ctdb, NULL, &res, &timeout, NULL);
4260         if (ret != 0 || res != 0) {
4261                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4262                 return -1;
4263         }
4264
4265         return 0;
4266 }
4267
4268 /*
4269   continue a node
4270  */
4271 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4272 {
4273         int ret;
4274
4275         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4276                            ctdb, NULL, NULL, &timeout, NULL);
4277         if (ret != 0) {
4278                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4279                 return -1;
4280         }
4281
4282         return 0;
4283 }
4284
4285 /*
4286   set the natgw state for a node
4287  */
4288 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4289 {
4290         int ret;
4291         TDB_DATA data;
4292         int32_t res;
4293
4294         data.dsize = sizeof(natgwstate);
4295         data.dptr  = (uint8_t *)&natgwstate;
4296
4297         ret = ctdb_control(ctdb, destnode, 0, 
4298                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4299                            NULL, NULL, &res, &timeout, NULL);
4300         if (ret != 0 || res != 0) {
4301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4302                 return -1;
4303         }
4304
4305         return 0;
4306 }
4307
4308 /*
4309   set the lmaster role for a node
4310  */
4311 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4312 {
4313         int ret;
4314         TDB_DATA data;
4315         int32_t res;
4316
4317         data.dsize = sizeof(lmasterrole);
4318         data.dptr  = (uint8_t *)&lmasterrole;
4319
4320         ret = ctdb_control(ctdb, destnode, 0, 
4321                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4322                            NULL, NULL, &res, &timeout, NULL);
4323         if (ret != 0 || res != 0) {
4324                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4325                 return -1;
4326         }
4327
4328         return 0;
4329 }
4330
4331 /*
4332   set the recmaster role for a node
4333  */
4334 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4335 {
4336         int ret;
4337         TDB_DATA data;
4338         int32_t res;
4339
4340         data.dsize = sizeof(recmasterrole);
4341         data.dptr  = (uint8_t *)&recmasterrole;
4342
4343         ret = ctdb_control(ctdb, destnode, 0, 
4344                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4345                            NULL, NULL, &res, &timeout, NULL);
4346         if (ret != 0 || res != 0) {
4347                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4348                 return -1;
4349         }
4350
4351         return 0;
4352 }
4353
4354 /* enable an eventscript
4355  */
4356 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4357 {
4358         int ret;
4359         TDB_DATA data;
4360         int32_t res;
4361
4362         data.dsize = strlen(script) + 1;
4363         data.dptr  = discard_const(script);
4364
4365         ret = ctdb_control(ctdb, destnode, 0, 
4366                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4367                            NULL, NULL, &res, &timeout, NULL);
4368         if (ret != 0 || res != 0) {
4369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4370                 return -1;
4371         }
4372
4373         return 0;
4374 }
4375
4376 /* disable an eventscript
4377  */
4378 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4379 {
4380         int ret;
4381         TDB_DATA data;
4382         int32_t res;
4383
4384         data.dsize = strlen(script) + 1;
4385         data.dptr  = discard_const(script);
4386
4387         ret = ctdb_control(ctdb, destnode, 0, 
4388                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4389                            NULL, NULL, &res, &timeout, NULL);
4390         if (ret != 0 || res != 0) {
4391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4392                 return -1;
4393         }
4394
4395         return 0;
4396 }
4397
4398
4399 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4400 {
4401         int ret;
4402         TDB_DATA data;
4403         int32_t res;
4404
4405         data.dsize = sizeof(*bantime);
4406         data.dptr  = (uint8_t *)bantime;
4407
4408         ret = ctdb_control(ctdb, destnode, 0, 
4409                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4410                            NULL, NULL, &res, &timeout, NULL);
4411         if (ret != 0 || res != 0) {
4412                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4413                 return -1;
4414         }
4415
4416         return 0;
4417 }
4418
4419
4420 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4421 {
4422         int ret;
4423         TDB_DATA outdata;
4424         int32_t res;
4425         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4426
4427         ret = ctdb_control(ctdb, destnode, 0, 
4428                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4429                            tmp_ctx, &outdata, &res, &timeout, NULL);
4430         if (ret != 0 || res != 0) {
4431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4432                 talloc_free(tmp_ctx);
4433                 return -1;
4434         }
4435
4436         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4437         talloc_free(tmp_ctx);
4438
4439         return 0;
4440 }
4441
4442
4443 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4444 {
4445         int ret;
4446         int32_t res;
4447         TDB_DATA data;
4448         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4449
4450         data.dptr = (uint8_t*)db_prio;
4451         data.dsize = sizeof(*db_prio);
4452
4453         ret = ctdb_control(ctdb, destnode, 0, 
4454                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4455                            tmp_ctx, NULL, &res, &timeout, NULL);
4456         if (ret != 0 || res != 0) {
4457                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4458                 talloc_free(tmp_ctx);
4459                 return -1;
4460         }
4461
4462         talloc_free(tmp_ctx);
4463
4464         return 0;
4465 }
4466
4467 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4468 {
4469         int ret;
4470         int32_t res;
4471         TDB_DATA data;
4472         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4473
4474         data.dptr = (uint8_t*)&db_id;
4475         data.dsize = sizeof(db_id);
4476
4477         ret = ctdb_control(ctdb, destnode, 0, 
4478                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4479                            tmp_ctx, NULL, &res, &timeout, NULL);
4480         if (ret != 0 || res < 0) {
4481                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4482                 talloc_free(tmp_ctx);
4483                 return -1;
4484         }
4485
4486         if (priority) {
4487                 *priority = res;
4488         }
4489
4490         talloc_free(tmp_ctx);
4491
4492         return 0;
4493 }
4494
4495 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4496 {
4497         int ret;
4498         TDB_DATA outdata;
4499         int32_t res;
4500
4501         ret = ctdb_control(ctdb, destnode, 0, 
4502                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4503                            mem_ctx, &outdata, &res, &timeout, NULL);
4504         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4506                 return -1;
4507         }
4508
4509         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4510         talloc_free(outdata.dptr);
4511                     
4512         return 0;
4513 }
4514
4515 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4516 {
4517         if (h == NULL) {
4518                 return NULL;
4519         }
4520
4521         return &h->header;
4522 }
4523
4524
4525 struct ctdb_client_control_state *
4526 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4527 {
4528         struct ctdb_client_control_state *handle;
4529         struct ctdb_marshall_buffer *m;
4530         struct ctdb_rec_data *rec;
4531         TDB_DATA outdata;
4532
4533         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4534         if (m == NULL) {
4535                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4536                 return NULL;
4537         }
4538
4539         m->db_id = ctdb_db->db_id;
4540
4541         rec = ctdb_marshall_record(m, 0, key, header, data);
4542         if (rec == NULL) {
4543                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4544                 talloc_free(m);
4545                 return NULL;
4546         }
4547         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4548         if (m == NULL) {
4549                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4550                 talloc_free(m);
4551                 return NULL;
4552         }
4553         m->count++;
4554         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4555
4556
4557         outdata.dptr = (uint8_t *)m;
4558         outdata.dsize = talloc_get_size(m);
4559
4560         handle = ctdb_control_send(ctdb, destnode, 0, 
4561                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4562                            mem_ctx, &timeout, NULL);
4563         talloc_free(m);
4564         return handle;
4565 }
4566
4567 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4568 {
4569         int ret;
4570         int32_t res;
4571
4572         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4573         if ( (ret != 0) || (res != 0) ){
4574                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4575                 return -1;
4576         }
4577
4578         return 0;
4579 }
4580
4581 int
4582 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4583 {
4584         struct ctdb_client_control_state *state;
4585
4586         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4587         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4588 }
4589
4590
4591
4592
4593
4594
4595 /*
4596   set a database to be readonly
4597  */
4598 struct ctdb_client_control_state *
4599 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4600 {
4601         TDB_DATA data;
4602
4603         data.dptr = (uint8_t *)&dbid;
4604         data.dsize = sizeof(dbid);
4605
4606         return ctdb_control_send(ctdb, destnode, 0, 
4607                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4608                            ctdb, NULL, NULL);
4609 }
4610
4611 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4612 {
4613         int ret;
4614         int32_t res;
4615
4616         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4617         if (ret != 0 || res != 0) {
4618           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4619                 return -1;
4620         }
4621
4622         return 0;
4623 }
4624
4625 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4626 {
4627         struct ctdb_client_control_state *state;
4628
4629         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4630         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4631 }
4632
4633 /*
4634   set a database to be sticky
4635  */
4636 struct ctdb_client_control_state *
4637 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4638 {
4639         TDB_DATA data;
4640
4641         data.dptr = (uint8_t *)&dbid;
4642         data.dsize = sizeof(dbid);
4643
4644         return ctdb_control_send(ctdb, destnode, 0, 
4645                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4646                            ctdb, NULL, NULL);
4647 }
4648
4649 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4650 {
4651         int ret;
4652         int32_t res;
4653
4654         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4655         if (ret != 0 || res != 0) {
4656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4657                 return -1;
4658         }
4659
4660         return 0;
4661 }
4662
4663 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4664 {
4665         struct ctdb_client_control_state *state;
4666
4667         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4668         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4669 }