Merge branch 'master' into martins
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 /*
34   allocate a packet for use in client<->daemon communication
35  */
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37                                             TALLOC_CTX *mem_ctx, 
38                                             enum ctdb_operation operation, 
39                                             size_t length, size_t slength,
40                                             const char *type)
41 {
42         int size;
43         struct ctdb_req_header *hdr;
44
45         length = MAX(length, slength);
46         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47
48         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49         if (hdr == NULL) {
50                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51                          operation, (unsigned)length));
52                 return NULL;
53         }
54         talloc_set_name_const(hdr, type);
55         memset(hdr, 0, slength);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90
91         for (fn=ctdb_db->calls;fn;fn=fn->next) {
92                 if (fn->id == call->call_id) break;
93         }
94         if (fn == NULL) {
95                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
96                 talloc_free(c);
97                 return -1;
98         }
99
100         if (fn->fn(c) != 0) {
101                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
102                 talloc_free(c);
103                 return -1;
104         }
105
106         if (header->laccessor != caller) {
107                 header->lacount = 0;
108         }
109         header->laccessor = caller;
110         header->lacount++;
111
112         /* we need to force the record to be written out if this was a remote access,
113            so that the lacount is updated */
114         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115                 c->new_data = &c->record_data;
116         }
117
118         if (c->new_data) {
119                 /* XXX check that we always have the lock here? */
120                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122                         talloc_free(c);
123                         return -1;
124                 }
125         }
126
127         if (c->reply_data) {
128                 call->reply_data = *c->reply_data;
129
130                 talloc_steal(call, call->reply_data.dptr);
131                 talloc_set_name_const(call->reply_data.dptr, __location__);
132         } else {
133                 call->reply_data.dptr = NULL;
134                 call->reply_data.dsize = 0;
135         }
136         call->status = c->status;
137
138         talloc_free(c);
139
140         return 0;
141 }
142
143
144 /*
145   queue a packet for sending from client to daemon
146 */
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 {
149         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 }
151
152
153 /*
154   called when a CTDB_REPLY_CALL packet comes in in the client
155
156   This packet comes in response to a CTDB_REQ_CALL request packet. It
157   contains any reply data from the call
158 */
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 {
161         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162         struct ctdb_client_call_state *state;
163
164         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165         if (state == NULL) {
166                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
167                 return;
168         }
169
170         if (hdr->reqid != state->reqid) {
171                 /* we found a record  but it was the wrong one */
172                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
173                 return;
174         }
175
176         state->call->reply_data.dptr = c->data;
177         state->call->reply_data.dsize = c->datalen;
178         state->call->status = c->status;
179
180         talloc_steal(state, c);
181
182         state->state = CTDB_CALL_DONE;
183
184         if (state->async.fn) {
185                 state->async.fn(state);
186         }
187 }
188
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
190
191 /*
192   this is called in the client, when data comes in from the daemon
193  */
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 {
196         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
198         TALLOC_CTX *tmp_ctx;
199
200         /* place the packet as a child of a tmp_ctx. We then use
201            talloc_free() below to free it. If any of the calls want
202            to keep it, then they will steal it somewhere else, and the
203            talloc_free() will be a no-op */
204         tmp_ctx = talloc_new(ctdb);
205         talloc_steal(tmp_ctx, hdr);
206
207         if (cnt == 0) {
208                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
209                 exit(0);
210         }
211
212         if (cnt < sizeof(*hdr)) {
213                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
214                 goto done;
215         }
216         if (cnt != hdr->length) {
217                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
218                                (unsigned)hdr->length, (unsigned)cnt);
219                 goto done;
220         }
221
222         if (hdr->ctdb_magic != CTDB_MAGIC) {
223                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
224                 goto done;
225         }
226
227         if (hdr->ctdb_version != CTDB_VERSION) {
228                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
229                 goto done;
230         }
231
232         switch (hdr->operation) {
233         case CTDB_REPLY_CALL:
234                 ctdb_client_reply_call(ctdb, hdr);
235                 break;
236
237         case CTDB_REQ_MESSAGE:
238                 ctdb_request_message(ctdb, hdr);
239                 break;
240
241         case CTDB_REPLY_CONTROL:
242                 ctdb_client_reply_control(ctdb, hdr);
243                 break;
244
245         default:
246                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
247         }
248
249 done:
250         talloc_free(tmp_ctx);
251 }
252
253 /*
254   connect to a unix domain socket
255 */
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 {
258         struct sockaddr_un addr;
259
260         memset(&addr, 0, sizeof(addr));
261         addr.sun_family = AF_UNIX;
262         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263
264         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265         if (ctdb->daemon.sd == -1) {
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 return -1;
276         }
277
278         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
279                                               CTDB_DS_ALIGNMENT, 
280                                               ctdb_client_read_cb, ctdb);
281         return 0;
282 }
283
284
285 struct ctdb_record_handle {
286         struct ctdb_db_context *ctdb_db;
287         TDB_DATA key;
288         TDB_DATA *data;
289         struct ctdb_ltdb_header header;
290 };
291
292
293 /*
294   make a recv call to the local ctdb daemon - called from client context
295
296   This is called when the program wants to wait for a ctdb_call to complete and get the 
297   results. This call will block unless the call has already completed.
298 */
299 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
300 {
301         if (state == NULL) {
302                 return -1;
303         }
304
305         while (state->state < CTDB_CALL_DONE) {
306                 event_loop_once(state->ctdb_db->ctdb->ev);
307         }
308         if (state->state != CTDB_CALL_DONE) {
309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
310                 talloc_free(state);
311                 return -1;
312         }
313
314         if (state->call->reply_data.dsize) {
315                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
316                                                       state->call->reply_data.dptr,
317                                                       state->call->reply_data.dsize);
318                 call->reply_data.dsize = state->call->reply_data.dsize;
319         } else {
320                 call->reply_data.dptr = NULL;
321                 call->reply_data.dsize = 0;
322         }
323         call->status = state->call->status;
324         talloc_free(state);
325
326         return 0;
327 }
328
329
330
331
332 /*
333   destroy a ctdb_call in client
334 */
335 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
336 {
337         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
338         return 0;
339 }
340
341 /*
342   construct an event driven local ctdb_call
343
344   this is used so that locally processed ctdb_call requests are processed
345   in an event driven manner
346 */
347 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
348                                                                   struct ctdb_call *call,
349                                                                   struct ctdb_ltdb_header *header,
350                                                                   TDB_DATA *data)
351 {
352         struct ctdb_client_call_state *state;
353         struct ctdb_context *ctdb = ctdb_db->ctdb;
354         int ret;
355
356         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
357         CTDB_NO_MEMORY_NULL(ctdb, state);
358         state->call = talloc_zero(state, struct ctdb_call);
359         CTDB_NO_MEMORY_NULL(ctdb, state->call);
360
361         talloc_steal(state, data->dptr);
362
363         state->state   = CTDB_CALL_DONE;
364         *(state->call) = *call;
365         state->ctdb_db = ctdb_db;
366
367         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
368
369         return state;
370 }
371
372 /*
373   make a ctdb call to the local daemon - async send. Called from client context.
374
375   This constructs a ctdb_call request and queues it for processing. 
376   This call never blocks.
377 */
378 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
379                                               struct ctdb_call *call)
380 {
381         struct ctdb_client_call_state *state;
382         struct ctdb_context *ctdb = ctdb_db->ctdb;
383         struct ctdb_ltdb_header header;
384         TDB_DATA data;
385         int ret;
386         size_t len;
387         struct ctdb_req_call *c;
388
389         /* if the domain socket is not yet open, open it */
390         if (ctdb->daemon.sd==-1) {
391                 ctdb_socket_connect(ctdb);
392         }
393
394         ret = ctdb_ltdb_lock(ctdb_db, call->key);
395         if (ret != 0) {
396                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
397                 return NULL;
398         }
399
400         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
401
402         if (ret == 0 && header.dmaster == ctdb->pnn) {
403                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
404                 talloc_free(data.dptr);
405                 ctdb_ltdb_unlock(ctdb_db, call->key);
406                 return state;
407         }
408
409         ctdb_ltdb_unlock(ctdb_db, call->key);
410         talloc_free(data.dptr);
411
412         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
413         if (state == NULL) {
414                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
415                 return NULL;
416         }
417         state->call = talloc_zero(state, struct ctdb_call);
418         if (state->call == NULL) {
419                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
420                 return NULL;
421         }
422
423         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
424         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
425         if (c == NULL) {
426                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
427                 return NULL;
428         }
429
430         state->reqid     = ctdb_reqid_new(ctdb, state);
431         state->ctdb_db = ctdb_db;
432         talloc_set_destructor(state, ctdb_client_call_destructor);
433
434         c->hdr.reqid     = state->reqid;
435         c->flags         = call->flags;
436         c->db_id         = ctdb_db->db_id;
437         c->callid        = call->call_id;
438         c->hopcount      = 0;
439         c->keylen        = call->key.dsize;
440         c->calldatalen   = call->call_data.dsize;
441         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
442         memcpy(&c->data[call->key.dsize], 
443                call->call_data.dptr, call->call_data.dsize);
444         *(state->call)              = *call;
445         state->call->call_data.dptr = &c->data[call->key.dsize];
446         state->call->key.dptr       = &c->data[0];
447
448         state->state  = CTDB_CALL_WAIT;
449
450
451         ctdb_client_queue_pkt(ctdb, &c->hdr);
452
453         return state;
454 }
455
456
457 /*
458   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
459 */
460 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
461 {
462         struct ctdb_client_call_state *state;
463
464         state = ctdb_call_send(ctdb_db, call);
465         return ctdb_call_recv(state, call);
466 }
467
468
469 /*
470   tell the daemon what messaging srvid we will use, and register the message
471   handler function in the client
472 */
473 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
474                              ctdb_message_fn_t handler,
475                              void *private_data)
476                                     
477 {
478         int res;
479         int32_t status;
480         
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499         
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512
513 /*
514   send a message - from client context
515  */
516 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
517                       uint64_t srvid, TDB_DATA data)
518 {
519         struct ctdb_req_message *r;
520         int len, res;
521
522         len = offsetof(struct ctdb_req_message, data) + data.dsize;
523         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
524                                len, struct ctdb_req_message);
525         CTDB_NO_MEMORY(ctdb, r);
526
527         r->hdr.destnode  = pnn;
528         r->srvid         = srvid;
529         r->datalen       = data.dsize;
530         memcpy(&r->data[0], data.dptr, data.dsize);
531         
532         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
533         if (res != 0) {
534                 return res;
535         }
536
537         talloc_free(r);
538         return 0;
539 }
540
541
542 /*
543   cancel a ctdb_fetch_lock operation, releasing the lock
544  */
545 static int fetch_lock_destructor(struct ctdb_record_handle *h)
546 {
547         ctdb_ltdb_unlock(h->ctdb_db, h->key);
548         return 0;
549 }
550
551 /*
552   force the migration of a record to this node
553  */
554 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
555 {
556         struct ctdb_call call;
557         ZERO_STRUCT(call);
558         call.call_id = CTDB_NULL_FUNC;
559         call.key = key;
560         call.flags = CTDB_IMMEDIATE_MIGRATION;
561         return ctdb_call(ctdb_db, &call);
562 }
563
564 /*
565   get a lock on a record, and return the records data. Blocks until it gets the lock
566  */
567 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
568                                            TDB_DATA key, TDB_DATA *data)
569 {
570         int ret;
571         struct ctdb_record_handle *h;
572
573         /*
574           procedure is as follows:
575
576           1) get the chain lock. 
577           2) check if we are dmaster
578           3) if we are the dmaster then return handle 
579           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
580              reply from ctdbd
581           5) when we get the reply, goto (1)
582          */
583
584         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
585         if (h == NULL) {
586                 return NULL;
587         }
588
589         h->ctdb_db = ctdb_db;
590         h->key     = key;
591         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
592         if (h->key.dptr == NULL) {
593                 talloc_free(h);
594                 return NULL;
595         }
596         h->data    = data;
597
598         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
599                  (const char *)key.dptr));
600
601 again:
602         /* step 1 - get the chain lock */
603         ret = ctdb_ltdb_lock(ctdb_db, key);
604         if (ret != 0) {
605                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
606                 talloc_free(h);
607                 return NULL;
608         }
609
610         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
611
612         talloc_set_destructor(h, fetch_lock_destructor);
613
614         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
615
616         /* when torturing, ensure we test the remote path */
617         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
618             random() % 5 == 0) {
619                 h->header.dmaster = (uint32_t)-1;
620         }
621
622
623         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
624
625         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
626                 ctdb_ltdb_unlock(ctdb_db, key);
627                 ret = ctdb_client_force_migration(ctdb_db, key);
628                 if (ret != 0) {
629                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
630                         talloc_free(h);
631                         return NULL;
632                 }
633                 goto again;
634         }
635
636         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
637         return h;
638 }
639
640 /*
641   store some data to the record that was locked with ctdb_fetch_lock()
642 */
643 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
644 {
645         int ret;
646         int32_t status;
647         struct ctdb_rec_data *rec;
648         TDB_DATA recdata;
649
650         if (h->ctdb_db->persistent) {
651                 h->header.rsn++;
652         }
653
654         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655         if (ret != 0) {
656                 return ret;
657         }
658
659         /* don't need the persistent_store control for non-persistent databases */
660         if (!h->ctdb_db->persistent) {
661                 return 0;
662         }
663
664         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
665         if (rec == NULL) {
666                 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
667                 return -1;
668         }
669
670         recdata.dptr = (uint8_t *)rec;
671         recdata.dsize = rec->length;
672
673         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
674                            CTDB_CONTROL_PERSISTENT_STORE, 0,
675                            recdata, NULL, NULL, &status, NULL, NULL);
676
677         talloc_free(rec);
678
679         if (ret != 0 || status != 0) {
680                 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
681                 return -1;
682         }
683
684         return 0;
685 }
686
687 /*
688   non-locking fetch of a record
689  */
690 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
691                TDB_DATA key, TDB_DATA *data)
692 {
693         struct ctdb_call call;
694         int ret;
695
696         call.call_id = CTDB_FETCH_FUNC;
697         call.call_data.dptr = NULL;
698         call.call_data.dsize = 0;
699
700         ret = ctdb_call(ctdb_db, &call);
701
702         if (ret == 0) {
703                 *data = call.reply_data;
704                 talloc_steal(mem_ctx, data->dptr);
705         }
706
707         return ret;
708 }
709
710
711
712 /*
713    called when a control completes or timesout to invoke the callback
714    function the user provided
715 */
716 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
717         struct timeval t, void *private_data)
718 {
719         struct ctdb_client_control_state *state;
720         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
721         int ret;
722
723         state = talloc_get_type(private_data, struct ctdb_client_control_state);
724         talloc_steal(tmp_ctx, state);
725
726         ret = ctdb_control_recv(state->ctdb, state, state,
727                         NULL, 
728                         NULL, 
729                         NULL);
730
731         talloc_free(tmp_ctx);
732 }
733
734 /*
735   called when a CTDB_REPLY_CONTROL packet comes in in the client
736
737   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
738   contains any reply data from the control
739 */
740 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
741                                       struct ctdb_req_header *hdr)
742 {
743         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
744         struct ctdb_client_control_state *state;
745
746         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
747         if (state == NULL) {
748                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
749                 return;
750         }
751
752         if (hdr->reqid != state->reqid) {
753                 /* we found a record  but it was the wrong one */
754                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
755                 return;
756         }
757
758         state->outdata.dptr = c->data;
759         state->outdata.dsize = c->datalen;
760         state->status = c->status;
761         if (c->errorlen) {
762                 state->errormsg = talloc_strndup(state, 
763                                                  (char *)&c->data[c->datalen], 
764                                                  c->errorlen);
765         }
766
767         /* state->outdata now uses resources from c so we dont want c
768            to just dissappear from under us while state is still alive
769         */
770         talloc_steal(state, c);
771
772         state->state = CTDB_CONTROL_DONE;
773
774         /* if we had a callback registered for this control, pull the response
775            and call the callback.
776         */
777         if (state->async.fn) {
778                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
779         }
780 }
781
782
783 /*
784   destroy a ctdb_control in client
785 */
786 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
787 {
788         ctdb_reqid_remove(state->ctdb, state->reqid);
789         return 0;
790 }
791
792
793 /* time out handler for ctdb_control */
794 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
795         struct timeval t, void *private_data)
796 {
797         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
798
799         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
800
801         state->state = CTDB_CONTROL_TIMEOUT;
802
803         /* if we had a callback registered for this control, pull the response
804            and call the callback.
805         */
806         if (state->async.fn) {
807                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
808         }
809 }
810
811 /* async version of send control request */
812 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
813                 uint32_t destnode, uint64_t srvid, 
814                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
815                 TALLOC_CTX *mem_ctx,
816                 struct timeval *timeout,
817                 char **errormsg)
818 {
819         struct ctdb_client_control_state *state;
820         size_t len;
821         struct ctdb_req_control *c;
822         int ret;
823
824         if (errormsg) {
825                 *errormsg = NULL;
826         }
827
828         /* if the domain socket is not yet open, open it */
829         if (ctdb->daemon.sd==-1) {
830                 ctdb_socket_connect(ctdb);
831         }
832
833         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
834         CTDB_NO_MEMORY_NULL(ctdb, state);
835
836         state->ctdb       = ctdb;
837         state->reqid      = ctdb_reqid_new(ctdb, state);
838         state->state      = CTDB_CONTROL_WAIT;
839         state->errormsg   = NULL;
840
841         talloc_set_destructor(state, ctdb_control_destructor);
842
843         len = offsetof(struct ctdb_req_control, data) + data.dsize;
844         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
845                                len, struct ctdb_req_control);
846         state->c            = c;        
847         CTDB_NO_MEMORY_NULL(ctdb, c);
848         c->hdr.reqid        = state->reqid;
849         c->hdr.destnode     = destnode;
850         c->hdr.reqid        = state->reqid;
851         c->opcode           = opcode;
852         c->client_id        = 0;
853         c->flags            = flags;
854         c->srvid            = srvid;
855         c->datalen          = data.dsize;
856         if (data.dsize) {
857                 memcpy(&c->data[0], data.dptr, data.dsize);
858         }
859
860         /* timeout */
861         if (timeout && !timeval_is_zero(timeout)) {
862                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
863         }
864
865         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
866         if (ret != 0) {
867                 talloc_free(state);
868                 return NULL;
869         }
870
871         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
872                 talloc_free(state);
873                 return NULL;
874         }
875
876         return state;
877 }
878
879
880 /* async version of receive control reply */
881 int ctdb_control_recv(struct ctdb_context *ctdb, 
882                 struct ctdb_client_control_state *state, 
883                 TALLOC_CTX *mem_ctx,
884                 TDB_DATA *outdata, int32_t *status, char **errormsg)
885 {
886         TALLOC_CTX *tmp_ctx;
887
888         if (status != NULL) {
889                 *status = -1;
890         }
891         if (errormsg != NULL) {
892                 *errormsg = NULL;
893         }
894
895         if (state == NULL) {
896                 return -1;
897         }
898
899         /* prevent double free of state */
900         tmp_ctx = talloc_new(ctdb);
901         talloc_steal(tmp_ctx, state);
902
903         /* loop one event at a time until we either timeout or the control
904            completes.
905         */
906         while (state->state == CTDB_CONTROL_WAIT) {
907                 event_loop_once(ctdb->ev);
908         }
909
910         if (state->state != CTDB_CONTROL_DONE) {
911                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
912                 if (state->async.fn) {
913                         state->async.fn(state);
914                 }
915                 talloc_free(tmp_ctx);
916                 return -1;
917         }
918
919         if (state->errormsg) {
920                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
921                 if (errormsg) {
922                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
923                 }
924                 if (state->async.fn) {
925                         state->async.fn(state);
926                 }
927                 talloc_free(tmp_ctx);
928                 return -1;
929         }
930
931         if (outdata) {
932                 *outdata = state->outdata;
933                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
934         }
935
936         if (status) {
937                 *status = state->status;
938         }
939
940         if (state->async.fn) {
941                 state->async.fn(state);
942         }
943
944         talloc_free(tmp_ctx);
945         return 0;
946 }
947
948
949
950 /*
951   send a ctdb control message
952   timeout specifies how long we should wait for a reply.
953   if timeout is NULL we wait indefinitely
954  */
955 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
956                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
957                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
958                  struct timeval *timeout,
959                  char **errormsg)
960 {
961         struct ctdb_client_control_state *state;
962
963         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
964                         flags, data, mem_ctx,
965                         timeout, errormsg);
966         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
967                         errormsg);
968 }
969
970
971
972
973 /*
974   a process exists call. Returns 0 if process exists, -1 otherwise
975  */
976 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
977 {
978         int ret;
979         TDB_DATA data;
980         int32_t status;
981
982         data.dptr = (uint8_t*)&pid;
983         data.dsize = sizeof(pid);
984
985         ret = ctdb_control(ctdb, destnode, 0, 
986                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
987                            NULL, NULL, &status, NULL, NULL);
988         if (ret != 0) {
989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
990                 return -1;
991         }
992
993         return status;
994 }
995
996 /*
997   get remote statistics
998  */
999 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1000 {
1001         int ret;
1002         TDB_DATA data;
1003         int32_t res;
1004
1005         ret = ctdb_control(ctdb, destnode, 0, 
1006                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1007                            ctdb, &data, &res, NULL, NULL);
1008         if (ret != 0 || res != 0) {
1009                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1010                 return -1;
1011         }
1012
1013         if (data.dsize != sizeof(struct ctdb_statistics)) {
1014                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1015                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1016                       return -1;
1017         }
1018
1019         *status = *(struct ctdb_statistics *)data.dptr;
1020         talloc_free(data.dptr);
1021                         
1022         return 0;
1023 }
1024
1025 /*
1026   shutdown a remote ctdb node
1027  */
1028 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1029 {
1030         struct ctdb_client_control_state *state;
1031
1032         state = ctdb_control_send(ctdb, destnode, 0, 
1033                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1034                            NULL, &timeout, NULL);
1035         if (state == NULL) {
1036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1037                 return -1;
1038         }
1039
1040         return 0;
1041 }
1042
1043 /*
1044   get vnn map from a remote node
1045  */
1046 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1047 {
1048         int ret;
1049         TDB_DATA outdata;
1050         int32_t res;
1051         struct ctdb_vnn_map_wire *map;
1052
1053         ret = ctdb_control(ctdb, destnode, 0, 
1054                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1055                            mem_ctx, &outdata, &res, &timeout, NULL);
1056         if (ret != 0 || res != 0) {
1057                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1058                 return -1;
1059         }
1060         
1061         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1062         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1063             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1064                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1065                 return -1;
1066         }
1067
1068         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1069         CTDB_NO_MEMORY(ctdb, *vnnmap);
1070         (*vnnmap)->generation = map->generation;
1071         (*vnnmap)->size       = map->size;
1072         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1073
1074         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1075         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1076         talloc_free(outdata.dptr);
1077                     
1078         return 0;
1079 }
1080
1081
1082 /*
1083   get the recovery mode of a remote node
1084  */
1085 struct ctdb_client_control_state *
1086 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1087 {
1088         return ctdb_control_send(ctdb, destnode, 0, 
1089                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1090                            mem_ctx, &timeout, NULL);
1091 }
1092
1093 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1094 {
1095         int ret;
1096         int32_t res;
1097
1098         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1101                 return -1;
1102         }
1103
1104         if (recmode) {
1105                 *recmode = (uint32_t)res;
1106         }
1107
1108         return 0;
1109 }
1110
1111 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1112 {
1113         struct ctdb_client_control_state *state;
1114
1115         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1116         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1117 }
1118
1119
1120
1121
1122 /*
1123   set the recovery mode of a remote node
1124  */
1125 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1126 {
1127         int ret;
1128         TDB_DATA data;
1129         int32_t res;
1130
1131         data.dsize = sizeof(uint32_t);
1132         data.dptr = (unsigned char *)&recmode;
1133
1134         ret = ctdb_control(ctdb, destnode, 0, 
1135                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1136                            NULL, NULL, &res, &timeout, NULL);
1137         if (ret != 0 || res != 0) {
1138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1139                 return -1;
1140         }
1141
1142         return 0;
1143 }
1144
1145
1146
1147 /*
1148   get the recovery master of a remote node
1149  */
1150 struct ctdb_client_control_state *
1151 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1152                         struct timeval timeout, uint32_t destnode)
1153 {
1154         return ctdb_control_send(ctdb, destnode, 0, 
1155                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1156                            mem_ctx, &timeout, NULL);
1157 }
1158
1159 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1160 {
1161         int ret;
1162         int32_t res;
1163
1164         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1165         if (ret != 0) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1167                 return -1;
1168         }
1169
1170         if (recmaster) {
1171                 *recmaster = (uint32_t)res;
1172         }
1173
1174         return 0;
1175 }
1176
1177 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1178 {
1179         struct ctdb_client_control_state *state;
1180
1181         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1182         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1183 }
1184
1185
1186 /*
1187   set the recovery master of a remote node
1188  */
1189 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1190 {
1191         int ret;
1192         TDB_DATA data;
1193         int32_t res;
1194
1195         ZERO_STRUCT(data);
1196         data.dsize = sizeof(uint32_t);
1197         data.dptr = (unsigned char *)&recmaster;
1198
1199         ret = ctdb_control(ctdb, destnode, 0, 
1200                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1201                            NULL, NULL, &res, &timeout, NULL);
1202         if (ret != 0 || res != 0) {
1203                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1204                 return -1;
1205         }
1206
1207         return 0;
1208 }
1209
1210
1211 /*
1212   get a list of databases off a remote node
1213  */
1214 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1215                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1216 {
1217         int ret;
1218         TDB_DATA outdata;
1219         int32_t res;
1220
1221         ret = ctdb_control(ctdb, destnode, 0, 
1222                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1223                            mem_ctx, &outdata, &res, &timeout, NULL);
1224         if (ret != 0 || res != 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret;
1243         TDB_DATA outdata;
1244         int32_t res;
1245
1246         ret = ctdb_control(ctdb, destnode, 0, 
1247                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1248                            mem_ctx, &outdata, &res, &timeout, NULL);
1249         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1251                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1252         }
1253         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1255                 return -1;
1256         }
1257
1258         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1259         talloc_free(outdata.dptr);
1260                     
1261         return 0;
1262 }
1263
1264 /*
1265   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1266  */
1267 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1268                 struct timeval timeout, uint32_t destnode, 
1269                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1270 {
1271         int ret, i, len;
1272         TDB_DATA outdata;
1273         struct ctdb_node_mapv4 *nodemapv4;
1274         int32_t res;
1275
1276         ret = ctdb_control(ctdb, destnode, 0, 
1277                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1278                            mem_ctx, &outdata, &res, &timeout, NULL);
1279         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1281                 return -1;
1282         }
1283
1284         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1285
1286         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1287         (*nodemap) = talloc_zero_size(mem_ctx, len);
1288         CTDB_NO_MEMORY(ctdb, (*nodemap));
1289
1290         (*nodemap)->num = nodemapv4->num;
1291         for (i=0; i<nodemapv4->num; i++) {
1292                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1293                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1294                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1295                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1296         }
1297                 
1298         talloc_free(outdata.dptr);
1299                     
1300         return 0;
1301 }
1302
1303 /*
1304   drop the transport, reload the nodes file and restart the transport
1305  */
1306 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1307                     struct timeval timeout, uint32_t destnode)
1308 {
1309         int ret;
1310         int32_t res;
1311
1312         ret = ctdb_control(ctdb, destnode, 0, 
1313                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1314                            NULL, NULL, &res, &timeout, NULL);
1315         if (ret != 0 || res != 0) {
1316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1317                 return -1;
1318         }
1319
1320         return 0;
1321 }
1322
1323
1324 /*
1325   set vnn map on a node
1326  */
1327 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1328                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1329 {
1330         int ret;
1331         TDB_DATA data;
1332         int32_t res;
1333         struct ctdb_vnn_map_wire *map;
1334         size_t len;
1335
1336         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1337         map = talloc_size(mem_ctx, len);
1338         CTDB_NO_MEMORY(ctdb, map);
1339
1340         map->generation = vnnmap->generation;
1341         map->size = vnnmap->size;
1342         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1343         
1344         data.dsize = len;
1345         data.dptr  = (uint8_t *)map;
1346
1347         ret = ctdb_control(ctdb, destnode, 0, 
1348                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1349                            NULL, NULL, &res, &timeout, NULL);
1350         if (ret != 0 || res != 0) {
1351                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1352                 return -1;
1353         }
1354
1355         talloc_free(map);
1356
1357         return 0;
1358 }
1359
1360
1361 /*
1362   async send for pull database
1363  */
1364 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1365         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1366         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1367 {
1368         TDB_DATA indata;
1369         struct ctdb_control_pulldb *pull;
1370         struct ctdb_client_control_state *state;
1371
1372         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1373         CTDB_NO_MEMORY_NULL(ctdb, pull);
1374
1375         pull->db_id   = dbid;
1376         pull->lmaster = lmaster;
1377
1378         indata.dsize = sizeof(struct ctdb_control_pulldb);
1379         indata.dptr  = (unsigned char *)pull;
1380
1381         state = ctdb_control_send(ctdb, destnode, 0, 
1382                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1383                                   mem_ctx, &timeout, NULL);
1384         talloc_free(pull);
1385
1386         return state;
1387 }
1388
1389 /*
1390   async recv for pull database
1391  */
1392 int ctdb_ctrl_pulldb_recv(
1393         struct ctdb_context *ctdb, 
1394         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1395         TDB_DATA *outdata)
1396 {
1397         int ret;
1398         int32_t res;
1399
1400         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1401         if ( (ret != 0) || (res != 0) ){
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1403                 return -1;
1404         }
1405
1406         return 0;
1407 }
1408
1409 /*
1410   pull all keys and records for a specific database on a node
1411  */
1412 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1413                 uint32_t dbid, uint32_t lmaster, 
1414                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1415                 TDB_DATA *outdata)
1416 {
1417         struct ctdb_client_control_state *state;
1418
1419         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1420                                       timeout);
1421         
1422         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1423 }
1424
1425
1426 /*
1427   change dmaster for all keys in the database to the new value
1428  */
1429 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1430                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1431 {
1432         int ret;
1433         TDB_DATA indata;
1434         int32_t res;
1435
1436         indata.dsize = 2*sizeof(uint32_t);
1437         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1438
1439         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1440         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1441
1442         ret = ctdb_control(ctdb, destnode, 0, 
1443                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1444                            NULL, NULL, &res, &timeout, NULL);
1445         if (ret != 0 || res != 0) {
1446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1447                 return -1;
1448         }
1449
1450         return 0;
1451 }
1452
1453 /*
1454   ping a node, return number of clients connected
1455  */
1456 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1457 {
1458         int ret;
1459         int32_t res;
1460
1461         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1462                            tdb_null, NULL, NULL, &res, NULL, NULL);
1463         if (ret != 0) {
1464                 return -1;
1465         }
1466         return res;
1467 }
1468
1469 /*
1470   find the real path to a ltdb 
1471  */
1472 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1473                    const char **path)
1474 {
1475         int ret;
1476         int32_t res;
1477         TDB_DATA data;
1478
1479         data.dptr = (uint8_t *)&dbid;
1480         data.dsize = sizeof(dbid);
1481
1482         ret = ctdb_control(ctdb, destnode, 0, 
1483                            CTDB_CONTROL_GETDBPATH, 0, data, 
1484                            mem_ctx, &data, &res, &timeout, NULL);
1485         if (ret != 0 || res != 0) {
1486                 return -1;
1487         }
1488
1489         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1490         if ((*path) == NULL) {
1491                 return -1;
1492         }
1493
1494         talloc_free(data.dptr);
1495
1496         return 0;
1497 }
1498
1499 /*
1500   find the name of a db 
1501  */
1502 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1503                    const char **name)
1504 {
1505         int ret;
1506         int32_t res;
1507         TDB_DATA data;
1508
1509         data.dptr = (uint8_t *)&dbid;
1510         data.dsize = sizeof(dbid);
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1514                            mem_ctx, &data, &res, &timeout, NULL);
1515         if (ret != 0 || res != 0) {
1516                 return -1;
1517         }
1518
1519         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1520         if ((*name) == NULL) {
1521                 return -1;
1522         }
1523
1524         talloc_free(data.dptr);
1525
1526         return 0;
1527 }
1528
1529 /*
1530   create a database
1531  */
1532 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1533                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1534 {
1535         int ret;
1536         int32_t res;
1537         TDB_DATA data;
1538
1539         data.dptr = discard_const(name);
1540         data.dsize = strlen(name)+1;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1544                            0, data, 
1545                            mem_ctx, &data, &res, &timeout, NULL);
1546
1547         if (ret != 0 || res != 0) {
1548                 return -1;
1549         }
1550
1551         return 0;
1552 }
1553
1554 /*
1555   get debug level on a node
1556  */
1557 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1558 {
1559         int ret;
1560         int32_t res;
1561         TDB_DATA data;
1562
1563         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1564                            ctdb, &data, &res, NULL, NULL);
1565         if (ret != 0 || res != 0) {
1566                 return -1;
1567         }
1568         if (data.dsize != sizeof(int32_t)) {
1569                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1570                          (unsigned)data.dsize));
1571                 return -1;
1572         }
1573         *level = *(int32_t *)data.dptr;
1574         talloc_free(data.dptr);
1575         return 0;
1576 }
1577
1578 /*
1579   set debug level on a node
1580  */
1581 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1582 {
1583         int ret;
1584         int32_t res;
1585         TDB_DATA data;
1586
1587         data.dptr = (uint8_t *)&level;
1588         data.dsize = sizeof(level);
1589
1590         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1591                            NULL, NULL, &res, NULL, NULL);
1592         if (ret != 0 || res != 0) {
1593                 return -1;
1594         }
1595         return 0;
1596 }
1597
1598
1599 /*
1600   get a list of connected nodes
1601  */
1602 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1603                                 struct timeval timeout,
1604                                 TALLOC_CTX *mem_ctx,
1605                                 uint32_t *num_nodes)
1606 {
1607         struct ctdb_node_map *map=NULL;
1608         int ret, i;
1609         uint32_t *nodes;
1610
1611         *num_nodes = 0;
1612
1613         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1614         if (ret != 0) {
1615                 return NULL;
1616         }
1617
1618         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1619         if (nodes == NULL) {
1620                 return NULL;
1621         }
1622
1623         for (i=0;i<map->num;i++) {
1624                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1625                         nodes[*num_nodes] = map->nodes[i].pnn;
1626                         (*num_nodes)++;
1627                 }
1628         }
1629
1630         return nodes;
1631 }
1632
1633
1634 /*
1635   reset remote status
1636  */
1637 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1638 {
1639         int ret;
1640         int32_t res;
1641
1642         ret = ctdb_control(ctdb, destnode, 0, 
1643                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1644                            NULL, NULL, &res, NULL, NULL);
1645         if (ret != 0 || res != 0) {
1646                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1647                 return -1;
1648         }
1649         return 0;
1650 }
1651
1652 /*
1653   this is the dummy null procedure that all databases support
1654 */
1655 static int ctdb_null_func(struct ctdb_call_info *call)
1656 {
1657         return 0;
1658 }
1659
1660 /*
1661   this is a plain fetch procedure that all databases support
1662 */
1663 static int ctdb_fetch_func(struct ctdb_call_info *call)
1664 {
1665         call->reply_data = &call->record_data;
1666         return 0;
1667 }
1668
1669 /*
1670   attach to a specific database - client call
1671 */
1672 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1673 {
1674         struct ctdb_db_context *ctdb_db;
1675         TDB_DATA data;
1676         int ret;
1677         int32_t res;
1678
1679         ctdb_db = ctdb_db_handle(ctdb, name);
1680         if (ctdb_db) {
1681                 return ctdb_db;
1682         }
1683
1684         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1685         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1686
1687         ctdb_db->ctdb = ctdb;
1688         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1689         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1690
1691         data.dptr = discard_const(name);
1692         data.dsize = strlen(name)+1;
1693
1694         /* tell ctdb daemon to attach */
1695         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1696                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1697                            0, data, ctdb_db, &data, &res, NULL, NULL);
1698         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1699                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1700                 talloc_free(ctdb_db);
1701                 return NULL;
1702         }
1703         
1704         ctdb_db->db_id = *(uint32_t *)data.dptr;
1705         talloc_free(data.dptr);
1706
1707         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1708         if (ret != 0) {
1709                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1710                 talloc_free(ctdb_db);
1711                 return NULL;
1712         }
1713
1714         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1715         if (!ctdb->do_setsched) {
1716                 tdb_flags |= TDB_NOMMAP;
1717         }
1718
1719         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1720         if (ctdb_db->ltdb == NULL) {
1721                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1722                 talloc_free(ctdb_db);
1723                 return NULL;
1724         }
1725
1726         ctdb_db->persistent = persistent;
1727
1728         DLIST_ADD(ctdb->db_list, ctdb_db);
1729
1730         /* add well known functions */
1731         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1732         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1733
1734         return ctdb_db;
1735 }
1736
1737
1738 /*
1739   setup a call for a database
1740  */
1741 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1742 {
1743         struct ctdb_registered_call *call;
1744
1745 #if 0
1746         TDB_DATA data;
1747         int32_t status;
1748         struct ctdb_control_set_call c;
1749         int ret;
1750
1751         /* this is no longer valid with the separate daemon architecture */
1752         c.db_id = ctdb_db->db_id;
1753         c.fn    = fn;
1754         c.id    = id;
1755
1756         data.dptr = (uint8_t *)&c;
1757         data.dsize = sizeof(c);
1758
1759         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1760                            data, NULL, NULL, &status, NULL, NULL);
1761         if (ret != 0 || status != 0) {
1762                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1763                 return -1;
1764         }
1765 #endif
1766
1767         /* also register locally */
1768         call = talloc(ctdb_db, struct ctdb_registered_call);
1769         call->fn = fn;
1770         call->id = id;
1771
1772         DLIST_ADD(ctdb_db->calls, call);        
1773         return 0;
1774 }
1775
1776
1777 struct traverse_state {
1778         bool done;
1779         uint32_t count;
1780         ctdb_traverse_func fn;
1781         void *private_data;
1782 };
1783
1784 /*
1785   called on each key during a ctdb_traverse
1786  */
1787 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1788 {
1789         struct traverse_state *state = (struct traverse_state *)p;
1790         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1791         TDB_DATA key;
1792
1793         if (data.dsize < sizeof(uint32_t) ||
1794             d->length != data.dsize) {
1795                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1796                 state->done = True;
1797                 return;
1798         }
1799
1800         key.dsize = d->keylen;
1801         key.dptr  = &d->data[0];
1802         data.dsize = d->datalen;
1803         data.dptr = &d->data[d->keylen];
1804
1805         if (key.dsize == 0 && data.dsize == 0) {
1806                 /* end of traverse */
1807                 state->done = True;
1808                 return;
1809         }
1810
1811         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1812                 /* empty records are deleted records in ctdb */
1813                 return;
1814         }
1815
1816         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1817                 state->done = True;
1818         }
1819
1820         state->count++;
1821 }
1822
1823
1824 /*
1825   start a cluster wide traverse, calling the supplied fn on each record
1826   return the number of records traversed, or -1 on error
1827  */
1828 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1829 {
1830         TDB_DATA data;
1831         struct ctdb_traverse_start t;
1832         int32_t status;
1833         int ret;
1834         uint64_t srvid = (getpid() | 0xFLL<<60);
1835         struct traverse_state state;
1836
1837         state.done = False;
1838         state.count = 0;
1839         state.private_data = private_data;
1840         state.fn = fn;
1841
1842         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1845                 return -1;
1846         }
1847
1848         t.db_id = ctdb_db->db_id;
1849         t.srvid = srvid;
1850         t.reqid = 0;
1851
1852         data.dptr = (uint8_t *)&t;
1853         data.dsize = sizeof(t);
1854
1855         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1856                            data, NULL, NULL, &status, NULL, NULL);
1857         if (ret != 0 || status != 0) {
1858                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1859                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1860                 return -1;
1861         }
1862
1863         while (!state.done) {
1864                 event_loop_once(ctdb_db->ctdb->ev);
1865         }
1866
1867         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1870                 return -1;
1871         }
1872
1873         return state.count;
1874 }
1875
1876 #define ISASCII(x) ((x>31)&&(x<128))
1877 /*
1878   called on each key during a catdb
1879  */
1880 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1881 {
1882         int i;
1883         FILE *f = (FILE *)p;
1884         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1885
1886         fprintf(f, "dmaster: %u\n", h->dmaster);
1887         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1888
1889         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1890         for (i=0;i<key.dsize;i++) {
1891                 if (ISASCII(key.dptr[i])) {
1892                         fprintf(f, "%c", key.dptr[i]);
1893                 } else {
1894                         fprintf(f, "\\%02X", key.dptr[i]);
1895                 }
1896         }
1897         fprintf(f, "\"\n");
1898
1899         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1900         for (i=sizeof(*h);i<data.dsize;i++) {
1901                 if (ISASCII(data.dptr[i])) {
1902                         fprintf(f, "%c", data.dptr[i]);
1903                 } else {
1904                         fprintf(f, "\\%02X", data.dptr[i]);
1905                 }
1906         }
1907         fprintf(f, "\"\n");
1908
1909         return 0;
1910 }
1911
1912 /*
1913   convenience function to list all keys to stdout
1914  */
1915 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1916 {
1917         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1918 }
1919
1920 /*
1921   get the pid of a ctdb daemon
1922  */
1923 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1924 {
1925         int ret;
1926         int32_t res;
1927
1928         ret = ctdb_control(ctdb, destnode, 0, 
1929                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1930                            NULL, NULL, &res, &timeout, NULL);
1931         if (ret != 0) {
1932                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1933                 return -1;
1934         }
1935
1936         *pid = res;
1937
1938         return 0;
1939 }
1940
1941
1942 /*
1943   async freeze send control
1944  */
1945 struct ctdb_client_control_state *
1946 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1947 {
1948         return ctdb_control_send(ctdb, destnode, 0, 
1949                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1950                            mem_ctx, &timeout, NULL);
1951 }
1952
1953 /* 
1954    async freeze recv control
1955 */
1956 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1957 {
1958         int ret;
1959         int32_t res;
1960
1961         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1962         if ( (ret != 0) || (res != 0) ){
1963                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1964                 return -1;
1965         }
1966
1967         return 0;
1968 }
1969
1970 /*
1971   freeze a node
1972  */
1973 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1974 {
1975         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1976         struct ctdb_client_control_state *state;
1977         int ret;
1978
1979         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1980         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1981         talloc_free(tmp_ctx);
1982
1983         return ret;
1984 }
1985
1986 /*
1987   thaw a node
1988  */
1989 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1990 {
1991         int ret;
1992         int32_t res;
1993
1994         ret = ctdb_control(ctdb, destnode, 0, 
1995                            CTDB_CONTROL_THAW, 0, tdb_null, 
1996                            NULL, NULL, &res, &timeout, NULL);
1997         if (ret != 0 || res != 0) {
1998                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1999                 return -1;
2000         }
2001
2002         return 0;
2003 }
2004
2005 /*
2006   get pnn of a node, or -1
2007  */
2008 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2009 {
2010         int ret;
2011         int32_t res;
2012
2013         ret = ctdb_control(ctdb, destnode, 0, 
2014                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2015                            NULL, NULL, &res, &timeout, NULL);
2016         if (ret != 0) {
2017                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2018                 return -1;
2019         }
2020
2021         return res;
2022 }
2023
2024 /*
2025   get the monitoring mode of a remote node
2026  */
2027 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2028 {
2029         int ret;
2030         int32_t res;
2031
2032         ret = ctdb_control(ctdb, destnode, 0, 
2033                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2034                            NULL, NULL, &res, &timeout, NULL);
2035         if (ret != 0) {
2036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2037                 return -1;
2038         }
2039
2040         *monmode = res;
2041
2042         return 0;
2043 }
2044
2045
2046 /*
2047  set the monitoring mode of a remote node to active
2048  */
2049 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2050 {
2051         int ret;
2052         
2053
2054         ret = ctdb_control(ctdb, destnode, 0, 
2055                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2056                            NULL, NULL,NULL, &timeout, NULL);
2057         if (ret != 0) {
2058                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2059                 return -1;
2060         }
2061
2062         
2063
2064         return 0;
2065 }
2066
2067 /*
2068   set the monitoring mode of a remote node to disable
2069  */
2070 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2071 {
2072         int ret;
2073         
2074
2075         ret = ctdb_control(ctdb, destnode, 0, 
2076                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2077                            NULL, NULL, NULL, &timeout, NULL);
2078         if (ret != 0) {
2079                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2080                 return -1;
2081         }
2082
2083         
2084
2085         return 0;
2086 }
2087
2088
2089
2090 /* 
2091   sent to a node to make it take over an ip address
2092 */
2093 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2094                           uint32_t destnode, struct ctdb_public_ip *ip)
2095 {
2096         TDB_DATA data;
2097         struct ctdb_public_ipv4 ipv4;
2098         int ret;
2099         int32_t res;
2100
2101         if (ip->addr.sa.sa_family == AF_INET) {
2102                 ipv4.pnn = ip->pnn;
2103                 ipv4.sin = ip->addr.ip;
2104
2105                 data.dsize = sizeof(ipv4);
2106                 data.dptr  = (uint8_t *)&ipv4;
2107
2108                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2109                            NULL, &res, &timeout, NULL);
2110         } else {
2111                 data.dsize = sizeof(*ip);
2112                 data.dptr  = (uint8_t *)ip;
2113
2114                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2115                            NULL, &res, &timeout, NULL);
2116         }
2117
2118         if (ret != 0 || res != 0) {
2119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2120                 return -1;
2121         }
2122
2123         return 0;       
2124 }
2125
2126
2127 /* 
2128   sent to a node to make it release an ip address
2129 */
2130 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2131                          uint32_t destnode, struct ctdb_public_ip *ip)
2132 {
2133         TDB_DATA data;
2134         struct ctdb_public_ipv4 ipv4;
2135         int ret;
2136         int32_t res;
2137
2138         if (ip->addr.sa.sa_family == AF_INET) {
2139                 ipv4.pnn = ip->pnn;
2140                 ipv4.sin = ip->addr.ip;
2141
2142                 data.dsize = sizeof(ipv4);
2143                 data.dptr  = (uint8_t *)&ipv4;
2144
2145                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2146                                    NULL, &res, &timeout, NULL);
2147         } else {
2148                 data.dsize = sizeof(*ip);
2149                 data.dptr  = (uint8_t *)ip;
2150
2151                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2152                                    NULL, &res, &timeout, NULL);
2153         }
2154
2155         if (ret != 0 || res != 0) {
2156                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2157                 return -1;
2158         }
2159
2160         return 0;       
2161 }
2162
2163
2164 /*
2165   get a tunable
2166  */
2167 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2168                           struct timeval timeout, 
2169                           uint32_t destnode,
2170                           const char *name, uint32_t *value)
2171 {
2172         struct ctdb_control_get_tunable *t;
2173         TDB_DATA data, outdata;
2174         int32_t res;
2175         int ret;
2176
2177         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2178         data.dptr  = talloc_size(ctdb, data.dsize);
2179         CTDB_NO_MEMORY(ctdb, data.dptr);
2180
2181         t = (struct ctdb_control_get_tunable *)data.dptr;
2182         t->length = strlen(name)+1;
2183         memcpy(t->name, name, t->length);
2184
2185         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2186                            &outdata, &res, &timeout, NULL);
2187         talloc_free(data.dptr);
2188         if (ret != 0 || res != 0) {
2189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2190                 return -1;
2191         }
2192
2193         if (outdata.dsize != sizeof(uint32_t)) {
2194                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2195                 talloc_free(outdata.dptr);
2196                 return -1;
2197         }
2198         
2199         *value = *(uint32_t *)outdata.dptr;
2200         talloc_free(outdata.dptr);
2201
2202         return 0;
2203 }
2204
2205 /*
2206   set a tunable
2207  */
2208 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2209                           struct timeval timeout, 
2210                           uint32_t destnode,
2211                           const char *name, uint32_t value)
2212 {
2213         struct ctdb_control_set_tunable *t;
2214         TDB_DATA data;
2215         int32_t res;
2216         int ret;
2217
2218         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2219         data.dptr  = talloc_size(ctdb, data.dsize);
2220         CTDB_NO_MEMORY(ctdb, data.dptr);
2221
2222         t = (struct ctdb_control_set_tunable *)data.dptr;
2223         t->length = strlen(name)+1;
2224         memcpy(t->name, name, t->length);
2225         t->value = value;
2226
2227         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2228                            NULL, &res, &timeout, NULL);
2229         talloc_free(data.dptr);
2230         if (ret != 0 || res != 0) {
2231                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2232                 return -1;
2233         }
2234
2235         return 0;
2236 }
2237
2238 /*
2239   list tunables
2240  */
2241 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2242                             struct timeval timeout, 
2243                             uint32_t destnode,
2244                             TALLOC_CTX *mem_ctx,
2245                             const char ***list, uint32_t *count)
2246 {
2247         TDB_DATA outdata;
2248         int32_t res;
2249         int ret;
2250         struct ctdb_control_list_tunable *t;
2251         char *p, *s, *ptr;
2252
2253         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2254                            mem_ctx, &outdata, &res, &timeout, NULL);
2255         if (ret != 0 || res != 0) {
2256                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2257                 return -1;
2258         }
2259
2260         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2261         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2262             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2263                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2264                 talloc_free(outdata.dptr);
2265                 return -1;              
2266         }
2267         
2268         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2269         CTDB_NO_MEMORY(ctdb, p);
2270
2271         talloc_free(outdata.dptr);
2272         
2273         (*list) = NULL;
2274         (*count) = 0;
2275
2276         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2277                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2278                 CTDB_NO_MEMORY(ctdb, *list);
2279                 (*list)[*count] = talloc_strdup(*list, s);
2280                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2281                 (*count)++;
2282         }
2283
2284         talloc_free(p);
2285
2286         return 0;
2287 }
2288
2289
2290 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2291                         struct timeval timeout, uint32_t destnode, 
2292                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2293 {
2294         int ret;
2295         TDB_DATA outdata;
2296         int32_t res;
2297
2298         ret = ctdb_control(ctdb, destnode, 0, 
2299                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2300                            mem_ctx, &outdata, &res, &timeout, NULL);
2301         if (ret == 0 && res == -1) {
2302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2303                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2304         }
2305         if (ret != 0 || res != 0) {
2306           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2307                 return -1;
2308         }
2309
2310         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2311         talloc_free(outdata.dptr);
2312                     
2313         return 0;
2314 }
2315
2316 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2317                         struct timeval timeout, uint32_t destnode, 
2318                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2319 {
2320         int ret, i, len;
2321         TDB_DATA outdata;
2322         int32_t res;
2323         struct ctdb_all_public_ipsv4 *ipsv4;
2324
2325         ret = ctdb_control(ctdb, destnode, 0, 
2326                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2327                            mem_ctx, &outdata, &res, &timeout, NULL);
2328         if (ret != 0 || res != 0) {
2329                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2330                 return -1;
2331         }
2332
2333         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2334         len = offsetof(struct ctdb_all_public_ips, ips) +
2335                 ipsv4->num*sizeof(struct ctdb_public_ip);
2336         *ips = talloc_zero_size(mem_ctx, len);
2337         (*ips)->num = ipsv4->num;
2338         for (i=0; i<ipsv4->num; i++) {
2339                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2340                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2341         }
2342
2343         talloc_free(outdata.dptr);
2344                     
2345         return 0;
2346 }
2347
2348 /*
2349   set/clear the permanent disabled bit on a remote node
2350  */
2351 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2352                        uint32_t set, uint32_t clear)
2353 {
2354         int ret;
2355         TDB_DATA data;
2356         struct ctdb_node_modflags m;
2357         int32_t res;
2358
2359         m.set = set;
2360         m.clear = clear;
2361
2362         data.dsize = sizeof(m);
2363         data.dptr = (unsigned char *)&m;
2364
2365         ret = ctdb_control(ctdb, destnode, 0, 
2366                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2367                            NULL, NULL, &res, &timeout, NULL);
2368         if (ret != 0 || res != 0) {
2369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for modflags failed\n"));
2370                 return -1;
2371         }
2372
2373         return 0;
2374 }
2375
2376
2377 /*
2378   get all tunables
2379  */
2380 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2381                                struct timeval timeout, 
2382                                uint32_t destnode,
2383                                struct ctdb_tunable *tunables)
2384 {
2385         TDB_DATA outdata;
2386         int ret;
2387         int32_t res;
2388
2389         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2390                            &outdata, &res, &timeout, NULL);
2391         if (ret != 0 || res != 0) {
2392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2393                 return -1;
2394         }
2395
2396         if (outdata.dsize != sizeof(*tunables)) {
2397                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2398                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2399                 return -1;              
2400         }
2401
2402         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2403         talloc_free(outdata.dptr);
2404         return 0;
2405 }
2406
2407 /*
2408   add a public address to a node
2409  */
2410 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2411                       struct timeval timeout, 
2412                       uint32_t destnode,
2413                       struct ctdb_control_ip_iface *pub)
2414 {
2415         TDB_DATA data;
2416         int32_t res;
2417         int ret;
2418
2419         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2420         data.dptr  = (unsigned char *)pub;
2421
2422         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2423                            NULL, &res, &timeout, NULL);
2424         if (ret != 0 || res != 0) {
2425                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2426                 return -1;
2427         }
2428
2429         return 0;
2430 }
2431
2432 /*
2433   delete a public address from a node
2434  */
2435 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2436                       struct timeval timeout, 
2437                       uint32_t destnode,
2438                       struct ctdb_control_ip_iface *pub)
2439 {
2440         TDB_DATA data;
2441         int32_t res;
2442         int ret;
2443
2444         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2445         data.dptr  = (unsigned char *)pub;
2446
2447         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2448                            NULL, &res, &timeout, NULL);
2449         if (ret != 0 || res != 0) {
2450                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2451                 return -1;
2452         }
2453
2454         return 0;
2455 }
2456
2457 /*
2458   kill a tcp connection
2459  */
2460 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2461                       struct timeval timeout, 
2462                       uint32_t destnode,
2463                       struct ctdb_control_killtcp *killtcp)
2464 {
2465         TDB_DATA data;
2466         int32_t res;
2467         int ret;
2468
2469         data.dsize = sizeof(struct ctdb_control_killtcp);
2470         data.dptr  = (unsigned char *)killtcp;
2471
2472         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2473                            NULL, &res, &timeout, NULL);
2474         if (ret != 0 || res != 0) {
2475                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2476                 return -1;
2477         }
2478
2479         return 0;
2480 }
2481
2482 /*
2483   send a gratious arp
2484  */
2485 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2486                       struct timeval timeout, 
2487                       uint32_t destnode,
2488                       ctdb_sock_addr *addr,
2489                       const char *ifname)
2490 {
2491         TDB_DATA data;
2492         int32_t res;
2493         int ret, len;
2494         struct ctdb_control_gratious_arp *gratious_arp;
2495         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2496
2497
2498         len = strlen(ifname)+1;
2499         gratious_arp = talloc_size(tmp_ctx, 
2500                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2501         CTDB_NO_MEMORY(ctdb, gratious_arp);
2502
2503         gratious_arp->addr = *addr;
2504         gratious_arp->len = len;
2505         memcpy(&gratious_arp->iface[0], ifname, len);
2506
2507
2508         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2509         data.dptr  = (unsigned char *)gratious_arp;
2510
2511         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2512                            NULL, &res, &timeout, NULL);
2513         if (ret != 0 || res != 0) {
2514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2515                 talloc_free(tmp_ctx);
2516                 return -1;
2517         }
2518
2519         talloc_free(tmp_ctx);
2520         return 0;
2521 }
2522
2523 /*
2524   get a list of all tcp tickles that a node knows about for a particular vnn
2525  */
2526 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2527                               struct timeval timeout, uint32_t destnode, 
2528                               TALLOC_CTX *mem_ctx, 
2529                               ctdb_sock_addr *addr,
2530                               struct ctdb_control_tcp_tickle_list **list)
2531 {
2532         int ret;
2533         TDB_DATA data, outdata;
2534         int32_t status;
2535
2536         data.dptr = (uint8_t*)addr;
2537         data.dsize = sizeof(ctdb_sock_addr);
2538
2539         ret = ctdb_control(ctdb, destnode, 0, 
2540                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2541                            mem_ctx, &outdata, &status, NULL, NULL);
2542         if (ret != 0 || status != 0) {
2543                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2544                 return -1;
2545         }
2546
2547         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2548
2549         return status;
2550 }
2551
2552 /*
2553   register a server id
2554  */
2555 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2556                       struct timeval timeout, 
2557                       struct ctdb_server_id *id)
2558 {
2559         TDB_DATA data;
2560         int32_t res;
2561         int ret;
2562
2563         data.dsize = sizeof(struct ctdb_server_id);
2564         data.dptr  = (unsigned char *)id;
2565
2566         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2567                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2568                         0, data, NULL,
2569                         NULL, &res, &timeout, NULL);
2570         if (ret != 0 || res != 0) {
2571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2572                 return -1;
2573         }
2574
2575         return 0;
2576 }
2577
2578 /*
2579   unregister a server id
2580  */
2581 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2582                       struct timeval timeout, 
2583                       struct ctdb_server_id *id)
2584 {
2585         TDB_DATA data;
2586         int32_t res;
2587         int ret;
2588
2589         data.dsize = sizeof(struct ctdb_server_id);
2590         data.dptr  = (unsigned char *)id;
2591
2592         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2593                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2594                         0, data, NULL,
2595                         NULL, &res, &timeout, NULL);
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;
2602 }
2603
2604
2605 /*
2606   check if a server id exists
2607
2608   if a server id does exist, return *status == 1, otherwise *status == 0
2609  */
2610 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2611                       struct timeval timeout, 
2612                       uint32_t destnode,
2613                       struct ctdb_server_id *id,
2614                       uint32_t *status)
2615 {
2616         TDB_DATA data;
2617         int32_t res;
2618         int ret;
2619
2620         data.dsize = sizeof(struct ctdb_server_id);
2621         data.dptr  = (unsigned char *)id;
2622
2623         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2624                         0, data, NULL,
2625                         NULL, &res, &timeout, NULL);
2626         if (ret != 0) {
2627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2628                 return -1;
2629         }
2630
2631         if (res) {
2632                 *status = 1;
2633         } else {
2634                 *status = 0;
2635         }
2636
2637         return 0;
2638 }
2639
2640 /*
2641    get the list of server ids that are registered on a node
2642 */
2643 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2644                 TALLOC_CTX *mem_ctx,
2645                 struct timeval timeout, uint32_t destnode, 
2646                 struct ctdb_server_id_list **svid_list)
2647 {
2648         int ret;
2649         TDB_DATA outdata;
2650         int32_t res;
2651
2652         ret = ctdb_control(ctdb, destnode, 0, 
2653                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2654                            mem_ctx, &outdata, &res, &timeout, NULL);
2655         if (ret != 0 || res != 0) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2657                 return -1;
2658         }
2659
2660         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2661                     
2662         return 0;
2663 }
2664
2665 /*
2666   initialise the ctdb daemon for client applications
2667
2668   NOTE: In current code the daemon does not fork. This is for testing purposes only
2669   and to simplify the code.
2670 */
2671 struct ctdb_context *ctdb_init(struct event_context *ev)
2672 {
2673         struct ctdb_context *ctdb;
2674
2675         ctdb = talloc_zero(ev, struct ctdb_context);
2676         ctdb->ev  = ev;
2677         ctdb->idr = idr_init(ctdb);
2678         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2679
2680         ctdb_set_socketname(ctdb, CTDB_PATH);
2681
2682         return ctdb;
2683 }
2684
2685
2686 /*
2687   set some ctdb flags
2688 */
2689 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2690 {
2691         ctdb->flags |= flags;
2692 }
2693
2694 /*
2695   setup the local socket name
2696 */
2697 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2698 {
2699         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2700         return 0;
2701 }
2702
2703 /*
2704   return the pnn of this node
2705 */
2706 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2707 {
2708         return ctdb->pnn;
2709 }
2710
2711
2712 /*
2713   get the uptime of a remote node
2714  */
2715 struct ctdb_client_control_state *
2716 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2717 {
2718         return ctdb_control_send(ctdb, destnode, 0, 
2719                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2720                            mem_ctx, &timeout, NULL);
2721 }
2722
2723 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2724 {
2725         int ret;
2726         int32_t res;
2727         TDB_DATA outdata;
2728
2729         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2730         if (ret != 0 || res != 0) {
2731                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2732                 return -1;
2733         }
2734
2735         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2736
2737         return 0;
2738 }
2739
2740 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2741 {
2742         struct ctdb_client_control_state *state;
2743
2744         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2745         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2746 }
2747
2748 /*
2749   send a control to execute the "recovered" event script on a node
2750  */
2751 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2752 {
2753         int ret;
2754         int32_t status;
2755
2756         ret = ctdb_control(ctdb, destnode, 0, 
2757                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2758                            NULL, NULL, &status, &timeout, NULL);
2759         if (ret != 0 || status != 0) {
2760                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2761                 return -1;
2762         }
2763
2764         return 0;
2765 }
2766
2767 /* 
2768   callback for the async helpers used when sending the same control
2769   to multiple nodes in parallell.
2770 */
2771 static void async_callback(struct ctdb_client_control_state *state)
2772 {
2773         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2774         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2775         int ret;
2776         TDB_DATA outdata;
2777         int32_t res;
2778         uint32_t destnode = state->c->hdr.destnode;
2779
2780         /* one more node has responded with recmode data */
2781         data->count--;
2782
2783         /* if we failed to push the db, then return an error and let
2784            the main loop try again.
2785         */
2786         if (state->state != CTDB_CONTROL_DONE) {
2787                 if ( !data->dont_log_errors) {
2788                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2789                 }
2790                 data->fail_count++;
2791                 if (data->fail_callback) {
2792                         data->fail_callback(ctdb, destnode, res, outdata,
2793                                         data->callback_data);
2794                 }
2795                 return;
2796         }
2797         
2798         state->async.fn = NULL;
2799
2800         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2801         if ((ret != 0) || (res != 0)) {
2802                 if ( !data->dont_log_errors) {
2803                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2804                 }
2805                 data->fail_count++;
2806                 if (data->fail_callback) {
2807                         data->fail_callback(ctdb, destnode, res, outdata,
2808                                         data->callback_data);
2809                 }
2810         }
2811         if ((ret == 0) && (data->callback != NULL)) {
2812                 data->callback(ctdb, destnode, res, outdata,
2813                                         data->callback_data);
2814         }
2815 }
2816
2817
2818 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2819 {
2820         /* set up the callback functions */
2821         state->async.fn = async_callback;
2822         state->async.private_data = data;
2823         
2824         /* one more control to wait for to complete */
2825         data->count++;
2826 }
2827
2828
2829 /* wait for up to the maximum number of seconds allowed
2830    or until all nodes we expect a response from has replied
2831 */
2832 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2833 {
2834         while (data->count > 0) {
2835                 event_loop_once(ctdb->ev);
2836         }
2837         if (data->fail_count != 0) {
2838                 if (!data->dont_log_errors) {
2839                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2840                                  data->fail_count));
2841                 }
2842                 return -1;
2843         }
2844         return 0;
2845 }
2846
2847
2848 /* 
2849    perform a simple control on the listed nodes
2850    The control cannot return data
2851  */
2852 int ctdb_client_async_control(struct ctdb_context *ctdb,
2853                                 enum ctdb_controls opcode,
2854                                 uint32_t *nodes,
2855                                 struct timeval timeout,
2856                                 bool dont_log_errors,
2857                                 TDB_DATA data,
2858                                 client_async_callback client_callback,
2859                                 client_async_callback fail_callback,
2860                                 void *callback_data)
2861 {
2862         struct client_async_data *async_data;
2863         struct ctdb_client_control_state *state;
2864         int j, num_nodes;
2865
2866         async_data = talloc_zero(ctdb, struct client_async_data);
2867         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2868         async_data->dont_log_errors = dont_log_errors;
2869         async_data->callback = client_callback;
2870         async_data->fail_callback = fail_callback;
2871         async_data->callback_data = callback_data;
2872         async_data->opcode        = opcode;
2873
2874         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2875
2876         /* loop over all nodes and send an async control to each of them */
2877         for (j=0; j<num_nodes; j++) {
2878                 uint32_t pnn = nodes[j];
2879
2880                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2881                                           0, data, async_data, &timeout, NULL);
2882                 if (state == NULL) {
2883                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2884                         talloc_free(async_data);
2885                         return -1;
2886                 }
2887                 
2888                 ctdb_client_async_add(async_data, state);
2889         }
2890
2891         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2892                 talloc_free(async_data);
2893                 return -1;
2894         }
2895
2896         talloc_free(async_data);
2897         return 0;
2898 }
2899
2900 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2901                                 struct ctdb_vnn_map *vnn_map,
2902                                 TALLOC_CTX *mem_ctx,
2903                                 bool include_self)
2904 {
2905         int i, j, num_nodes;
2906         uint32_t *nodes;
2907
2908         for (i=num_nodes=0;i<vnn_map->size;i++) {
2909                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2910                         continue;
2911                 }
2912                 num_nodes++;
2913         } 
2914
2915         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2916         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2917
2918         for (i=j=0;i<vnn_map->size;i++) {
2919                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2920                         continue;
2921                 }
2922                 nodes[j++] = vnn_map->map[i];
2923         } 
2924
2925         return nodes;
2926 }
2927
2928 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2929                                 struct ctdb_node_map *node_map,
2930                                 TALLOC_CTX *mem_ctx,
2931                                 bool include_self)
2932 {
2933         int i, j, num_nodes;
2934         uint32_t *nodes;
2935
2936         for (i=num_nodes=0;i<node_map->num;i++) {
2937                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2938                         continue;
2939                 }
2940                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2941                         continue;
2942                 }
2943                 num_nodes++;
2944         } 
2945
2946         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2947         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2948
2949         for (i=j=0;i<node_map->num;i++) {
2950                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2951                         continue;
2952                 }
2953                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2954                         continue;
2955                 }
2956                 nodes[j++] = node_map->nodes[i].pnn;
2957         } 
2958
2959         return nodes;
2960 }
2961
2962 /* 
2963   this is used to test if a pnn lock exists and if it exists will return
2964   the number of connections that pnn has reported or -1 if that recovery
2965   daemon is not running.
2966 */
2967 int
2968 ctdb_read_pnn_lock(int fd, int32_t pnn)
2969 {
2970         struct flock lock;
2971         char c;
2972
2973         lock.l_type = F_WRLCK;
2974         lock.l_whence = SEEK_SET;
2975         lock.l_start = pnn;
2976         lock.l_len = 1;
2977         lock.l_pid = 0;
2978
2979         if (fcntl(fd, F_GETLK, &lock) != 0) {
2980                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2981                 return -1;
2982         }
2983
2984         if (lock.l_type == F_UNLCK) {
2985                 return -1;
2986         }
2987
2988         if (pread(fd, &c, 1, pnn) == -1) {
2989                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2990                 return -1;
2991         }
2992
2993         return c;
2994 }
2995
2996 /*
2997   get capabilities of a remote node
2998  */
2999 struct ctdb_client_control_state *
3000 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3001 {
3002         return ctdb_control_send(ctdb, destnode, 0, 
3003                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3004                            mem_ctx, &timeout, NULL);
3005 }
3006
3007 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3008 {
3009         int ret;
3010         int32_t res;
3011         TDB_DATA outdata;
3012
3013         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3014         if ( (ret != 0) || (res != 0) ) {
3015                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3016                 return -1;
3017         }
3018
3019         if (capabilities) {
3020                 *capabilities = *((uint32_t *)outdata.dptr);
3021         }
3022
3023         return 0;
3024 }
3025
3026 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3027 {
3028         struct ctdb_client_control_state *state;
3029         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3030         int ret;
3031
3032         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3033         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3034         talloc_free(tmp_ctx);
3035         return ret;
3036 }
3037
3038 struct ctdb_transaction_handle {
3039         struct ctdb_db_context *ctdb_db;
3040         bool in_replay;
3041         /* we store the reads and writes done under a transaction one
3042            list stores both reads and writes, the other just writes
3043         */
3044         struct ctdb_marshall_buffer *m_all;
3045         struct ctdb_marshall_buffer *m_write;
3046 };
3047
3048 /* start a transaction on a database */
3049 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3050 {
3051         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3052         return 0;
3053 }
3054
3055 /* start a transaction on a database */
3056 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3057 {
3058         struct ctdb_record_handle *rh;
3059         TDB_DATA key;
3060         struct ctdb_ltdb_header header;
3061         TALLOC_CTX *tmp_ctx;
3062         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3063         int ret;
3064         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3065
3066         key.dptr = discard_const(keyname);
3067         key.dsize = strlen(keyname);
3068
3069         if (!ctdb_db->persistent) {
3070                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3071                 return -1;
3072         }
3073
3074 again:
3075         tmp_ctx = talloc_new(h);
3076
3077         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3078         if (rh == NULL) {
3079                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3080                 talloc_free(tmp_ctx);
3081                 return -1;
3082         }
3083         talloc_free(rh);
3084
3085         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3086         if (ret != 0) {
3087                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3088                 talloc_free(tmp_ctx);
3089                 return -1;
3090         }
3091
3092         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
3093         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3094                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3095                 talloc_free(tmp_ctx);
3096                 goto again;
3097         }
3098
3099         talloc_free(tmp_ctx);
3100
3101         return 0;
3102 }
3103
3104
3105 /* start a transaction on a database */
3106 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3107                                                        TALLOC_CTX *mem_ctx)
3108 {
3109         struct ctdb_transaction_handle *h;
3110         int ret;
3111
3112         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3113         if (h == NULL) {
3114                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3115                 return NULL;
3116         }
3117
3118         h->ctdb_db = ctdb_db;
3119
3120         ret = ctdb_transaction_fetch_start(h);
3121         if (ret != 0) {
3122                 talloc_free(h);
3123                 return NULL;
3124         }
3125
3126         talloc_set_destructor(h, ctdb_transaction_destructor);
3127
3128         return h;
3129 }
3130
3131
3132
3133 /*
3134   fetch a record inside a transaction
3135  */
3136 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3137                            TALLOC_CTX *mem_ctx, 
3138                            TDB_DATA key, TDB_DATA *data)
3139 {
3140         struct ctdb_ltdb_header header;
3141         int ret;
3142
3143         ZERO_STRUCT(header);
3144
3145         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3146         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3147                 /* record doesn't exist yet */
3148                 *data = tdb_null;
3149                 ret = 0;
3150         }
3151         
3152         if (ret != 0) {
3153                 return ret;
3154         }
3155
3156         if (!h->in_replay) {
3157                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3158                 if (h->m_all == NULL) {
3159                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3160                         return -1;
3161                 }
3162         }
3163
3164         return 0;
3165 }
3166
3167 /*
3168   stores a record inside a transaction
3169  */
3170 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3171                            TDB_DATA key, TDB_DATA data)
3172 {
3173         TALLOC_CTX *tmp_ctx = talloc_new(h);
3174         struct ctdb_ltdb_header header;
3175         TDB_DATA olddata;
3176         int ret;
3177
3178         ZERO_STRUCT(header);
3179
3180         /* we need the header so we can update the RSN */
3181         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3182         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3183                 /* the record doesn't exist - create one with us as dmaster.
3184                    This is only safe because we are in a transaction and this
3185                    is a persistent database */
3186                 header.dmaster = h->ctdb_db->ctdb->pnn;
3187                 header.rsn = 0;
3188         } else if (ret != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3190                 talloc_free(tmp_ctx);
3191                 return ret;
3192         }
3193
3194         if (data.dsize == olddata.dsize &&
3195             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3196                 /* save writing the same data */
3197                 talloc_free(tmp_ctx);
3198                 return 0;
3199         }
3200
3201         header.rsn++;
3202
3203         if (!h->in_replay) {
3204                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3205                 if (h->m_all == NULL) {
3206                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3207                         talloc_free(tmp_ctx);
3208                         return -1;
3209                 }
3210         }               
3211
3212         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3213         if (h->m_write == NULL) {
3214                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3215                 talloc_free(tmp_ctx);
3216                 return -1;
3217         }
3218         
3219         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3220
3221         talloc_free(tmp_ctx);
3222         
3223         return ret;
3224 }
3225
3226 /*
3227   replay a transaction
3228  */
3229 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3230 {
3231         int ret, i;
3232         struct ctdb_rec_data *rec = NULL;
3233
3234         h->in_replay = true;
3235         talloc_free(h->m_write);
3236         h->m_write = NULL;
3237
3238         ret = ctdb_transaction_fetch_start(h);
3239         if (ret != 0) {
3240                 return ret;
3241         }
3242
3243         for (i=0;i<h->m_all->count;i++) {
3244                 TDB_DATA key, data;
3245
3246                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3247                 if (rec == NULL) {
3248                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3249                         goto failed;
3250                 }
3251
3252                 if (rec->reqid == 0) {
3253                         /* its a store */
3254                         if (ctdb_transaction_store(h, key, data) != 0) {
3255                                 goto failed;
3256                         }
3257                 } else {
3258                         TDB_DATA data2;
3259                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3260
3261                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3262                                 talloc_free(tmp_ctx);
3263                                 goto failed;
3264                         }
3265                         if (data2.dsize != data.dsize ||
3266                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3267                                 /* the record has changed on us - we have to give up */
3268                                 talloc_free(tmp_ctx);
3269                                 goto failed;
3270                         }
3271                         talloc_free(tmp_ctx);
3272                 }
3273         }
3274         
3275         return 0;
3276
3277 failed:
3278         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3279         return -1;
3280 }
3281
3282
3283 /*
3284   commit a transaction
3285  */
3286 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3287 {
3288         int ret, retries=0;
3289         int32_t status;
3290         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3291         struct timeval timeout;
3292         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3293
3294         talloc_set_destructor(h, NULL);
3295
3296         /* our commit strategy is quite complex.
3297
3298            - we first try to commit the changes to all other nodes
3299
3300            - if that works, then we commit locally and we are done
3301
3302            - if a commit on another node fails, then we need to cancel
3303              the transaction, then restart the transaction (thus
3304              opening a window of time for a pending recovery to
3305              complete), then replay the transaction, checking all the
3306              reads and writes (checking that reads give the same data,
3307              and writes succeed). Then we retry the transaction to the
3308              other nodes
3309         */
3310
3311 again:
3312         if (h->m_write == NULL) {
3313                 /* no changes were made */
3314                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3315                 talloc_free(h);
3316                 return 0;
3317         }
3318
3319         /* tell ctdbd to commit to the other nodes */
3320         timeout = timeval_current_ofs(1, 0);
3321         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3322                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3323                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3324                            &timeout, NULL);
3325         if (ret != 0 || status != 0) {
3326                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3327                 sleep(1);
3328
3329                 if (ret != 0) {
3330                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3331                 } else {
3332                         /* work out what error code we will give if we 
3333                            have to fail the operation */
3334                         switch ((enum ctdb_trans2_commit_error)status) {
3335                         case CTDB_TRANS2_COMMIT_SUCCESS:
3336                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3337                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3338                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3339                                 break;
3340                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3341                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3342                                 break;
3343                         }
3344                 }
3345
3346                 if (++retries == 10) {
3347                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3348                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3349                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3350                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3351                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3352                         talloc_free(h);
3353                         return -1;
3354                 }               
3355
3356                 if (ctdb_replay_transaction(h) != 0) {
3357                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3358                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3359                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3360                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3361                         talloc_free(h);
3362                         return -1;
3363                 }
3364                 goto again;
3365         } else {
3366                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3367         }
3368
3369         /* do the real commit locally */
3370         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3371         if (ret != 0) {
3372                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3373                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3374                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3375                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3376                 talloc_free(h);
3377                 return ret;
3378         }
3379
3380         /* tell ctdbd that we are finished with our local commit */
3381         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3382                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3383                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3384         talloc_free(h);
3385         return 0;
3386 }
3387
3388 /*
3389   recovery daemon ping to main daemon
3390  */
3391 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3392 {
3393         int ret;
3394         int32_t res;
3395
3396         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3397                            ctdb, NULL, &res, NULL, NULL);
3398         if (ret != 0 || res != 0) {
3399                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3400                 return -1;
3401         }
3402
3403         return 0;
3404 }