19a935023f4d31a1e5d45d547d2ed2e134973697
[rusty/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 /*
34   allocate a packet for use in client<->daemon communication
35  */
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37                                             TALLOC_CTX *mem_ctx, 
38                                             enum ctdb_operation operation, 
39                                             size_t length, size_t slength,
40                                             const char *type)
41 {
42         int size;
43         struct ctdb_req_header *hdr;
44
45         length = MAX(length, slength);
46         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47
48         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49         if (hdr == NULL) {
50                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51                          operation, (unsigned)length));
52                 return NULL;
53         }
54         talloc_set_name_const(hdr, type);
55         memset(hdr, 0, slength);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90
91         for (fn=ctdb_db->calls;fn;fn=fn->next) {
92                 if (fn->id == call->call_id) break;
93         }
94         if (fn == NULL) {
95                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
96                 talloc_free(c);
97                 return -1;
98         }
99
100         if (fn->fn(c) != 0) {
101                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
102                 talloc_free(c);
103                 return -1;
104         }
105
106         if (header->laccessor != caller) {
107                 header->lacount = 0;
108         }
109         header->laccessor = caller;
110         header->lacount++;
111
112         /* we need to force the record to be written out if this was a remote access,
113            so that the lacount is updated */
114         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115                 c->new_data = &c->record_data;
116         }
117
118         if (c->new_data) {
119                 /* XXX check that we always have the lock here? */
120                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122                         talloc_free(c);
123                         return -1;
124                 }
125         }
126
127         if (c->reply_data) {
128                 call->reply_data = *c->reply_data;
129
130                 talloc_steal(call, call->reply_data.dptr);
131                 talloc_set_name_const(call->reply_data.dptr, __location__);
132         } else {
133                 call->reply_data.dptr = NULL;
134                 call->reply_data.dsize = 0;
135         }
136         call->status = c->status;
137
138         talloc_free(c);
139
140         return 0;
141 }
142
143
144 /*
145   queue a packet for sending from client to daemon
146 */
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 {
149         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 }
151
152
153 /*
154   called when a CTDB_REPLY_CALL packet comes in in the client
155
156   This packet comes in response to a CTDB_REQ_CALL request packet. It
157   contains any reply data from the call
158 */
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 {
161         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162         struct ctdb_client_call_state *state;
163
164         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165         if (state == NULL) {
166                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
167                 return;
168         }
169
170         if (hdr->reqid != state->reqid) {
171                 /* we found a record  but it was the wrong one */
172                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
173                 return;
174         }
175
176         state->call->reply_data.dptr = c->data;
177         state->call->reply_data.dsize = c->datalen;
178         state->call->status = c->status;
179
180         talloc_steal(state, c);
181
182         state->state = CTDB_CALL_DONE;
183
184         if (state->async.fn) {
185                 state->async.fn(state);
186         }
187 }
188
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
190
191 /*
192   this is called in the client, when data comes in from the daemon
193  */
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 {
196         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
198         TALLOC_CTX *tmp_ctx;
199
200         /* place the packet as a child of a tmp_ctx. We then use
201            talloc_free() below to free it. If any of the calls want
202            to keep it, then they will steal it somewhere else, and the
203            talloc_free() will be a no-op */
204         tmp_ctx = talloc_new(ctdb);
205         talloc_steal(tmp_ctx, hdr);
206
207         if (cnt == 0) {
208                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
209                 exit(0);
210         }
211
212         if (cnt < sizeof(*hdr)) {
213                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
214                 goto done;
215         }
216         if (cnt != hdr->length) {
217                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
218                                (unsigned)hdr->length, (unsigned)cnt);
219                 goto done;
220         }
221
222         if (hdr->ctdb_magic != CTDB_MAGIC) {
223                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
224                 goto done;
225         }
226
227         if (hdr->ctdb_version != CTDB_VERSION) {
228                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
229                 goto done;
230         }
231
232         switch (hdr->operation) {
233         case CTDB_REPLY_CALL:
234                 ctdb_client_reply_call(ctdb, hdr);
235                 break;
236
237         case CTDB_REQ_MESSAGE:
238                 ctdb_request_message(ctdb, hdr);
239                 break;
240
241         case CTDB_REPLY_CONTROL:
242                 ctdb_client_reply_control(ctdb, hdr);
243                 break;
244
245         default:
246                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
247         }
248
249 done:
250         talloc_free(tmp_ctx);
251 }
252
253 /*
254   connect to a unix domain socket
255 */
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 {
258         struct sockaddr_un addr;
259
260         memset(&addr, 0, sizeof(addr));
261         addr.sun_family = AF_UNIX;
262         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263
264         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265         if (ctdb->daemon.sd == -1) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
267                 return -1;
268         }
269
270         set_nonblocking(ctdb->daemon.sd);
271         set_close_on_exec(ctdb->daemon.sd);
272         
273         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
274                 close(ctdb->daemon.sd);
275                 ctdb->daemon.sd = -1;
276                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
277                 return -1;
278         }
279
280         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
281                                               CTDB_DS_ALIGNMENT, 
282                                               ctdb_client_read_cb, ctdb);
283         return 0;
284 }
285
286
287 struct ctdb_record_handle {
288         struct ctdb_db_context *ctdb_db;
289         TDB_DATA key;
290         TDB_DATA *data;
291         struct ctdb_ltdb_header header;
292 };
293
294
295 /*
296   make a recv call to the local ctdb daemon - called from client context
297
298   This is called when the program wants to wait for a ctdb_call to complete and get the 
299   results. This call will block unless the call has already completed.
300 */
301 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
302 {
303         if (state == NULL) {
304                 return -1;
305         }
306
307         while (state->state < CTDB_CALL_DONE) {
308                 event_loop_once(state->ctdb_db->ctdb->ev);
309         }
310         if (state->state != CTDB_CALL_DONE) {
311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
312                 talloc_free(state);
313                 return -1;
314         }
315
316         if (state->call->reply_data.dsize) {
317                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
318                                                       state->call->reply_data.dptr,
319                                                       state->call->reply_data.dsize);
320                 call->reply_data.dsize = state->call->reply_data.dsize;
321         } else {
322                 call->reply_data.dptr = NULL;
323                 call->reply_data.dsize = 0;
324         }
325         call->status = state->call->status;
326         talloc_free(state);
327
328         return 0;
329 }
330
331
332
333
334 /*
335   destroy a ctdb_call in client
336 */
337 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
338 {
339         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
340         return 0;
341 }
342
343 /*
344   construct an event driven local ctdb_call
345
346   this is used so that locally processed ctdb_call requests are processed
347   in an event driven manner
348 */
349 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
350                                                                   struct ctdb_call *call,
351                                                                   struct ctdb_ltdb_header *header,
352                                                                   TDB_DATA *data)
353 {
354         struct ctdb_client_call_state *state;
355         struct ctdb_context *ctdb = ctdb_db->ctdb;
356         int ret;
357
358         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
359         CTDB_NO_MEMORY_NULL(ctdb, state);
360         state->call = talloc_zero(state, struct ctdb_call);
361         CTDB_NO_MEMORY_NULL(ctdb, state->call);
362
363         talloc_steal(state, data->dptr);
364
365         state->state   = CTDB_CALL_DONE;
366         *(state->call) = *call;
367         state->ctdb_db = ctdb_db;
368
369         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
370
371         return state;
372 }
373
374 /*
375   make a ctdb call to the local daemon - async send. Called from client context.
376
377   This constructs a ctdb_call request and queues it for processing. 
378   This call never blocks.
379 */
380 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
381                                               struct ctdb_call *call)
382 {
383         struct ctdb_client_call_state *state;
384         struct ctdb_context *ctdb = ctdb_db->ctdb;
385         struct ctdb_ltdb_header header;
386         TDB_DATA data;
387         int ret;
388         size_t len;
389         struct ctdb_req_call *c;
390
391         /* if the domain socket is not yet open, open it */
392         if (ctdb->daemon.sd==-1) {
393                 ctdb_socket_connect(ctdb);
394         }
395
396         ret = ctdb_ltdb_lock(ctdb_db, call->key);
397         if (ret != 0) {
398                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
399                 return NULL;
400         }
401
402         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
403
404         if (ret == 0 && header.dmaster == ctdb->pnn) {
405                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
406                 talloc_free(data.dptr);
407                 ctdb_ltdb_unlock(ctdb_db, call->key);
408                 return state;
409         }
410
411         ctdb_ltdb_unlock(ctdb_db, call->key);
412         talloc_free(data.dptr);
413
414         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
415         if (state == NULL) {
416                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
417                 return NULL;
418         }
419         state->call = talloc_zero(state, struct ctdb_call);
420         if (state->call == NULL) {
421                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
422                 return NULL;
423         }
424
425         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
426         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
427         if (c == NULL) {
428                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
429                 return NULL;
430         }
431
432         state->reqid     = ctdb_reqid_new(ctdb, state);
433         state->ctdb_db = ctdb_db;
434         talloc_set_destructor(state, ctdb_client_call_destructor);
435
436         c->hdr.reqid     = state->reqid;
437         c->flags         = call->flags;
438         c->db_id         = ctdb_db->db_id;
439         c->callid        = call->call_id;
440         c->hopcount      = 0;
441         c->keylen        = call->key.dsize;
442         c->calldatalen   = call->call_data.dsize;
443         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
444         memcpy(&c->data[call->key.dsize], 
445                call->call_data.dptr, call->call_data.dsize);
446         *(state->call)              = *call;
447         state->call->call_data.dptr = &c->data[call->key.dsize];
448         state->call->key.dptr       = &c->data[0];
449
450         state->state  = CTDB_CALL_WAIT;
451
452
453         ctdb_client_queue_pkt(ctdb, &c->hdr);
454
455         return state;
456 }
457
458
459 /*
460   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
461 */
462 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
463 {
464         struct ctdb_client_call_state *state;
465
466         state = ctdb_call_send(ctdb_db, call);
467         return ctdb_call_recv(state, call);
468 }
469
470
471 /*
472   tell the daemon what messaging srvid we will use, and register the message
473   handler function in the client
474 */
475 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
476                              ctdb_message_fn_t handler,
477                              void *private_data)
478                                     
479 {
480         int res;
481         int32_t status;
482         
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501         
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514
515 /*
516   send a message - from client context
517  */
518 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
519                       uint64_t srvid, TDB_DATA data)
520 {
521         struct ctdb_req_message *r;
522         int len, res;
523
524         len = offsetof(struct ctdb_req_message, data) + data.dsize;
525         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
526                                len, struct ctdb_req_message);
527         CTDB_NO_MEMORY(ctdb, r);
528
529         r->hdr.destnode  = pnn;
530         r->srvid         = srvid;
531         r->datalen       = data.dsize;
532         memcpy(&r->data[0], data.dptr, data.dsize);
533         
534         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
535         if (res != 0) {
536                 return res;
537         }
538
539         talloc_free(r);
540         return 0;
541 }
542
543
544 /*
545   cancel a ctdb_fetch_lock operation, releasing the lock
546  */
547 static int fetch_lock_destructor(struct ctdb_record_handle *h)
548 {
549         ctdb_ltdb_unlock(h->ctdb_db, h->key);
550         return 0;
551 }
552
553 /*
554   force the migration of a record to this node
555  */
556 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
557 {
558         struct ctdb_call call;
559         ZERO_STRUCT(call);
560         call.call_id = CTDB_NULL_FUNC;
561         call.key = key;
562         call.flags = CTDB_IMMEDIATE_MIGRATION;
563         return ctdb_call(ctdb_db, &call);
564 }
565
566 /*
567   get a lock on a record, and return the records data. Blocks until it gets the lock
568  */
569 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
570                                            TDB_DATA key, TDB_DATA *data)
571 {
572         int ret;
573         struct ctdb_record_handle *h;
574
575         /*
576           procedure is as follows:
577
578           1) get the chain lock. 
579           2) check if we are dmaster
580           3) if we are the dmaster then return handle 
581           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
582              reply from ctdbd
583           5) when we get the reply, goto (1)
584          */
585
586         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
587         if (h == NULL) {
588                 return NULL;
589         }
590
591         h->ctdb_db = ctdb_db;
592         h->key     = key;
593         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
594         if (h->key.dptr == NULL) {
595                 talloc_free(h);
596                 return NULL;
597         }
598         h->data    = data;
599
600         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
601                  (const char *)key.dptr));
602
603 again:
604         /* step 1 - get the chain lock */
605         ret = ctdb_ltdb_lock(ctdb_db, key);
606         if (ret != 0) {
607                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
608                 talloc_free(h);
609                 return NULL;
610         }
611
612         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
613
614         talloc_set_destructor(h, fetch_lock_destructor);
615
616         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
617
618         /* when torturing, ensure we test the remote path */
619         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
620             random() % 5 == 0) {
621                 h->header.dmaster = (uint32_t)-1;
622         }
623
624
625         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
626
627         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
628                 ctdb_ltdb_unlock(ctdb_db, key);
629                 ret = ctdb_client_force_migration(ctdb_db, key);
630                 if (ret != 0) {
631                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
632                         talloc_free(h);
633                         return NULL;
634                 }
635                 goto again;
636         }
637
638         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
639         return h;
640 }
641
642 /*
643   store some data to the record that was locked with ctdb_fetch_lock()
644 */
645 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
646 {
647         int ret;
648         int32_t status;
649         struct ctdb_rec_data *rec;
650         TDB_DATA recdata;
651
652         if (h->ctdb_db->persistent) {
653                 h->header.rsn++;
654         }
655
656         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
657         if (ret != 0) {
658                 return ret;
659         }
660
661         /* don't need the persistent_store control for non-persistent databases */
662         if (!h->ctdb_db->persistent) {
663                 return 0;
664         }
665
666         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
667         if (rec == NULL) {
668                 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
669                 return -1;
670         }
671
672         recdata.dptr = (uint8_t *)rec;
673         recdata.dsize = rec->length;
674
675         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
676                            CTDB_CONTROL_PERSISTENT_STORE, 0,
677                            recdata, NULL, NULL, &status, NULL, NULL);
678
679         talloc_free(rec);
680
681         if (ret != 0 || status != 0) {
682                 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
683                 return -1;
684         }
685
686         return 0;
687 }
688
689 /*
690   non-locking fetch of a record
691  */
692 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
693                TDB_DATA key, TDB_DATA *data)
694 {
695         struct ctdb_call call;
696         int ret;
697
698         call.call_id = CTDB_FETCH_FUNC;
699         call.call_data.dptr = NULL;
700         call.call_data.dsize = 0;
701
702         ret = ctdb_call(ctdb_db, &call);
703
704         if (ret == 0) {
705                 *data = call.reply_data;
706                 talloc_steal(mem_ctx, data->dptr);
707         }
708
709         return ret;
710 }
711
712
713
714 /*
715    called when a control completes or timesout to invoke the callback
716    function the user provided
717 */
718 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
719         struct timeval t, void *private_data)
720 {
721         struct ctdb_client_control_state *state;
722         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
723         int ret;
724
725         state = talloc_get_type(private_data, struct ctdb_client_control_state);
726         talloc_steal(tmp_ctx, state);
727
728         ret = ctdb_control_recv(state->ctdb, state, state,
729                         NULL, 
730                         NULL, 
731                         NULL);
732
733         talloc_free(tmp_ctx);
734 }
735
736 /*
737   called when a CTDB_REPLY_CONTROL packet comes in in the client
738
739   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
740   contains any reply data from the control
741 */
742 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
743                                       struct ctdb_req_header *hdr)
744 {
745         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
746         struct ctdb_client_control_state *state;
747
748         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
749         if (state == NULL) {
750                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
751                 return;
752         }
753
754         if (hdr->reqid != state->reqid) {
755                 /* we found a record  but it was the wrong one */
756                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
757                 return;
758         }
759
760         state->outdata.dptr = c->data;
761         state->outdata.dsize = c->datalen;
762         state->status = c->status;
763         if (c->errorlen) {
764                 state->errormsg = talloc_strndup(state, 
765                                                  (char *)&c->data[c->datalen], 
766                                                  c->errorlen);
767         }
768
769         /* state->outdata now uses resources from c so we dont want c
770            to just dissappear from under us while state is still alive
771         */
772         talloc_steal(state, c);
773
774         state->state = CTDB_CONTROL_DONE;
775
776         /* if we had a callback registered for this control, pull the response
777            and call the callback.
778         */
779         if (state->async.fn) {
780                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
781         }
782 }
783
784
785 /*
786   destroy a ctdb_control in client
787 */
788 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
789 {
790         ctdb_reqid_remove(state->ctdb, state->reqid);
791         return 0;
792 }
793
794
795 /* time out handler for ctdb_control */
796 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
797         struct timeval t, void *private_data)
798 {
799         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
800
801         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
802
803         state->state = CTDB_CONTROL_TIMEOUT;
804
805         /* if we had a callback registered for this control, pull the response
806            and call the callback.
807         */
808         if (state->async.fn) {
809                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
810         }
811 }
812
813 /* async version of send control request */
814 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
815                 uint32_t destnode, uint64_t srvid, 
816                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
817                 TALLOC_CTX *mem_ctx,
818                 struct timeval *timeout,
819                 char **errormsg)
820 {
821         struct ctdb_client_control_state *state;
822         size_t len;
823         struct ctdb_req_control *c;
824         int ret;
825
826         if (errormsg) {
827                 *errormsg = NULL;
828         }
829
830         /* if the domain socket is not yet open, open it */
831         if (ctdb->daemon.sd==-1) {
832                 ctdb_socket_connect(ctdb);
833         }
834
835         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
836         CTDB_NO_MEMORY_NULL(ctdb, state);
837
838         state->ctdb       = ctdb;
839         state->reqid      = ctdb_reqid_new(ctdb, state);
840         state->state      = CTDB_CONTROL_WAIT;
841         state->errormsg   = NULL;
842
843         talloc_set_destructor(state, ctdb_control_destructor);
844
845         len = offsetof(struct ctdb_req_control, data) + data.dsize;
846         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
847                                len, struct ctdb_req_control);
848         state->c            = c;        
849         CTDB_NO_MEMORY_NULL(ctdb, c);
850         c->hdr.reqid        = state->reqid;
851         c->hdr.destnode     = destnode;
852         c->hdr.reqid        = state->reqid;
853         c->opcode           = opcode;
854         c->client_id        = 0;
855         c->flags            = flags;
856         c->srvid            = srvid;
857         c->datalen          = data.dsize;
858         if (data.dsize) {
859                 memcpy(&c->data[0], data.dptr, data.dsize);
860         }
861
862         /* timeout */
863         if (timeout && !timeval_is_zero(timeout)) {
864                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
865         }
866
867         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
868         if (ret != 0) {
869                 talloc_free(state);
870                 return NULL;
871         }
872
873         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
874                 talloc_free(state);
875                 return NULL;
876         }
877
878         return state;
879 }
880
881
882 /* async version of receive control reply */
883 int ctdb_control_recv(struct ctdb_context *ctdb, 
884                 struct ctdb_client_control_state *state, 
885                 TALLOC_CTX *mem_ctx,
886                 TDB_DATA *outdata, int32_t *status, char **errormsg)
887 {
888         TALLOC_CTX *tmp_ctx;
889
890         if (status != NULL) {
891                 *status = -1;
892         }
893         if (errormsg != NULL) {
894                 *errormsg = NULL;
895         }
896
897         if (state == NULL) {
898                 return -1;
899         }
900
901         /* prevent double free of state */
902         tmp_ctx = talloc_new(ctdb);
903         talloc_steal(tmp_ctx, state);
904
905         /* loop one event at a time until we either timeout or the control
906            completes.
907         */
908         while (state->state == CTDB_CONTROL_WAIT) {
909                 event_loop_once(ctdb->ev);
910         }
911
912         if (state->state != CTDB_CONTROL_DONE) {
913                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
914                 if (state->async.fn) {
915                         state->async.fn(state);
916                 }
917                 talloc_free(tmp_ctx);
918                 return -1;
919         }
920
921         if (state->errormsg) {
922                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
923                 if (errormsg) {
924                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
925                 }
926                 if (state->async.fn) {
927                         state->async.fn(state);
928                 }
929                 talloc_free(tmp_ctx);
930                 return -1;
931         }
932
933         if (outdata) {
934                 *outdata = state->outdata;
935                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
936         }
937
938         if (status) {
939                 *status = state->status;
940         }
941
942         if (state->async.fn) {
943                 state->async.fn(state);
944         }
945
946         talloc_free(tmp_ctx);
947         return 0;
948 }
949
950
951
952 /*
953   send a ctdb control message
954   timeout specifies how long we should wait for a reply.
955   if timeout is NULL we wait indefinitely
956  */
957 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
958                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
959                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
960                  struct timeval *timeout,
961                  char **errormsg)
962 {
963         struct ctdb_client_control_state *state;
964
965         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
966                         flags, data, mem_ctx,
967                         timeout, errormsg);
968         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
969                         errormsg);
970 }
971
972
973
974
975 /*
976   a process exists call. Returns 0 if process exists, -1 otherwise
977  */
978 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
979 {
980         int ret;
981         TDB_DATA data;
982         int32_t status;
983
984         data.dptr = (uint8_t*)&pid;
985         data.dsize = sizeof(pid);
986
987         ret = ctdb_control(ctdb, destnode, 0, 
988                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
989                            NULL, NULL, &status, NULL, NULL);
990         if (ret != 0) {
991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
992                 return -1;
993         }
994
995         return status;
996 }
997
998 /*
999   get remote statistics
1000  */
1001 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1002 {
1003         int ret;
1004         TDB_DATA data;
1005         int32_t res;
1006
1007         ret = ctdb_control(ctdb, destnode, 0, 
1008                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1009                            ctdb, &data, &res, NULL, NULL);
1010         if (ret != 0 || res != 0) {
1011                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1012                 return -1;
1013         }
1014
1015         if (data.dsize != sizeof(struct ctdb_statistics)) {
1016                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1017                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1018                       return -1;
1019         }
1020
1021         *status = *(struct ctdb_statistics *)data.dptr;
1022         talloc_free(data.dptr);
1023                         
1024         return 0;
1025 }
1026
1027 /*
1028   shutdown a remote ctdb node
1029  */
1030 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1031 {
1032         struct ctdb_client_control_state *state;
1033
1034         state = ctdb_control_send(ctdb, destnode, 0, 
1035                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1036                            NULL, &timeout, NULL);
1037         if (state == NULL) {
1038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1039                 return -1;
1040         }
1041
1042         return 0;
1043 }
1044
1045 /*
1046   get vnn map from a remote node
1047  */
1048 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1049 {
1050         int ret;
1051         TDB_DATA outdata;
1052         int32_t res;
1053         struct ctdb_vnn_map_wire *map;
1054
1055         ret = ctdb_control(ctdb, destnode, 0, 
1056                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1057                            mem_ctx, &outdata, &res, &timeout, NULL);
1058         if (ret != 0 || res != 0) {
1059                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1060                 return -1;
1061         }
1062         
1063         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1064         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1065             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1066                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1067                 return -1;
1068         }
1069
1070         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1071         CTDB_NO_MEMORY(ctdb, *vnnmap);
1072         (*vnnmap)->generation = map->generation;
1073         (*vnnmap)->size       = map->size;
1074         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1075
1076         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1077         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1078         talloc_free(outdata.dptr);
1079                     
1080         return 0;
1081 }
1082
1083
1084 /*
1085   get the recovery mode of a remote node
1086  */
1087 struct ctdb_client_control_state *
1088 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1089 {
1090         return ctdb_control_send(ctdb, destnode, 0, 
1091                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1092                            mem_ctx, &timeout, NULL);
1093 }
1094
1095 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1096 {
1097         int ret;
1098         int32_t res;
1099
1100         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1101         if (ret != 0) {
1102                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1103                 return -1;
1104         }
1105
1106         if (recmode) {
1107                 *recmode = (uint32_t)res;
1108         }
1109
1110         return 0;
1111 }
1112
1113 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1114 {
1115         struct ctdb_client_control_state *state;
1116
1117         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1118         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1119 }
1120
1121
1122
1123
1124 /*
1125   set the recovery mode of a remote node
1126  */
1127 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1128 {
1129         int ret;
1130         TDB_DATA data;
1131         int32_t res;
1132
1133         data.dsize = sizeof(uint32_t);
1134         data.dptr = (unsigned char *)&recmode;
1135
1136         ret = ctdb_control(ctdb, destnode, 0, 
1137                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1138                            NULL, NULL, &res, &timeout, NULL);
1139         if (ret != 0 || res != 0) {
1140                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1141                 return -1;
1142         }
1143
1144         return 0;
1145 }
1146
1147
1148
1149 /*
1150   get the recovery master of a remote node
1151  */
1152 struct ctdb_client_control_state *
1153 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1154                         struct timeval timeout, uint32_t destnode)
1155 {
1156         return ctdb_control_send(ctdb, destnode, 0, 
1157                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1158                            mem_ctx, &timeout, NULL);
1159 }
1160
1161 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1162 {
1163         int ret;
1164         int32_t res;
1165
1166         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1167         if (ret != 0) {
1168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1169                 return -1;
1170         }
1171
1172         if (recmaster) {
1173                 *recmaster = (uint32_t)res;
1174         }
1175
1176         return 0;
1177 }
1178
1179 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1180 {
1181         struct ctdb_client_control_state *state;
1182
1183         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1184         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1185 }
1186
1187
1188 /*
1189   set the recovery master of a remote node
1190  */
1191 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1192 {
1193         int ret;
1194         TDB_DATA data;
1195         int32_t res;
1196
1197         ZERO_STRUCT(data);
1198         data.dsize = sizeof(uint32_t);
1199         data.dptr = (unsigned char *)&recmaster;
1200
1201         ret = ctdb_control(ctdb, destnode, 0, 
1202                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1203                            NULL, NULL, &res, &timeout, NULL);
1204         if (ret != 0 || res != 0) {
1205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1206                 return -1;
1207         }
1208
1209         return 0;
1210 }
1211
1212
1213 /*
1214   get a list of databases off a remote node
1215  */
1216 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1217                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1218 {
1219         int ret;
1220         TDB_DATA outdata;
1221         int32_t res;
1222
1223         ret = ctdb_control(ctdb, destnode, 0, 
1224                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1225                            mem_ctx, &outdata, &res, &timeout, NULL);
1226         if (ret != 0 || res != 0) {
1227                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1228                 return -1;
1229         }
1230
1231         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1232         talloc_free(outdata.dptr);
1233                     
1234         return 0;
1235 }
1236
1237 /*
1238   get a list of nodes (vnn and flags ) from a remote node
1239  */
1240 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1241                 struct timeval timeout, uint32_t destnode, 
1242                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1243 {
1244         int ret;
1245         TDB_DATA outdata;
1246         int32_t res;
1247
1248         ret = ctdb_control(ctdb, destnode, 0, 
1249                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1250                            mem_ctx, &outdata, &res, &timeout, NULL);
1251         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1252                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1253                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1254         }
1255         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1256                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1257                 return -1;
1258         }
1259
1260         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1261         talloc_free(outdata.dptr);
1262                     
1263         return 0;
1264 }
1265
1266 /*
1267   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1268  */
1269 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1270                 struct timeval timeout, uint32_t destnode, 
1271                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1272 {
1273         int ret, i, len;
1274         TDB_DATA outdata;
1275         struct ctdb_node_mapv4 *nodemapv4;
1276         int32_t res;
1277
1278         ret = ctdb_control(ctdb, destnode, 0, 
1279                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1280                            mem_ctx, &outdata, &res, &timeout, NULL);
1281         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1282                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1283                 return -1;
1284         }
1285
1286         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1287
1288         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1289         (*nodemap) = talloc_zero_size(mem_ctx, len);
1290         CTDB_NO_MEMORY(ctdb, (*nodemap));
1291
1292         (*nodemap)->num = nodemapv4->num;
1293         for (i=0; i<nodemapv4->num; i++) {
1294                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1295                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1296                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1297                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1298         }
1299                 
1300         talloc_free(outdata.dptr);
1301                     
1302         return 0;
1303 }
1304
1305 /*
1306   drop the transport, reload the nodes file and restart the transport
1307  */
1308 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1309                     struct timeval timeout, uint32_t destnode)
1310 {
1311         int ret;
1312         int32_t res;
1313
1314         ret = ctdb_control(ctdb, destnode, 0, 
1315                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1316                            NULL, NULL, &res, &timeout, NULL);
1317         if (ret != 0 || res != 0) {
1318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1319                 return -1;
1320         }
1321
1322         return 0;
1323 }
1324
1325
1326 /*
1327   set vnn map on a node
1328  */
1329 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1330                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1331 {
1332         int ret;
1333         TDB_DATA data;
1334         int32_t res;
1335         struct ctdb_vnn_map_wire *map;
1336         size_t len;
1337
1338         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1339         map = talloc_size(mem_ctx, len);
1340         CTDB_NO_MEMORY(ctdb, map);
1341
1342         map->generation = vnnmap->generation;
1343         map->size = vnnmap->size;
1344         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1345         
1346         data.dsize = len;
1347         data.dptr  = (uint8_t *)map;
1348
1349         ret = ctdb_control(ctdb, destnode, 0, 
1350                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1351                            NULL, NULL, &res, &timeout, NULL);
1352         if (ret != 0 || res != 0) {
1353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1354                 return -1;
1355         }
1356
1357         talloc_free(map);
1358
1359         return 0;
1360 }
1361
1362
1363 /*
1364   async send for pull database
1365  */
1366 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1367         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1368         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1369 {
1370         TDB_DATA indata;
1371         struct ctdb_control_pulldb *pull;
1372         struct ctdb_client_control_state *state;
1373
1374         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1375         CTDB_NO_MEMORY_NULL(ctdb, pull);
1376
1377         pull->db_id   = dbid;
1378         pull->lmaster = lmaster;
1379
1380         indata.dsize = sizeof(struct ctdb_control_pulldb);
1381         indata.dptr  = (unsigned char *)pull;
1382
1383         state = ctdb_control_send(ctdb, destnode, 0, 
1384                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1385                                   mem_ctx, &timeout, NULL);
1386         talloc_free(pull);
1387
1388         return state;
1389 }
1390
1391 /*
1392   async recv for pull database
1393  */
1394 int ctdb_ctrl_pulldb_recv(
1395         struct ctdb_context *ctdb, 
1396         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1397         TDB_DATA *outdata)
1398 {
1399         int ret;
1400         int32_t res;
1401
1402         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1403         if ( (ret != 0) || (res != 0) ){
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1405                 return -1;
1406         }
1407
1408         return 0;
1409 }
1410
1411 /*
1412   pull all keys and records for a specific database on a node
1413  */
1414 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1415                 uint32_t dbid, uint32_t lmaster, 
1416                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1417                 TDB_DATA *outdata)
1418 {
1419         struct ctdb_client_control_state *state;
1420
1421         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1422                                       timeout);
1423         
1424         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1425 }
1426
1427
1428 /*
1429   change dmaster for all keys in the database to the new value
1430  */
1431 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1432                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1433 {
1434         int ret;
1435         TDB_DATA indata;
1436         int32_t res;
1437
1438         indata.dsize = 2*sizeof(uint32_t);
1439         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1440
1441         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1442         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1443
1444         ret = ctdb_control(ctdb, destnode, 0, 
1445                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1446                            NULL, NULL, &res, &timeout, NULL);
1447         if (ret != 0 || res != 0) {
1448                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1449                 return -1;
1450         }
1451
1452         return 0;
1453 }
1454
1455 /*
1456   ping a node, return number of clients connected
1457  */
1458 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1459 {
1460         int ret;
1461         int32_t res;
1462
1463         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1464                            tdb_null, NULL, NULL, &res, NULL, NULL);
1465         if (ret != 0) {
1466                 return -1;
1467         }
1468         return res;
1469 }
1470
1471 /*
1472   find the real path to a ltdb 
1473  */
1474 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1475                    const char **path)
1476 {
1477         int ret;
1478         int32_t res;
1479         TDB_DATA data;
1480
1481         data.dptr = (uint8_t *)&dbid;
1482         data.dsize = sizeof(dbid);
1483
1484         ret = ctdb_control(ctdb, destnode, 0, 
1485                            CTDB_CONTROL_GETDBPATH, 0, data, 
1486                            mem_ctx, &data, &res, &timeout, NULL);
1487         if (ret != 0 || res != 0) {
1488                 return -1;
1489         }
1490
1491         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1492         if ((*path) == NULL) {
1493                 return -1;
1494         }
1495
1496         talloc_free(data.dptr);
1497
1498         return 0;
1499 }
1500
1501 /*
1502   find the name of a db 
1503  */
1504 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1505                    const char **name)
1506 {
1507         int ret;
1508         int32_t res;
1509         TDB_DATA data;
1510
1511         data.dptr = (uint8_t *)&dbid;
1512         data.dsize = sizeof(dbid);
1513
1514         ret = ctdb_control(ctdb, destnode, 0, 
1515                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1516                            mem_ctx, &data, &res, &timeout, NULL);
1517         if (ret != 0 || res != 0) {
1518                 return -1;
1519         }
1520
1521         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1522         if ((*name) == NULL) {
1523                 return -1;
1524         }
1525
1526         talloc_free(data.dptr);
1527
1528         return 0;
1529 }
1530
1531 /*
1532   create a database
1533  */
1534 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1535                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1536 {
1537         int ret;
1538         int32_t res;
1539         TDB_DATA data;
1540
1541         data.dptr = discard_const(name);
1542         data.dsize = strlen(name)+1;
1543
1544         ret = ctdb_control(ctdb, destnode, 0, 
1545                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1546                            0, data, 
1547                            mem_ctx, &data, &res, &timeout, NULL);
1548
1549         if (ret != 0 || res != 0) {
1550                 return -1;
1551         }
1552
1553         return 0;
1554 }
1555
1556 /*
1557   get debug level on a node
1558  */
1559 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1560 {
1561         int ret;
1562         int32_t res;
1563         TDB_DATA data;
1564
1565         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1566                            ctdb, &data, &res, NULL, NULL);
1567         if (ret != 0 || res != 0) {
1568                 return -1;
1569         }
1570         if (data.dsize != sizeof(int32_t)) {
1571                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1572                          (unsigned)data.dsize));
1573                 return -1;
1574         }
1575         *level = *(int32_t *)data.dptr;
1576         talloc_free(data.dptr);
1577         return 0;
1578 }
1579
1580 /*
1581   set debug level on a node
1582  */
1583 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1584 {
1585         int ret;
1586         int32_t res;
1587         TDB_DATA data;
1588
1589         data.dptr = (uint8_t *)&level;
1590         data.dsize = sizeof(level);
1591
1592         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1593                            NULL, NULL, &res, NULL, NULL);
1594         if (ret != 0 || res != 0) {
1595                 return -1;
1596         }
1597         return 0;
1598 }
1599
1600
1601 /*
1602   get a list of connected nodes
1603  */
1604 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1605                                 struct timeval timeout,
1606                                 TALLOC_CTX *mem_ctx,
1607                                 uint32_t *num_nodes)
1608 {
1609         struct ctdb_node_map *map=NULL;
1610         int ret, i;
1611         uint32_t *nodes;
1612
1613         *num_nodes = 0;
1614
1615         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1616         if (ret != 0) {
1617                 return NULL;
1618         }
1619
1620         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1621         if (nodes == NULL) {
1622                 return NULL;
1623         }
1624
1625         for (i=0;i<map->num;i++) {
1626                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1627                         nodes[*num_nodes] = map->nodes[i].pnn;
1628                         (*num_nodes)++;
1629                 }
1630         }
1631
1632         return nodes;
1633 }
1634
1635
1636 /*
1637   reset remote status
1638  */
1639 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1640 {
1641         int ret;
1642         int32_t res;
1643
1644         ret = ctdb_control(ctdb, destnode, 0, 
1645                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1646                            NULL, NULL, &res, NULL, NULL);
1647         if (ret != 0 || res != 0) {
1648                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1649                 return -1;
1650         }
1651         return 0;
1652 }
1653
1654 /*
1655   this is the dummy null procedure that all databases support
1656 */
1657 static int ctdb_null_func(struct ctdb_call_info *call)
1658 {
1659         return 0;
1660 }
1661
1662 /*
1663   this is a plain fetch procedure that all databases support
1664 */
1665 static int ctdb_fetch_func(struct ctdb_call_info *call)
1666 {
1667         call->reply_data = &call->record_data;
1668         return 0;
1669 }
1670
1671 /*
1672   attach to a specific database - client call
1673 */
1674 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1675 {
1676         struct ctdb_db_context *ctdb_db;
1677         TDB_DATA data;
1678         int ret;
1679         int32_t res;
1680
1681         ctdb_db = ctdb_db_handle(ctdb, name);
1682         if (ctdb_db) {
1683                 return ctdb_db;
1684         }
1685
1686         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1687         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1688
1689         ctdb_db->ctdb = ctdb;
1690         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1691         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1692
1693         data.dptr = discard_const(name);
1694         data.dsize = strlen(name)+1;
1695
1696         /* tell ctdb daemon to attach */
1697         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1698                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1699                            0, data, ctdb_db, &data, &res, NULL, NULL);
1700         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1701                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1702                 talloc_free(ctdb_db);
1703                 return NULL;
1704         }
1705         
1706         ctdb_db->db_id = *(uint32_t *)data.dptr;
1707         talloc_free(data.dptr);
1708
1709         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1710         if (ret != 0) {
1711                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1712                 talloc_free(ctdb_db);
1713                 return NULL;
1714         }
1715
1716         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1717         if (!ctdb->do_setsched) {
1718                 tdb_flags |= TDB_NOMMAP;
1719         }
1720
1721         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1722         if (ctdb_db->ltdb == NULL) {
1723                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1724                 talloc_free(ctdb_db);
1725                 return NULL;
1726         }
1727
1728         ctdb_db->persistent = persistent;
1729
1730         DLIST_ADD(ctdb->db_list, ctdb_db);
1731
1732         /* add well known functions */
1733         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1734         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1735
1736         return ctdb_db;
1737 }
1738
1739
1740 /*
1741   setup a call for a database
1742  */
1743 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1744 {
1745         struct ctdb_registered_call *call;
1746
1747 #if 0
1748         TDB_DATA data;
1749         int32_t status;
1750         struct ctdb_control_set_call c;
1751         int ret;
1752
1753         /* this is no longer valid with the separate daemon architecture */
1754         c.db_id = ctdb_db->db_id;
1755         c.fn    = fn;
1756         c.id    = id;
1757
1758         data.dptr = (uint8_t *)&c;
1759         data.dsize = sizeof(c);
1760
1761         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1762                            data, NULL, NULL, &status, NULL, NULL);
1763         if (ret != 0 || status != 0) {
1764                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1765                 return -1;
1766         }
1767 #endif
1768
1769         /* also register locally */
1770         call = talloc(ctdb_db, struct ctdb_registered_call);
1771         call->fn = fn;
1772         call->id = id;
1773
1774         DLIST_ADD(ctdb_db->calls, call);        
1775         return 0;
1776 }
1777
1778
1779 struct traverse_state {
1780         bool done;
1781         uint32_t count;
1782         ctdb_traverse_func fn;
1783         void *private_data;
1784 };
1785
1786 /*
1787   called on each key during a ctdb_traverse
1788  */
1789 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1790 {
1791         struct traverse_state *state = (struct traverse_state *)p;
1792         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1793         TDB_DATA key;
1794
1795         if (data.dsize < sizeof(uint32_t) ||
1796             d->length != data.dsize) {
1797                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1798                 state->done = True;
1799                 return;
1800         }
1801
1802         key.dsize = d->keylen;
1803         key.dptr  = &d->data[0];
1804         data.dsize = d->datalen;
1805         data.dptr = &d->data[d->keylen];
1806
1807         if (key.dsize == 0 && data.dsize == 0) {
1808                 /* end of traverse */
1809                 state->done = True;
1810                 return;
1811         }
1812
1813         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1814                 /* empty records are deleted records in ctdb */
1815                 return;
1816         }
1817
1818         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1819                 state->done = True;
1820         }
1821
1822         state->count++;
1823 }
1824
1825
1826 /*
1827   start a cluster wide traverse, calling the supplied fn on each record
1828   return the number of records traversed, or -1 on error
1829  */
1830 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1831 {
1832         TDB_DATA data;
1833         struct ctdb_traverse_start t;
1834         int32_t status;
1835         int ret;
1836         uint64_t srvid = (getpid() | 0xFLL<<60);
1837         struct traverse_state state;
1838
1839         state.done = False;
1840         state.count = 0;
1841         state.private_data = private_data;
1842         state.fn = fn;
1843
1844         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1845         if (ret != 0) {
1846                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1847                 return -1;
1848         }
1849
1850         t.db_id = ctdb_db->db_id;
1851         t.srvid = srvid;
1852         t.reqid = 0;
1853
1854         data.dptr = (uint8_t *)&t;
1855         data.dsize = sizeof(t);
1856
1857         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1858                            data, NULL, NULL, &status, NULL, NULL);
1859         if (ret != 0 || status != 0) {
1860                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1861                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1862                 return -1;
1863         }
1864
1865         while (!state.done) {
1866                 event_loop_once(ctdb_db->ctdb->ev);
1867         }
1868
1869         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1870         if (ret != 0) {
1871                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1872                 return -1;
1873         }
1874
1875         return state.count;
1876 }
1877
1878 #define ISASCII(x) ((x>31)&&(x<128))
1879 /*
1880   called on each key during a catdb
1881  */
1882 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1883 {
1884         int i;
1885         FILE *f = (FILE *)p;
1886         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1887
1888         fprintf(f, "dmaster: %u\n", h->dmaster);
1889         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1890
1891         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1892         for (i=0;i<key.dsize;i++) {
1893                 if (ISASCII(key.dptr[i])) {
1894                         fprintf(f, "%c", key.dptr[i]);
1895                 } else {
1896                         fprintf(f, "\\%02X", key.dptr[i]);
1897                 }
1898         }
1899         fprintf(f, "\"\n");
1900
1901         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1902         for (i=sizeof(*h);i<data.dsize;i++) {
1903                 if (ISASCII(data.dptr[i])) {
1904                         fprintf(f, "%c", data.dptr[i]);
1905                 } else {
1906                         fprintf(f, "\\%02X", data.dptr[i]);
1907                 }
1908         }
1909         fprintf(f, "\"\n");
1910
1911         return 0;
1912 }
1913
1914 /*
1915   convenience function to list all keys to stdout
1916  */
1917 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1918 {
1919         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1920 }
1921
1922 /*
1923   get the pid of a ctdb daemon
1924  */
1925 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1926 {
1927         int ret;
1928         int32_t res;
1929
1930         ret = ctdb_control(ctdb, destnode, 0, 
1931                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1932                            NULL, NULL, &res, &timeout, NULL);
1933         if (ret != 0) {
1934                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1935                 return -1;
1936         }
1937
1938         *pid = res;
1939
1940         return 0;
1941 }
1942
1943
1944 /*
1945   async freeze send control
1946  */
1947 struct ctdb_client_control_state *
1948 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1949 {
1950         return ctdb_control_send(ctdb, destnode, 0, 
1951                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1952                            mem_ctx, &timeout, NULL);
1953 }
1954
1955 /* 
1956    async freeze recv control
1957 */
1958 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1959 {
1960         int ret;
1961         int32_t res;
1962
1963         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1964         if ( (ret != 0) || (res != 0) ){
1965                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1966                 return -1;
1967         }
1968
1969         return 0;
1970 }
1971
1972 /*
1973   freeze a node
1974  */
1975 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1976 {
1977         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1978         struct ctdb_client_control_state *state;
1979         int ret;
1980
1981         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1982         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1983         talloc_free(tmp_ctx);
1984
1985         return ret;
1986 }
1987
1988 /*
1989   thaw a node
1990  */
1991 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1992 {
1993         int ret;
1994         int32_t res;
1995
1996         ret = ctdb_control(ctdb, destnode, 0, 
1997                            CTDB_CONTROL_THAW, 0, tdb_null, 
1998                            NULL, NULL, &res, &timeout, NULL);
1999         if (ret != 0 || res != 0) {
2000                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2001                 return -1;
2002         }
2003
2004         return 0;
2005 }
2006
2007 /*
2008   get pnn of a node, or -1
2009  */
2010 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2011 {
2012         int ret;
2013         int32_t res;
2014
2015         ret = ctdb_control(ctdb, destnode, 0, 
2016                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2017                            NULL, NULL, &res, &timeout, NULL);
2018         if (ret != 0) {
2019                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2020                 return -1;
2021         }
2022
2023         return res;
2024 }
2025
2026 /*
2027   get the monitoring mode of a remote node
2028  */
2029 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2030 {
2031         int ret;
2032         int32_t res;
2033
2034         ret = ctdb_control(ctdb, destnode, 0, 
2035                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2036                            NULL, NULL, &res, &timeout, NULL);
2037         if (ret != 0) {
2038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2039                 return -1;
2040         }
2041
2042         *monmode = res;
2043
2044         return 0;
2045 }
2046
2047
2048 /*
2049  set the monitoring mode of a remote node to active
2050  */
2051 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2052 {
2053         int ret;
2054         
2055
2056         ret = ctdb_control(ctdb, destnode, 0, 
2057                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2058                            NULL, NULL,NULL, &timeout, NULL);
2059         if (ret != 0) {
2060                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2061                 return -1;
2062         }
2063
2064         
2065
2066         return 0;
2067 }
2068
2069 /*
2070   set the monitoring mode of a remote node to disable
2071  */
2072 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2073 {
2074         int ret;
2075         
2076
2077         ret = ctdb_control(ctdb, destnode, 0, 
2078                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2079                            NULL, NULL, NULL, &timeout, NULL);
2080         if (ret != 0) {
2081                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2082                 return -1;
2083         }
2084
2085         
2086
2087         return 0;
2088 }
2089
2090
2091
2092 /* 
2093   sent to a node to make it take over an ip address
2094 */
2095 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2096                           uint32_t destnode, struct ctdb_public_ip *ip)
2097 {
2098         TDB_DATA data;
2099         struct ctdb_public_ipv4 ipv4;
2100         int ret;
2101         int32_t res;
2102
2103         if (ip->addr.sa.sa_family == AF_INET) {
2104                 ipv4.pnn = ip->pnn;
2105                 ipv4.sin = ip->addr.ip;
2106
2107                 data.dsize = sizeof(ipv4);
2108                 data.dptr  = (uint8_t *)&ipv4;
2109
2110                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2111                            NULL, &res, &timeout, NULL);
2112         } else {
2113                 data.dsize = sizeof(*ip);
2114                 data.dptr  = (uint8_t *)ip;
2115
2116                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2117                            NULL, &res, &timeout, NULL);
2118         }
2119
2120         if (ret != 0 || res != 0) {
2121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2122                 return -1;
2123         }
2124
2125         return 0;       
2126 }
2127
2128
2129 /* 
2130   sent to a node to make it release an ip address
2131 */
2132 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2133                          uint32_t destnode, struct ctdb_public_ip *ip)
2134 {
2135         TDB_DATA data;
2136         struct ctdb_public_ipv4 ipv4;
2137         int ret;
2138         int32_t res;
2139
2140         if (ip->addr.sa.sa_family == AF_INET) {
2141                 ipv4.pnn = ip->pnn;
2142                 ipv4.sin = ip->addr.ip;
2143
2144                 data.dsize = sizeof(ipv4);
2145                 data.dptr  = (uint8_t *)&ipv4;
2146
2147                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2148                                    NULL, &res, &timeout, NULL);
2149         } else {
2150                 data.dsize = sizeof(*ip);
2151                 data.dptr  = (uint8_t *)ip;
2152
2153                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2154                                    NULL, &res, &timeout, NULL);
2155         }
2156
2157         if (ret != 0 || res != 0) {
2158                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2159                 return -1;
2160         }
2161
2162         return 0;       
2163 }
2164
2165
2166 /*
2167   get a tunable
2168  */
2169 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2170                           struct timeval timeout, 
2171                           uint32_t destnode,
2172                           const char *name, uint32_t *value)
2173 {
2174         struct ctdb_control_get_tunable *t;
2175         TDB_DATA data, outdata;
2176         int32_t res;
2177         int ret;
2178
2179         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2180         data.dptr  = talloc_size(ctdb, data.dsize);
2181         CTDB_NO_MEMORY(ctdb, data.dptr);
2182
2183         t = (struct ctdb_control_get_tunable *)data.dptr;
2184         t->length = strlen(name)+1;
2185         memcpy(t->name, name, t->length);
2186
2187         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2188                            &outdata, &res, &timeout, NULL);
2189         talloc_free(data.dptr);
2190         if (ret != 0 || res != 0) {
2191                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2192                 return -1;
2193         }
2194
2195         if (outdata.dsize != sizeof(uint32_t)) {
2196                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2197                 talloc_free(outdata.dptr);
2198                 return -1;
2199         }
2200         
2201         *value = *(uint32_t *)outdata.dptr;
2202         talloc_free(outdata.dptr);
2203
2204         return 0;
2205 }
2206
2207 /*
2208   set a tunable
2209  */
2210 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2211                           struct timeval timeout, 
2212                           uint32_t destnode,
2213                           const char *name, uint32_t value)
2214 {
2215         struct ctdb_control_set_tunable *t;
2216         TDB_DATA data;
2217         int32_t res;
2218         int ret;
2219
2220         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2221         data.dptr  = talloc_size(ctdb, data.dsize);
2222         CTDB_NO_MEMORY(ctdb, data.dptr);
2223
2224         t = (struct ctdb_control_set_tunable *)data.dptr;
2225         t->length = strlen(name)+1;
2226         memcpy(t->name, name, t->length);
2227         t->value = value;
2228
2229         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2230                            NULL, &res, &timeout, NULL);
2231         talloc_free(data.dptr);
2232         if (ret != 0 || res != 0) {
2233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2234                 return -1;
2235         }
2236
2237         return 0;
2238 }
2239
2240 /*
2241   list tunables
2242  */
2243 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2244                             struct timeval timeout, 
2245                             uint32_t destnode,
2246                             TALLOC_CTX *mem_ctx,
2247                             const char ***list, uint32_t *count)
2248 {
2249         TDB_DATA outdata;
2250         int32_t res;
2251         int ret;
2252         struct ctdb_control_list_tunable *t;
2253         char *p, *s, *ptr;
2254
2255         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2256                            mem_ctx, &outdata, &res, &timeout, NULL);
2257         if (ret != 0 || res != 0) {
2258                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2259                 return -1;
2260         }
2261
2262         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2263         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2264             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2265                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2266                 talloc_free(outdata.dptr);
2267                 return -1;              
2268         }
2269         
2270         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2271         CTDB_NO_MEMORY(ctdb, p);
2272
2273         talloc_free(outdata.dptr);
2274         
2275         (*list) = NULL;
2276         (*count) = 0;
2277
2278         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2279                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2280                 CTDB_NO_MEMORY(ctdb, *list);
2281                 (*list)[*count] = talloc_strdup(*list, s);
2282                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2283                 (*count)++;
2284         }
2285
2286         talloc_free(p);
2287
2288         return 0;
2289 }
2290
2291
2292 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2293                         struct timeval timeout, uint32_t destnode, 
2294                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2295 {
2296         int ret;
2297         TDB_DATA outdata;
2298         int32_t res;
2299
2300         ret = ctdb_control(ctdb, destnode, 0, 
2301                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2302                            mem_ctx, &outdata, &res, &timeout, NULL);
2303         if (ret == 0 && res == -1) {
2304                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2305                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2306         }
2307         if (ret != 0 || res != 0) {
2308           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2309                 return -1;
2310         }
2311
2312         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2313         talloc_free(outdata.dptr);
2314                     
2315         return 0;
2316 }
2317
2318 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2319                         struct timeval timeout, uint32_t destnode, 
2320                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2321 {
2322         int ret, i, len;
2323         TDB_DATA outdata;
2324         int32_t res;
2325         struct ctdb_all_public_ipsv4 *ipsv4;
2326
2327         ret = ctdb_control(ctdb, destnode, 0, 
2328                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2329                            mem_ctx, &outdata, &res, &timeout, NULL);
2330         if (ret != 0 || res != 0) {
2331                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2332                 return -1;
2333         }
2334
2335         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2336         len = offsetof(struct ctdb_all_public_ips, ips) +
2337                 ipsv4->num*sizeof(struct ctdb_public_ip);
2338         *ips = talloc_zero_size(mem_ctx, len);
2339         CTDB_NO_MEMORY(ctdb, *ips);
2340         (*ips)->num = ipsv4->num;
2341         for (i=0; i<ipsv4->num; i++) {
2342                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2343                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2344         }
2345
2346         talloc_free(outdata.dptr);
2347                     
2348         return 0;
2349 }
2350
2351 /*
2352   set/clear the permanent disabled bit on a remote node
2353  */
2354 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2355                        uint32_t set, uint32_t clear)
2356 {
2357         int ret;
2358         TDB_DATA data;
2359         struct ctdb_node_map *nodemap=NULL;
2360         struct ctdb_node_flag_change c;
2361         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2362         uint32_t recmaster;
2363         uint32_t *nodes;
2364
2365
2366         /* find the recovery master */
2367         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2368         if (ret != 0) {
2369                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2370                 talloc_free(tmp_ctx);
2371                 return ret;
2372         }
2373
2374
2375         /* read the node flags from the recmaster */
2376         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2377         if (ret != 0) {
2378                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2379                 talloc_free(tmp_ctx);
2380                 return -1;
2381         }
2382         if (destnode >= nodemap->num) {
2383                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2384                 talloc_free(tmp_ctx);
2385                 return -1;
2386         }
2387
2388         c.pnn       = destnode;
2389         c.old_flags = nodemap->nodes[destnode].flags;
2390         c.new_flags = c.old_flags;
2391         c.new_flags |= set;
2392         c.new_flags &= ~clear;
2393
2394         data.dsize = sizeof(c);
2395         data.dptr = (unsigned char *)&c;
2396
2397         /* send the flags update to all connected nodes */
2398         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2399
2400         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2401                                         nodes,
2402                                         timeout, false, data,
2403                                         NULL, NULL,
2404                                         NULL) != 0) {
2405                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to disable node failed\n"));
2406
2407                 talloc_free(tmp_ctx);
2408                 return -1;
2409         }
2410
2411         talloc_free(tmp_ctx);
2412         return 0;
2413 }
2414
2415
2416 /*
2417   get all tunables
2418  */
2419 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2420                                struct timeval timeout, 
2421                                uint32_t destnode,
2422                                struct ctdb_tunable *tunables)
2423 {
2424         TDB_DATA outdata;
2425         int ret;
2426         int32_t res;
2427
2428         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2429                            &outdata, &res, &timeout, NULL);
2430         if (ret != 0 || res != 0) {
2431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2432                 return -1;
2433         }
2434
2435         if (outdata.dsize != sizeof(*tunables)) {
2436                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2437                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2438                 return -1;              
2439         }
2440
2441         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2442         talloc_free(outdata.dptr);
2443         return 0;
2444 }
2445
2446 /*
2447   add a public address to a node
2448  */
2449 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2450                       struct timeval timeout, 
2451                       uint32_t destnode,
2452                       struct ctdb_control_ip_iface *pub)
2453 {
2454         TDB_DATA data;
2455         int32_t res;
2456         int ret;
2457
2458         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2459         data.dptr  = (unsigned char *)pub;
2460
2461         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2462                            NULL, &res, &timeout, NULL);
2463         if (ret != 0 || res != 0) {
2464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2465                 return -1;
2466         }
2467
2468         return 0;
2469 }
2470
2471 /*
2472   delete a public address from a node
2473  */
2474 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2475                       struct timeval timeout, 
2476                       uint32_t destnode,
2477                       struct ctdb_control_ip_iface *pub)
2478 {
2479         TDB_DATA data;
2480         int32_t res;
2481         int ret;
2482
2483         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2484         data.dptr  = (unsigned char *)pub;
2485
2486         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2487                            NULL, &res, &timeout, NULL);
2488         if (ret != 0 || res != 0) {
2489                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2490                 return -1;
2491         }
2492
2493         return 0;
2494 }
2495
2496 /*
2497   kill a tcp connection
2498  */
2499 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2500                       struct timeval timeout, 
2501                       uint32_t destnode,
2502                       struct ctdb_control_killtcp *killtcp)
2503 {
2504         TDB_DATA data;
2505         int32_t res;
2506         int ret;
2507
2508         data.dsize = sizeof(struct ctdb_control_killtcp);
2509         data.dptr  = (unsigned char *)killtcp;
2510
2511         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2512                            NULL, &res, &timeout, NULL);
2513         if (ret != 0 || res != 0) {
2514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2515                 return -1;
2516         }
2517
2518         return 0;
2519 }
2520
2521 /*
2522   send a gratious arp
2523  */
2524 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2525                       struct timeval timeout, 
2526                       uint32_t destnode,
2527                       ctdb_sock_addr *addr,
2528                       const char *ifname)
2529 {
2530         TDB_DATA data;
2531         int32_t res;
2532         int ret, len;
2533         struct ctdb_control_gratious_arp *gratious_arp;
2534         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2535
2536
2537         len = strlen(ifname)+1;
2538         gratious_arp = talloc_size(tmp_ctx, 
2539                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2540         CTDB_NO_MEMORY(ctdb, gratious_arp);
2541
2542         gratious_arp->addr = *addr;
2543         gratious_arp->len = len;
2544         memcpy(&gratious_arp->iface[0], ifname, len);
2545
2546
2547         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2548         data.dptr  = (unsigned char *)gratious_arp;
2549
2550         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2551                            NULL, &res, &timeout, NULL);
2552         if (ret != 0 || res != 0) {
2553                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2554                 talloc_free(tmp_ctx);
2555                 return -1;
2556         }
2557
2558         talloc_free(tmp_ctx);
2559         return 0;
2560 }
2561
2562 /*
2563   get a list of all tcp tickles that a node knows about for a particular vnn
2564  */
2565 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2566                               struct timeval timeout, uint32_t destnode, 
2567                               TALLOC_CTX *mem_ctx, 
2568                               ctdb_sock_addr *addr,
2569                               struct ctdb_control_tcp_tickle_list **list)
2570 {
2571         int ret;
2572         TDB_DATA data, outdata;
2573         int32_t status;
2574
2575         data.dptr = (uint8_t*)addr;
2576         data.dsize = sizeof(ctdb_sock_addr);
2577
2578         ret = ctdb_control(ctdb, destnode, 0, 
2579                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2580                            mem_ctx, &outdata, &status, NULL, NULL);
2581         if (ret != 0 || status != 0) {
2582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2583                 return -1;
2584         }
2585
2586         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2587
2588         return status;
2589 }
2590
2591 /*
2592   register a server id
2593  */
2594 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2595                       struct timeval timeout, 
2596                       struct ctdb_server_id *id)
2597 {
2598         TDB_DATA data;
2599         int32_t res;
2600         int ret;
2601
2602         data.dsize = sizeof(struct ctdb_server_id);
2603         data.dptr  = (unsigned char *)id;
2604
2605         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2606                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2607                         0, data, NULL,
2608                         NULL, &res, &timeout, NULL);
2609         if (ret != 0 || res != 0) {
2610                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2611                 return -1;
2612         }
2613
2614         return 0;
2615 }
2616
2617 /*
2618   unregister a server id
2619  */
2620 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2621                       struct timeval timeout, 
2622                       struct ctdb_server_id *id)
2623 {
2624         TDB_DATA data;
2625         int32_t res;
2626         int ret;
2627
2628         data.dsize = sizeof(struct ctdb_server_id);
2629         data.dptr  = (unsigned char *)id;
2630
2631         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2632                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2633                         0, data, NULL,
2634                         NULL, &res, &timeout, NULL);
2635         if (ret != 0 || res != 0) {
2636                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2637                 return -1;
2638         }
2639
2640         return 0;
2641 }
2642
2643
2644 /*
2645   check if a server id exists
2646
2647   if a server id does exist, return *status == 1, otherwise *status == 0
2648  */
2649 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2650                       struct timeval timeout, 
2651                       uint32_t destnode,
2652                       struct ctdb_server_id *id,
2653                       uint32_t *status)
2654 {
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = sizeof(struct ctdb_server_id);
2660         data.dptr  = (unsigned char *)id;
2661
2662         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2663                         0, data, NULL,
2664                         NULL, &res, &timeout, NULL);
2665         if (ret != 0) {
2666                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2667                 return -1;
2668         }
2669
2670         if (res) {
2671                 *status = 1;
2672         } else {
2673                 *status = 0;
2674         }
2675
2676         return 0;
2677 }
2678
2679 /*
2680    get the list of server ids that are registered on a node
2681 */
2682 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2683                 TALLOC_CTX *mem_ctx,
2684                 struct timeval timeout, uint32_t destnode, 
2685                 struct ctdb_server_id_list **svid_list)
2686 {
2687         int ret;
2688         TDB_DATA outdata;
2689         int32_t res;
2690
2691         ret = ctdb_control(ctdb, destnode, 0, 
2692                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2693                            mem_ctx, &outdata, &res, &timeout, NULL);
2694         if (ret != 0 || res != 0) {
2695                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2696                 return -1;
2697         }
2698
2699         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2700                     
2701         return 0;
2702 }
2703
2704 /*
2705   initialise the ctdb daemon for client applications
2706
2707   NOTE: In current code the daemon does not fork. This is for testing purposes only
2708   and to simplify the code.
2709 */
2710 struct ctdb_context *ctdb_init(struct event_context *ev)
2711 {
2712         int ret;
2713         struct ctdb_context *ctdb;
2714
2715         ctdb = talloc_zero(ev, struct ctdb_context);
2716         if (ctdb == NULL) {
2717                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2718                 return NULL;
2719         }
2720         ctdb->ev  = ev;
2721         ctdb->idr = idr_init(ctdb);
2722         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2723
2724         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2725         if (ret != 0) {
2726                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2727                 talloc_free(ctdb);
2728                 return NULL;
2729         }
2730
2731         return ctdb;
2732 }
2733
2734
2735 /*
2736   set some ctdb flags
2737 */
2738 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2739 {
2740         ctdb->flags |= flags;
2741 }
2742
2743 /*
2744   setup the local socket name
2745 */
2746 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2747 {
2748         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2749         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2750
2751         return 0;
2752 }
2753
2754 /*
2755   return the pnn of this node
2756 */
2757 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2758 {
2759         return ctdb->pnn;
2760 }
2761
2762
2763 /*
2764   get the uptime of a remote node
2765  */
2766 struct ctdb_client_control_state *
2767 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2768 {
2769         return ctdb_control_send(ctdb, destnode, 0, 
2770                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2771                            mem_ctx, &timeout, NULL);
2772 }
2773
2774 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2775 {
2776         int ret;
2777         int32_t res;
2778         TDB_DATA outdata;
2779
2780         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2783                 return -1;
2784         }
2785
2786         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2787
2788         return 0;
2789 }
2790
2791 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2792 {
2793         struct ctdb_client_control_state *state;
2794
2795         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2796         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2797 }
2798
2799 /*
2800   send a control to execute the "recovered" event script on a node
2801  */
2802 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2803 {
2804         int ret;
2805         int32_t status;
2806
2807         ret = ctdb_control(ctdb, destnode, 0, 
2808                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2809                            NULL, NULL, &status, &timeout, NULL);
2810         if (ret != 0 || status != 0) {
2811                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2812                 return -1;
2813         }
2814
2815         return 0;
2816 }
2817
2818 /* 
2819   callback for the async helpers used when sending the same control
2820   to multiple nodes in parallell.
2821 */
2822 static void async_callback(struct ctdb_client_control_state *state)
2823 {
2824         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2825         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2826         int ret;
2827         TDB_DATA outdata;
2828         int32_t res;
2829         uint32_t destnode = state->c->hdr.destnode;
2830
2831         /* one more node has responded with recmode data */
2832         data->count--;
2833
2834         /* if we failed to push the db, then return an error and let
2835            the main loop try again.
2836         */
2837         if (state->state != CTDB_CONTROL_DONE) {
2838                 if ( !data->dont_log_errors) {
2839                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2840                 }
2841                 data->fail_count++;
2842                 if (data->fail_callback) {
2843                         data->fail_callback(ctdb, destnode, res, outdata,
2844                                         data->callback_data);
2845                 }
2846                 return;
2847         }
2848         
2849         state->async.fn = NULL;
2850
2851         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2852         if ((ret != 0) || (res != 0)) {
2853                 if ( !data->dont_log_errors) {
2854                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2855                 }
2856                 data->fail_count++;
2857                 if (data->fail_callback) {
2858                         data->fail_callback(ctdb, destnode, res, outdata,
2859                                         data->callback_data);
2860                 }
2861         }
2862         if ((ret == 0) && (data->callback != NULL)) {
2863                 data->callback(ctdb, destnode, res, outdata,
2864                                         data->callback_data);
2865         }
2866 }
2867
2868
2869 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2870 {
2871         /* set up the callback functions */
2872         state->async.fn = async_callback;
2873         state->async.private_data = data;
2874         
2875         /* one more control to wait for to complete */
2876         data->count++;
2877 }
2878
2879
2880 /* wait for up to the maximum number of seconds allowed
2881    or until all nodes we expect a response from has replied
2882 */
2883 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2884 {
2885         while (data->count > 0) {
2886                 event_loop_once(ctdb->ev);
2887         }
2888         if (data->fail_count != 0) {
2889                 if (!data->dont_log_errors) {
2890                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2891                                  data->fail_count));
2892                 }
2893                 return -1;
2894         }
2895         return 0;
2896 }
2897
2898
2899 /* 
2900    perform a simple control on the listed nodes
2901    The control cannot return data
2902  */
2903 int ctdb_client_async_control(struct ctdb_context *ctdb,
2904                                 enum ctdb_controls opcode,
2905                                 uint32_t *nodes,
2906                                 struct timeval timeout,
2907                                 bool dont_log_errors,
2908                                 TDB_DATA data,
2909                                 client_async_callback client_callback,
2910                                 client_async_callback fail_callback,
2911                                 void *callback_data)
2912 {
2913         struct client_async_data *async_data;
2914         struct ctdb_client_control_state *state;
2915         int j, num_nodes;
2916
2917         async_data = talloc_zero(ctdb, struct client_async_data);
2918         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2919         async_data->dont_log_errors = dont_log_errors;
2920         async_data->callback = client_callback;
2921         async_data->fail_callback = fail_callback;
2922         async_data->callback_data = callback_data;
2923         async_data->opcode        = opcode;
2924
2925         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2926
2927         /* loop over all nodes and send an async control to each of them */
2928         for (j=0; j<num_nodes; j++) {
2929                 uint32_t pnn = nodes[j];
2930
2931                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2932                                           0, data, async_data, &timeout, NULL);
2933                 if (state == NULL) {
2934                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2935                         talloc_free(async_data);
2936                         return -1;
2937                 }
2938                 
2939                 ctdb_client_async_add(async_data, state);
2940         }
2941
2942         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2943                 talloc_free(async_data);
2944                 return -1;
2945         }
2946
2947         talloc_free(async_data);
2948         return 0;
2949 }
2950
2951 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2952                                 struct ctdb_vnn_map *vnn_map,
2953                                 TALLOC_CTX *mem_ctx,
2954                                 bool include_self)
2955 {
2956         int i, j, num_nodes;
2957         uint32_t *nodes;
2958
2959         for (i=num_nodes=0;i<vnn_map->size;i++) {
2960                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2961                         continue;
2962                 }
2963                 num_nodes++;
2964         } 
2965
2966         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2967         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2968
2969         for (i=j=0;i<vnn_map->size;i++) {
2970                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2971                         continue;
2972                 }
2973                 nodes[j++] = vnn_map->map[i];
2974         } 
2975
2976         return nodes;
2977 }
2978
2979 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2980                                 struct ctdb_node_map *node_map,
2981                                 TALLOC_CTX *mem_ctx,
2982                                 bool include_self)
2983 {
2984         int i, j, num_nodes;
2985         uint32_t *nodes;
2986
2987         for (i=num_nodes=0;i<node_map->num;i++) {
2988                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2989                         continue;
2990                 }
2991                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2992                         continue;
2993                 }
2994                 num_nodes++;
2995         } 
2996
2997         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2998         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2999
3000         for (i=j=0;i<node_map->num;i++) {
3001                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3002                         continue;
3003                 }
3004                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3005                         continue;
3006                 }
3007                 nodes[j++] = node_map->nodes[i].pnn;
3008         } 
3009
3010         return nodes;
3011 }
3012
3013 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3014                                 struct ctdb_node_map *node_map,
3015                                 TALLOC_CTX *mem_ctx,
3016                                 bool include_self)
3017 {
3018         int i, j, num_nodes;
3019         uint32_t *nodes;
3020
3021         for (i=num_nodes=0;i<node_map->num;i++) {
3022                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3023                         continue;
3024                 }
3025                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3026                         continue;
3027                 }
3028                 num_nodes++;
3029         } 
3030
3031         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3032         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3033
3034         for (i=j=0;i<node_map->num;i++) {
3035                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3036                         continue;
3037                 }
3038                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3039                         continue;
3040                 }
3041                 nodes[j++] = node_map->nodes[i].pnn;
3042         } 
3043
3044         return nodes;
3045 }
3046
3047 /* 
3048   this is used to test if a pnn lock exists and if it exists will return
3049   the number of connections that pnn has reported or -1 if that recovery
3050   daemon is not running.
3051 */
3052 int
3053 ctdb_read_pnn_lock(int fd, int32_t pnn)
3054 {
3055         struct flock lock;
3056         char c;
3057
3058         lock.l_type = F_WRLCK;
3059         lock.l_whence = SEEK_SET;
3060         lock.l_start = pnn;
3061         lock.l_len = 1;
3062         lock.l_pid = 0;
3063
3064         if (fcntl(fd, F_GETLK, &lock) != 0) {
3065                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3066                 return -1;
3067         }
3068
3069         if (lock.l_type == F_UNLCK) {
3070                 return -1;
3071         }
3072
3073         if (pread(fd, &c, 1, pnn) == -1) {
3074                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3075                 return -1;
3076         }
3077
3078         return c;
3079 }
3080
3081 /*
3082   get capabilities of a remote node
3083  */
3084 struct ctdb_client_control_state *
3085 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3086 {
3087         return ctdb_control_send(ctdb, destnode, 0, 
3088                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3089                            mem_ctx, &timeout, NULL);
3090 }
3091
3092 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3093 {
3094         int ret;
3095         int32_t res;
3096         TDB_DATA outdata;
3097
3098         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3099         if ( (ret != 0) || (res != 0) ) {
3100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3101                 return -1;
3102         }
3103
3104         if (capabilities) {
3105                 *capabilities = *((uint32_t *)outdata.dptr);
3106         }
3107
3108         return 0;
3109 }
3110
3111 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3112 {
3113         struct ctdb_client_control_state *state;
3114         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3115         int ret;
3116
3117         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3118         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3119         talloc_free(tmp_ctx);
3120         return ret;
3121 }
3122
3123 struct ctdb_transaction_handle {
3124         struct ctdb_db_context *ctdb_db;
3125         bool in_replay;
3126         /* we store the reads and writes done under a transaction one
3127            list stores both reads and writes, the other just writes
3128         */
3129         struct ctdb_marshall_buffer *m_all;
3130         struct ctdb_marshall_buffer *m_write;
3131 };
3132
3133 /* start a transaction on a database */
3134 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3135 {
3136         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3137         return 0;
3138 }
3139
3140 /* start a transaction on a database */
3141 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3142 {
3143         struct ctdb_record_handle *rh;
3144         TDB_DATA key;
3145         struct ctdb_ltdb_header header;
3146         TALLOC_CTX *tmp_ctx;
3147         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3148         int ret;
3149         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3150
3151         key.dptr = discard_const(keyname);
3152         key.dsize = strlen(keyname);
3153
3154         if (!ctdb_db->persistent) {
3155                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3156                 return -1;
3157         }
3158
3159 again:
3160         tmp_ctx = talloc_new(h);
3161
3162         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3163         if (rh == NULL) {
3164                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3165                 talloc_free(tmp_ctx);
3166                 return -1;
3167         }
3168         talloc_free(rh);
3169
3170         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3171         if (ret != 0) {
3172                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3173                 talloc_free(tmp_ctx);
3174                 return -1;
3175         }
3176
3177         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
3178         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3179                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3180                 talloc_free(tmp_ctx);
3181                 goto again;
3182         }
3183
3184         talloc_free(tmp_ctx);
3185
3186         return 0;
3187 }
3188
3189
3190 /* start a transaction on a database */
3191 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3192                                                        TALLOC_CTX *mem_ctx)
3193 {
3194         struct ctdb_transaction_handle *h;
3195         int ret;
3196
3197         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3198         if (h == NULL) {
3199                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3200                 return NULL;
3201         }
3202
3203         h->ctdb_db = ctdb_db;
3204
3205         ret = ctdb_transaction_fetch_start(h);
3206         if (ret != 0) {
3207                 talloc_free(h);
3208                 return NULL;
3209         }
3210
3211         talloc_set_destructor(h, ctdb_transaction_destructor);
3212
3213         return h;
3214 }
3215
3216
3217
3218 /*
3219   fetch a record inside a transaction
3220  */
3221 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3222                            TALLOC_CTX *mem_ctx, 
3223                            TDB_DATA key, TDB_DATA *data)
3224 {
3225         struct ctdb_ltdb_header header;
3226         int ret;
3227
3228         ZERO_STRUCT(header);
3229
3230         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3231         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3232                 /* record doesn't exist yet */
3233                 *data = tdb_null;
3234                 ret = 0;
3235         }
3236         
3237         if (ret != 0) {
3238                 return ret;
3239         }
3240
3241         if (!h->in_replay) {
3242                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3243                 if (h->m_all == NULL) {
3244                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3245                         return -1;
3246                 }
3247         }
3248
3249         return 0;
3250 }
3251
3252 /*
3253   stores a record inside a transaction
3254  */
3255 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3256                            TDB_DATA key, TDB_DATA data)
3257 {
3258         TALLOC_CTX *tmp_ctx = talloc_new(h);
3259         struct ctdb_ltdb_header header;
3260         TDB_DATA olddata;
3261         int ret;
3262
3263         ZERO_STRUCT(header);
3264
3265         /* we need the header so we can update the RSN */
3266         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3267         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3268                 /* the record doesn't exist - create one with us as dmaster.
3269                    This is only safe because we are in a transaction and this
3270                    is a persistent database */
3271                 ZERO_STRUCT(header);
3272         } else if (ret != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3274                 talloc_free(tmp_ctx);
3275                 return ret;
3276         }
3277
3278         if (data.dsize == olddata.dsize &&
3279             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3280                 /* save writing the same data */
3281                 talloc_free(tmp_ctx);
3282                 return 0;
3283         }
3284
3285         header.dmaster = h->ctdb_db->ctdb->pnn;
3286         header.rsn++;
3287
3288         if (!h->in_replay) {
3289                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3290                 if (h->m_all == NULL) {
3291                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3292                         talloc_free(tmp_ctx);
3293                         return -1;
3294                 }
3295         }               
3296
3297         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3298         if (h->m_write == NULL) {
3299                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3300                 talloc_free(tmp_ctx);
3301                 return -1;
3302         }
3303         
3304         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3305
3306         talloc_free(tmp_ctx);
3307         
3308         return ret;
3309 }
3310
3311 /*
3312   replay a transaction
3313  */
3314 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3315 {
3316         int ret, i;
3317         struct ctdb_rec_data *rec = NULL;
3318
3319         h->in_replay = true;
3320         talloc_free(h->m_write);
3321         h->m_write = NULL;
3322
3323         ret = ctdb_transaction_fetch_start(h);
3324         if (ret != 0) {
3325                 return ret;
3326         }
3327
3328         for (i=0;i<h->m_all->count;i++) {
3329                 TDB_DATA key, data;
3330
3331                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3332                 if (rec == NULL) {
3333                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3334                         goto failed;
3335                 }
3336
3337                 if (rec->reqid == 0) {
3338                         /* its a store */
3339                         if (ctdb_transaction_store(h, key, data) != 0) {
3340                                 goto failed;
3341                         }
3342                 } else {
3343                         TDB_DATA data2;
3344                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3345
3346                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3347                                 talloc_free(tmp_ctx);
3348                                 goto failed;
3349                         }
3350                         if (data2.dsize != data.dsize ||
3351                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3352                                 /* the record has changed on us - we have to give up */
3353                                 talloc_free(tmp_ctx);
3354                                 goto failed;
3355                         }
3356                         talloc_free(tmp_ctx);
3357                 }
3358         }
3359         
3360         return 0;
3361
3362 failed:
3363         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3364         return -1;
3365 }
3366
3367
3368 /*
3369   commit a transaction
3370  */
3371 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3372 {
3373         int ret, retries=0;
3374         int32_t status;
3375         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3376         struct timeval timeout;
3377         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3378
3379         talloc_set_destructor(h, NULL);
3380
3381         /* our commit strategy is quite complex.
3382
3383            - we first try to commit the changes to all other nodes
3384
3385            - if that works, then we commit locally and we are done
3386
3387            - if a commit on another node fails, then we need to cancel
3388              the transaction, then restart the transaction (thus
3389              opening a window of time for a pending recovery to
3390              complete), then replay the transaction, checking all the
3391              reads and writes (checking that reads give the same data,
3392              and writes succeed). Then we retry the transaction to the
3393              other nodes
3394         */
3395
3396 again:
3397         if (h->m_write == NULL) {
3398                 /* no changes were made */
3399                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3400                 talloc_free(h);
3401                 return 0;
3402         }
3403
3404         /* tell ctdbd to commit to the other nodes */
3405         timeout = timeval_current_ofs(1, 0);
3406         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3407                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3408                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3409                            &timeout, NULL);
3410         if (ret != 0 || status != 0) {
3411                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3412                 sleep(1);
3413
3414                 if (ret != 0) {
3415                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3416                 } else {
3417                         /* work out what error code we will give if we 
3418                            have to fail the operation */
3419                         switch ((enum ctdb_trans2_commit_error)status) {
3420                         case CTDB_TRANS2_COMMIT_SUCCESS:
3421                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3422                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3423                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3424                                 break;
3425                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3426                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3427                                 break;
3428                         }
3429                 }
3430
3431                 if (++retries == 10) {
3432                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3433                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3434                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3435                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3436                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3437                         talloc_free(h);
3438                         return -1;
3439                 }               
3440
3441                 if (ctdb_replay_transaction(h) != 0) {
3442                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3443                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3444                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3445                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3446                         talloc_free(h);
3447                         return -1;
3448                 }
3449                 goto again;
3450         } else {
3451                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3452         }
3453
3454         /* do the real commit locally */
3455         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3456         if (ret != 0) {
3457                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3458                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3459                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3460                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3461                 talloc_free(h);
3462                 return ret;
3463         }
3464
3465         /* tell ctdbd that we are finished with our local commit */
3466         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3467                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3468                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3469         talloc_free(h);
3470         return 0;
3471 }
3472
3473 /*
3474   recovery daemon ping to main daemon
3475  */
3476 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3477 {
3478         int ret;
3479         int32_t res;
3480
3481         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3482                            ctdb, NULL, &res, NULL, NULL);
3483         if (ret != 0 || res != 0) {
3484                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3485                 return -1;
3486         }
3487
3488         return 0;
3489 }
3490
3491 /* when forking the main daemon and the child process needs to connect back
3492  * to the daemon as a client process, this function can be used to change
3493  * the ctdb context from daemon into client mode
3494  */
3495 int switch_from_server_to_client(struct ctdb_context *ctdb)
3496 {
3497         int ret;
3498
3499         /* shutdown the transport */
3500         if (ctdb->methods) {
3501                 ctdb->methods->shutdown(ctdb);
3502         }
3503
3504         /* get a new event context */
3505         talloc_free(ctdb->ev);
3506         ctdb->ev = event_context_init(ctdb);
3507
3508         close(ctdb->daemon.sd);
3509         ctdb->daemon.sd = -1;
3510
3511         /* the client does not need to be realtime */
3512         if (ctdb->do_setsched) {
3513                 ctdb_restore_scheduler(ctdb);
3514         }
3515
3516         /* initialise ctdb */
3517         ret = ctdb_socket_connect(ctdb);
3518         if (ret != 0) {
3519                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3520                 return -1;
3521         }
3522
3523          return 0;
3524 }
3525
3526 /*
3527   tell the main daemon we are starting a new monitor event script
3528  */
3529 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3530 {
3531         int ret;
3532         int32_t res;
3533
3534         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3535                            ctdb, NULL, &res, NULL, NULL);
3536         if (ret != 0 || res != 0) {
3537                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3538                 return -1;
3539         }
3540
3541         return 0;
3542 }
3543
3544 /*
3545   tell the main daemon we are starting a new monitor event script
3546  */
3547 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3548 {
3549         int ret;
3550         int32_t res;
3551
3552         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3553                            ctdb, NULL, &res, NULL, NULL);
3554         if (ret != 0 || res != 0) {
3555                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3556                 return -1;
3557         }
3558
3559         return 0;
3560 }
3561
3562 /*
3563   tell the main daemon we are starting to run an eventscript
3564  */
3565 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3566 {
3567         int ret;
3568         int32_t res;
3569         TDB_DATA data;
3570
3571         data.dptr = discard_const(name);
3572         data.dsize = strlen(name)+1;
3573
3574         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3575                            ctdb, NULL, &res, NULL, NULL);
3576         if (ret != 0 || res != 0) {
3577                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3578                 return -1;
3579         }
3580
3581         return 0;
3582 }
3583
3584 /*
3585   tell the main daemon the status of the script we ran
3586  */
3587 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3588 {
3589         int ret;
3590         int32_t res;
3591         TDB_DATA data;
3592
3593         data.dptr = (uint8_t *)&result;
3594         data.dsize = sizeof(result);
3595
3596         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3597                            ctdb, NULL, &res, NULL, NULL);
3598         if (ret != 0 || res != 0) {
3599                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3600                 return -1;
3601         }
3602
3603         return 0;
3604 }
3605
3606
3607 /*
3608   get the status of running the monitor eventscripts
3609  */
3610 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3611                 struct timeval timeout, uint32_t destnode, 
3612                 TALLOC_CTX *mem_ctx,
3613                 struct ctdb_monitoring_wire **script_status)
3614 {
3615         int ret;
3616         TDB_DATA outdata;
3617         int32_t res;
3618
3619         ret = ctdb_control(ctdb, destnode, 0, 
3620                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3621                            mem_ctx, &outdata, &res, &timeout, NULL);
3622         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3623                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3624                 return -1;
3625         }
3626
3627         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3628         talloc_free(outdata.dptr);
3629                     
3630         return 0;
3631 }
3632
3633 /*
3634   tell the main daemon how long it took to lock the reclock file
3635  */
3636 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3637 {
3638         int ret;
3639         int32_t res;
3640         TDB_DATA data;
3641
3642         data.dptr = (uint8_t *)&latency;
3643         data.dsize = sizeof(latency);
3644
3645         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3646                            ctdb, NULL, &res, NULL, NULL);
3647         if (ret != 0 || res != 0) {
3648                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3649                 return -1;
3650         }
3651
3652         return 0;
3653 }
3654
3655 /*
3656   get the name of the reclock file
3657  */
3658 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3659                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3660                          const char **name)
3661 {
3662         int ret;
3663         int32_t res;
3664         TDB_DATA data;
3665
3666         ret = ctdb_control(ctdb, destnode, 0, 
3667                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3668                            mem_ctx, &data, &res, &timeout, NULL);
3669         if (ret != 0 || res != 0) {
3670                 return -1;
3671         }
3672
3673         if (data.dsize == 0) {
3674                 *name = NULL;
3675         } else {
3676                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3677         }
3678         talloc_free(data.dptr);
3679
3680         return 0;
3681 }
3682
3683 /*
3684   set the reclock filename for a node
3685  */
3686 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3687 {
3688         int ret;
3689         TDB_DATA data;
3690         int32_t res;
3691
3692         if (reclock == NULL) {
3693                 data.dsize = 0;
3694                 data.dptr  = NULL;
3695         } else {
3696                 data.dsize = strlen(reclock) + 1;
3697                 data.dptr  = discard_const(reclock);
3698         }
3699
3700         ret = ctdb_control(ctdb, destnode, 0, 
3701                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3702                            NULL, NULL, &res, &timeout, NULL);
3703         if (ret != 0 || res != 0) {
3704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3705                 return -1;
3706         }
3707
3708         return 0;
3709 }
3710
3711 /*
3712   stop a node
3713  */
3714 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3715 {
3716         int ret;
3717         int32_t res;
3718
3719         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3720                            ctdb, NULL, &res, &timeout, NULL);
3721         if (ret != 0 || res != 0) {
3722                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3723                 return -1;
3724         }
3725
3726         return 0;
3727 }
3728
3729 /*
3730   continue a node
3731  */
3732 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3733 {
3734         int ret;
3735
3736         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3737                            ctdb, NULL, NULL, &timeout, NULL);
3738         if (ret != 0) {
3739                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3740                 return -1;
3741         }
3742
3743         return 0;
3744 }
3745
3746 /*
3747   set the natgw state for a node
3748  */
3749 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3750 {
3751         int ret;
3752         TDB_DATA data;
3753         int32_t res;
3754
3755         data.dsize = sizeof(natgwstate);
3756         data.dptr  = (uint8_t *)&natgwstate;
3757
3758         ret = ctdb_control(ctdb, destnode, 0, 
3759                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3760                            NULL, NULL, &res, &timeout, NULL);
3761         if (ret != 0 || res != 0) {
3762                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3763                 return -1;
3764         }
3765
3766         return 0;
3767 }
3768
3769 /*
3770   set the lmaster role for a node
3771  */
3772 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3773 {
3774         int ret;
3775         TDB_DATA data;
3776         int32_t res;
3777
3778         data.dsize = sizeof(lmasterrole);
3779         data.dptr  = (uint8_t *)&lmasterrole;
3780
3781         ret = ctdb_control(ctdb, destnode, 0, 
3782                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3783                            NULL, NULL, &res, &timeout, NULL);
3784         if (ret != 0 || res != 0) {
3785                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3786                 return -1;
3787         }
3788
3789         return 0;
3790 }
3791
3792 /*
3793   set the recmaster role for a node
3794  */
3795 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3796 {
3797         int ret;
3798         TDB_DATA data;
3799         int32_t res;
3800
3801         data.dsize = sizeof(recmasterrole);
3802         data.dptr  = (uint8_t *)&recmasterrole;
3803
3804         ret = ctdb_control(ctdb, destnode, 0, 
3805                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3806                            NULL, NULL, &res, &timeout, NULL);
3807         if (ret != 0 || res != 0) {
3808                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3809                 return -1;
3810         }
3811
3812         return 0;
3813 }