Track how long it takes to take out the recovery lock from both the main dameon and...
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 /*
34   allocate a packet for use in client<->daemon communication
35  */
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37                                             TALLOC_CTX *mem_ctx, 
38                                             enum ctdb_operation operation, 
39                                             size_t length, size_t slength,
40                                             const char *type)
41 {
42         int size;
43         struct ctdb_req_header *hdr;
44
45         length = MAX(length, slength);
46         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47
48         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49         if (hdr == NULL) {
50                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51                          operation, (unsigned)length));
52                 return NULL;
53         }
54         talloc_set_name_const(hdr, type);
55         memset(hdr, 0, slength);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90
91         for (fn=ctdb_db->calls;fn;fn=fn->next) {
92                 if (fn->id == call->call_id) break;
93         }
94         if (fn == NULL) {
95                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
96                 talloc_free(c);
97                 return -1;
98         }
99
100         if (fn->fn(c) != 0) {
101                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
102                 talloc_free(c);
103                 return -1;
104         }
105
106         if (header->laccessor != caller) {
107                 header->lacount = 0;
108         }
109         header->laccessor = caller;
110         header->lacount++;
111
112         /* we need to force the record to be written out if this was a remote access,
113            so that the lacount is updated */
114         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115                 c->new_data = &c->record_data;
116         }
117
118         if (c->new_data) {
119                 /* XXX check that we always have the lock here? */
120                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122                         talloc_free(c);
123                         return -1;
124                 }
125         }
126
127         if (c->reply_data) {
128                 call->reply_data = *c->reply_data;
129
130                 talloc_steal(call, call->reply_data.dptr);
131                 talloc_set_name_const(call->reply_data.dptr, __location__);
132         } else {
133                 call->reply_data.dptr = NULL;
134                 call->reply_data.dsize = 0;
135         }
136         call->status = c->status;
137
138         talloc_free(c);
139
140         return 0;
141 }
142
143
144 /*
145   queue a packet for sending from client to daemon
146 */
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 {
149         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 }
151
152
153 /*
154   called when a CTDB_REPLY_CALL packet comes in in the client
155
156   This packet comes in response to a CTDB_REQ_CALL request packet. It
157   contains any reply data from the call
158 */
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 {
161         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162         struct ctdb_client_call_state *state;
163
164         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165         if (state == NULL) {
166                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
167                 return;
168         }
169
170         if (hdr->reqid != state->reqid) {
171                 /* we found a record  but it was the wrong one */
172                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
173                 return;
174         }
175
176         state->call->reply_data.dptr = c->data;
177         state->call->reply_data.dsize = c->datalen;
178         state->call->status = c->status;
179
180         talloc_steal(state, c);
181
182         state->state = CTDB_CALL_DONE;
183
184         if (state->async.fn) {
185                 state->async.fn(state);
186         }
187 }
188
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
190
191 /*
192   this is called in the client, when data comes in from the daemon
193  */
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 {
196         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
198         TALLOC_CTX *tmp_ctx;
199
200         /* place the packet as a child of a tmp_ctx. We then use
201            talloc_free() below to free it. If any of the calls want
202            to keep it, then they will steal it somewhere else, and the
203            talloc_free() will be a no-op */
204         tmp_ctx = talloc_new(ctdb);
205         talloc_steal(tmp_ctx, hdr);
206
207         if (cnt == 0) {
208                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
209                 exit(0);
210         }
211
212         if (cnt < sizeof(*hdr)) {
213                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
214                 goto done;
215         }
216         if (cnt != hdr->length) {
217                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
218                                (unsigned)hdr->length, (unsigned)cnt);
219                 goto done;
220         }
221
222         if (hdr->ctdb_magic != CTDB_MAGIC) {
223                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
224                 goto done;
225         }
226
227         if (hdr->ctdb_version != CTDB_VERSION) {
228                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
229                 goto done;
230         }
231
232         switch (hdr->operation) {
233         case CTDB_REPLY_CALL:
234                 ctdb_client_reply_call(ctdb, hdr);
235                 break;
236
237         case CTDB_REQ_MESSAGE:
238                 ctdb_request_message(ctdb, hdr);
239                 break;
240
241         case CTDB_REPLY_CONTROL:
242                 ctdb_client_reply_control(ctdb, hdr);
243                 break;
244
245         default:
246                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
247         }
248
249 done:
250         talloc_free(tmp_ctx);
251 }
252
253 /*
254   connect to a unix domain socket
255 */
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 {
258         struct sockaddr_un addr;
259
260         memset(&addr, 0, sizeof(addr));
261         addr.sun_family = AF_UNIX;
262         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263
264         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265         if (ctdb->daemon.sd == -1) {
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 return -1;
276         }
277
278         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
279                                               CTDB_DS_ALIGNMENT, 
280                                               ctdb_client_read_cb, ctdb);
281         return 0;
282 }
283
284
285 struct ctdb_record_handle {
286         struct ctdb_db_context *ctdb_db;
287         TDB_DATA key;
288         TDB_DATA *data;
289         struct ctdb_ltdb_header header;
290 };
291
292
293 /*
294   make a recv call to the local ctdb daemon - called from client context
295
296   This is called when the program wants to wait for a ctdb_call to complete and get the 
297   results. This call will block unless the call has already completed.
298 */
299 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
300 {
301         if (state == NULL) {
302                 return -1;
303         }
304
305         while (state->state < CTDB_CALL_DONE) {
306                 event_loop_once(state->ctdb_db->ctdb->ev);
307         }
308         if (state->state != CTDB_CALL_DONE) {
309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
310                 talloc_free(state);
311                 return -1;
312         }
313
314         if (state->call->reply_data.dsize) {
315                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
316                                                       state->call->reply_data.dptr,
317                                                       state->call->reply_data.dsize);
318                 call->reply_data.dsize = state->call->reply_data.dsize;
319         } else {
320                 call->reply_data.dptr = NULL;
321                 call->reply_data.dsize = 0;
322         }
323         call->status = state->call->status;
324         talloc_free(state);
325
326         return 0;
327 }
328
329
330
331
332 /*
333   destroy a ctdb_call in client
334 */
335 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
336 {
337         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
338         return 0;
339 }
340
341 /*
342   construct an event driven local ctdb_call
343
344   this is used so that locally processed ctdb_call requests are processed
345   in an event driven manner
346 */
347 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
348                                                                   struct ctdb_call *call,
349                                                                   struct ctdb_ltdb_header *header,
350                                                                   TDB_DATA *data)
351 {
352         struct ctdb_client_call_state *state;
353         struct ctdb_context *ctdb = ctdb_db->ctdb;
354         int ret;
355
356         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
357         CTDB_NO_MEMORY_NULL(ctdb, state);
358         state->call = talloc_zero(state, struct ctdb_call);
359         CTDB_NO_MEMORY_NULL(ctdb, state->call);
360
361         talloc_steal(state, data->dptr);
362
363         state->state   = CTDB_CALL_DONE;
364         *(state->call) = *call;
365         state->ctdb_db = ctdb_db;
366
367         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
368
369         return state;
370 }
371
372 /*
373   make a ctdb call to the local daemon - async send. Called from client context.
374
375   This constructs a ctdb_call request and queues it for processing. 
376   This call never blocks.
377 */
378 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
379                                               struct ctdb_call *call)
380 {
381         struct ctdb_client_call_state *state;
382         struct ctdb_context *ctdb = ctdb_db->ctdb;
383         struct ctdb_ltdb_header header;
384         TDB_DATA data;
385         int ret;
386         size_t len;
387         struct ctdb_req_call *c;
388
389         /* if the domain socket is not yet open, open it */
390         if (ctdb->daemon.sd==-1) {
391                 ctdb_socket_connect(ctdb);
392         }
393
394         ret = ctdb_ltdb_lock(ctdb_db, call->key);
395         if (ret != 0) {
396                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
397                 return NULL;
398         }
399
400         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
401
402         if (ret == 0 && header.dmaster == ctdb->pnn) {
403                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
404                 talloc_free(data.dptr);
405                 ctdb_ltdb_unlock(ctdb_db, call->key);
406                 return state;
407         }
408
409         ctdb_ltdb_unlock(ctdb_db, call->key);
410         talloc_free(data.dptr);
411
412         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
413         if (state == NULL) {
414                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
415                 return NULL;
416         }
417         state->call = talloc_zero(state, struct ctdb_call);
418         if (state->call == NULL) {
419                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
420                 return NULL;
421         }
422
423         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
424         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
425         if (c == NULL) {
426                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
427                 return NULL;
428         }
429
430         state->reqid     = ctdb_reqid_new(ctdb, state);
431         state->ctdb_db = ctdb_db;
432         talloc_set_destructor(state, ctdb_client_call_destructor);
433
434         c->hdr.reqid     = state->reqid;
435         c->flags         = call->flags;
436         c->db_id         = ctdb_db->db_id;
437         c->callid        = call->call_id;
438         c->hopcount      = 0;
439         c->keylen        = call->key.dsize;
440         c->calldatalen   = call->call_data.dsize;
441         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
442         memcpy(&c->data[call->key.dsize], 
443                call->call_data.dptr, call->call_data.dsize);
444         *(state->call)              = *call;
445         state->call->call_data.dptr = &c->data[call->key.dsize];
446         state->call->key.dptr       = &c->data[0];
447
448         state->state  = CTDB_CALL_WAIT;
449
450
451         ctdb_client_queue_pkt(ctdb, &c->hdr);
452
453         return state;
454 }
455
456
457 /*
458   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
459 */
460 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
461 {
462         struct ctdb_client_call_state *state;
463
464         state = ctdb_call_send(ctdb_db, call);
465         return ctdb_call_recv(state, call);
466 }
467
468
469 /*
470   tell the daemon what messaging srvid we will use, and register the message
471   handler function in the client
472 */
473 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
474                              ctdb_message_fn_t handler,
475                              void *private_data)
476                                     
477 {
478         int res;
479         int32_t status;
480         
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499         
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512
513 /*
514   send a message - from client context
515  */
516 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
517                       uint64_t srvid, TDB_DATA data)
518 {
519         struct ctdb_req_message *r;
520         int len, res;
521
522         len = offsetof(struct ctdb_req_message, data) + data.dsize;
523         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
524                                len, struct ctdb_req_message);
525         CTDB_NO_MEMORY(ctdb, r);
526
527         r->hdr.destnode  = pnn;
528         r->srvid         = srvid;
529         r->datalen       = data.dsize;
530         memcpy(&r->data[0], data.dptr, data.dsize);
531         
532         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
533         if (res != 0) {
534                 return res;
535         }
536
537         talloc_free(r);
538         return 0;
539 }
540
541
542 /*
543   cancel a ctdb_fetch_lock operation, releasing the lock
544  */
545 static int fetch_lock_destructor(struct ctdb_record_handle *h)
546 {
547         ctdb_ltdb_unlock(h->ctdb_db, h->key);
548         return 0;
549 }
550
551 /*
552   force the migration of a record to this node
553  */
554 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
555 {
556         struct ctdb_call call;
557         ZERO_STRUCT(call);
558         call.call_id = CTDB_NULL_FUNC;
559         call.key = key;
560         call.flags = CTDB_IMMEDIATE_MIGRATION;
561         return ctdb_call(ctdb_db, &call);
562 }
563
564 /*
565   get a lock on a record, and return the records data. Blocks until it gets the lock
566  */
567 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
568                                            TDB_DATA key, TDB_DATA *data)
569 {
570         int ret;
571         struct ctdb_record_handle *h;
572
573         /*
574           procedure is as follows:
575
576           1) get the chain lock. 
577           2) check if we are dmaster
578           3) if we are the dmaster then return handle 
579           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
580              reply from ctdbd
581           5) when we get the reply, goto (1)
582          */
583
584         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
585         if (h == NULL) {
586                 return NULL;
587         }
588
589         h->ctdb_db = ctdb_db;
590         h->key     = key;
591         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
592         if (h->key.dptr == NULL) {
593                 talloc_free(h);
594                 return NULL;
595         }
596         h->data    = data;
597
598         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
599                  (const char *)key.dptr));
600
601 again:
602         /* step 1 - get the chain lock */
603         ret = ctdb_ltdb_lock(ctdb_db, key);
604         if (ret != 0) {
605                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
606                 talloc_free(h);
607                 return NULL;
608         }
609
610         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
611
612         talloc_set_destructor(h, fetch_lock_destructor);
613
614         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
615
616         /* when torturing, ensure we test the remote path */
617         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
618             random() % 5 == 0) {
619                 h->header.dmaster = (uint32_t)-1;
620         }
621
622
623         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
624
625         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
626                 ctdb_ltdb_unlock(ctdb_db, key);
627                 ret = ctdb_client_force_migration(ctdb_db, key);
628                 if (ret != 0) {
629                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
630                         talloc_free(h);
631                         return NULL;
632                 }
633                 goto again;
634         }
635
636         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
637         return h;
638 }
639
640 /*
641   store some data to the record that was locked with ctdb_fetch_lock()
642 */
643 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
644 {
645         int ret;
646         int32_t status;
647         struct ctdb_rec_data *rec;
648         TDB_DATA recdata;
649
650         if (h->ctdb_db->persistent) {
651                 h->header.rsn++;
652         }
653
654         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
655         if (ret != 0) {
656                 return ret;
657         }
658
659         /* don't need the persistent_store control for non-persistent databases */
660         if (!h->ctdb_db->persistent) {
661                 return 0;
662         }
663
664         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
665         if (rec == NULL) {
666                 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
667                 return -1;
668         }
669
670         recdata.dptr = (uint8_t *)rec;
671         recdata.dsize = rec->length;
672
673         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
674                            CTDB_CONTROL_PERSISTENT_STORE, 0,
675                            recdata, NULL, NULL, &status, NULL, NULL);
676
677         talloc_free(rec);
678
679         if (ret != 0 || status != 0) {
680                 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
681                 return -1;
682         }
683
684         return 0;
685 }
686
687 /*
688   non-locking fetch of a record
689  */
690 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
691                TDB_DATA key, TDB_DATA *data)
692 {
693         struct ctdb_call call;
694         int ret;
695
696         call.call_id = CTDB_FETCH_FUNC;
697         call.call_data.dptr = NULL;
698         call.call_data.dsize = 0;
699
700         ret = ctdb_call(ctdb_db, &call);
701
702         if (ret == 0) {
703                 *data = call.reply_data;
704                 talloc_steal(mem_ctx, data->dptr);
705         }
706
707         return ret;
708 }
709
710
711
712 /*
713    called when a control completes or timesout to invoke the callback
714    function the user provided
715 */
716 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
717         struct timeval t, void *private_data)
718 {
719         struct ctdb_client_control_state *state;
720         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
721         int ret;
722
723         state = talloc_get_type(private_data, struct ctdb_client_control_state);
724         talloc_steal(tmp_ctx, state);
725
726         ret = ctdb_control_recv(state->ctdb, state, state,
727                         NULL, 
728                         NULL, 
729                         NULL);
730
731         talloc_free(tmp_ctx);
732 }
733
734 /*
735   called when a CTDB_REPLY_CONTROL packet comes in in the client
736
737   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
738   contains any reply data from the control
739 */
740 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
741                                       struct ctdb_req_header *hdr)
742 {
743         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
744         struct ctdb_client_control_state *state;
745
746         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
747         if (state == NULL) {
748                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
749                 return;
750         }
751
752         if (hdr->reqid != state->reqid) {
753                 /* we found a record  but it was the wrong one */
754                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
755                 return;
756         }
757
758         state->outdata.dptr = c->data;
759         state->outdata.dsize = c->datalen;
760         state->status = c->status;
761         if (c->errorlen) {
762                 state->errormsg = talloc_strndup(state, 
763                                                  (char *)&c->data[c->datalen], 
764                                                  c->errorlen);
765         }
766
767         /* state->outdata now uses resources from c so we dont want c
768            to just dissappear from under us while state is still alive
769         */
770         talloc_steal(state, c);
771
772         state->state = CTDB_CONTROL_DONE;
773
774         /* if we had a callback registered for this control, pull the response
775            and call the callback.
776         */
777         if (state->async.fn) {
778                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
779         }
780 }
781
782
783 /*
784   destroy a ctdb_control in client
785 */
786 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
787 {
788         ctdb_reqid_remove(state->ctdb, state->reqid);
789         return 0;
790 }
791
792
793 /* time out handler for ctdb_control */
794 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
795         struct timeval t, void *private_data)
796 {
797         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
798
799         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
800
801         state->state = CTDB_CONTROL_TIMEOUT;
802
803         /* if we had a callback registered for this control, pull the response
804            and call the callback.
805         */
806         if (state->async.fn) {
807                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
808         }
809 }
810
811 /* async version of send control request */
812 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
813                 uint32_t destnode, uint64_t srvid, 
814                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
815                 TALLOC_CTX *mem_ctx,
816                 struct timeval *timeout,
817                 char **errormsg)
818 {
819         struct ctdb_client_control_state *state;
820         size_t len;
821         struct ctdb_req_control *c;
822         int ret;
823
824         if (errormsg) {
825                 *errormsg = NULL;
826         }
827
828         /* if the domain socket is not yet open, open it */
829         if (ctdb->daemon.sd==-1) {
830                 ctdb_socket_connect(ctdb);
831         }
832
833         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
834         CTDB_NO_MEMORY_NULL(ctdb, state);
835
836         state->ctdb       = ctdb;
837         state->reqid      = ctdb_reqid_new(ctdb, state);
838         state->state      = CTDB_CONTROL_WAIT;
839         state->errormsg   = NULL;
840
841         talloc_set_destructor(state, ctdb_control_destructor);
842
843         len = offsetof(struct ctdb_req_control, data) + data.dsize;
844         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
845                                len, struct ctdb_req_control);
846         state->c            = c;        
847         CTDB_NO_MEMORY_NULL(ctdb, c);
848         c->hdr.reqid        = state->reqid;
849         c->hdr.destnode     = destnode;
850         c->hdr.reqid        = state->reqid;
851         c->opcode           = opcode;
852         c->client_id        = 0;
853         c->flags            = flags;
854         c->srvid            = srvid;
855         c->datalen          = data.dsize;
856         if (data.dsize) {
857                 memcpy(&c->data[0], data.dptr, data.dsize);
858         }
859
860         /* timeout */
861         if (timeout && !timeval_is_zero(timeout)) {
862                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
863         }
864
865         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
866         if (ret != 0) {
867                 talloc_free(state);
868                 return NULL;
869         }
870
871         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
872                 talloc_free(state);
873                 return NULL;
874         }
875
876         return state;
877 }
878
879
880 /* async version of receive control reply */
881 int ctdb_control_recv(struct ctdb_context *ctdb, 
882                 struct ctdb_client_control_state *state, 
883                 TALLOC_CTX *mem_ctx,
884                 TDB_DATA *outdata, int32_t *status, char **errormsg)
885 {
886         TALLOC_CTX *tmp_ctx;
887
888         if (status != NULL) {
889                 *status = -1;
890         }
891         if (errormsg != NULL) {
892                 *errormsg = NULL;
893         }
894
895         if (state == NULL) {
896                 return -1;
897         }
898
899         /* prevent double free of state */
900         tmp_ctx = talloc_new(ctdb);
901         talloc_steal(tmp_ctx, state);
902
903         /* loop one event at a time until we either timeout or the control
904            completes.
905         */
906         while (state->state == CTDB_CONTROL_WAIT) {
907                 event_loop_once(ctdb->ev);
908         }
909
910         if (state->state != CTDB_CONTROL_DONE) {
911                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
912                 if (state->async.fn) {
913                         state->async.fn(state);
914                 }
915                 talloc_free(tmp_ctx);
916                 return -1;
917         }
918
919         if (state->errormsg) {
920                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
921                 if (errormsg) {
922                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
923                 }
924                 if (state->async.fn) {
925                         state->async.fn(state);
926                 }
927                 talloc_free(tmp_ctx);
928                 return -1;
929         }
930
931         if (outdata) {
932                 *outdata = state->outdata;
933                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
934         }
935
936         if (status) {
937                 *status = state->status;
938         }
939
940         if (state->async.fn) {
941                 state->async.fn(state);
942         }
943
944         talloc_free(tmp_ctx);
945         return 0;
946 }
947
948
949
950 /*
951   send a ctdb control message
952   timeout specifies how long we should wait for a reply.
953   if timeout is NULL we wait indefinitely
954  */
955 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
956                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
957                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
958                  struct timeval *timeout,
959                  char **errormsg)
960 {
961         struct ctdb_client_control_state *state;
962
963         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
964                         flags, data, mem_ctx,
965                         timeout, errormsg);
966         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
967                         errormsg);
968 }
969
970
971
972
973 /*
974   a process exists call. Returns 0 if process exists, -1 otherwise
975  */
976 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
977 {
978         int ret;
979         TDB_DATA data;
980         int32_t status;
981
982         data.dptr = (uint8_t*)&pid;
983         data.dsize = sizeof(pid);
984
985         ret = ctdb_control(ctdb, destnode, 0, 
986                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
987                            NULL, NULL, &status, NULL, NULL);
988         if (ret != 0) {
989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
990                 return -1;
991         }
992
993         return status;
994 }
995
996 /*
997   get remote statistics
998  */
999 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1000 {
1001         int ret;
1002         TDB_DATA data;
1003         int32_t res;
1004
1005         ret = ctdb_control(ctdb, destnode, 0, 
1006                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1007                            ctdb, &data, &res, NULL, NULL);
1008         if (ret != 0 || res != 0) {
1009                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1010                 return -1;
1011         }
1012
1013         if (data.dsize != sizeof(struct ctdb_statistics)) {
1014                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1015                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1016                       return -1;
1017         }
1018
1019         *status = *(struct ctdb_statistics *)data.dptr;
1020         talloc_free(data.dptr);
1021                         
1022         return 0;
1023 }
1024
1025 /*
1026   shutdown a remote ctdb node
1027  */
1028 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1029 {
1030         struct ctdb_client_control_state *state;
1031
1032         state = ctdb_control_send(ctdb, destnode, 0, 
1033                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1034                            NULL, &timeout, NULL);
1035         if (state == NULL) {
1036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1037                 return -1;
1038         }
1039
1040         return 0;
1041 }
1042
1043 /*
1044   get vnn map from a remote node
1045  */
1046 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1047 {
1048         int ret;
1049         TDB_DATA outdata;
1050         int32_t res;
1051         struct ctdb_vnn_map_wire *map;
1052
1053         ret = ctdb_control(ctdb, destnode, 0, 
1054                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1055                            mem_ctx, &outdata, &res, &timeout, NULL);
1056         if (ret != 0 || res != 0) {
1057                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1058                 return -1;
1059         }
1060         
1061         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1062         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1063             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1064                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1065                 return -1;
1066         }
1067
1068         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1069         CTDB_NO_MEMORY(ctdb, *vnnmap);
1070         (*vnnmap)->generation = map->generation;
1071         (*vnnmap)->size       = map->size;
1072         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1073
1074         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1075         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1076         talloc_free(outdata.dptr);
1077                     
1078         return 0;
1079 }
1080
1081
1082 /*
1083   get the recovery mode of a remote node
1084  */
1085 struct ctdb_client_control_state *
1086 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1087 {
1088         return ctdb_control_send(ctdb, destnode, 0, 
1089                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1090                            mem_ctx, &timeout, NULL);
1091 }
1092
1093 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1094 {
1095         int ret;
1096         int32_t res;
1097
1098         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1101                 return -1;
1102         }
1103
1104         if (recmode) {
1105                 *recmode = (uint32_t)res;
1106         }
1107
1108         return 0;
1109 }
1110
1111 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1112 {
1113         struct ctdb_client_control_state *state;
1114
1115         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1116         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1117 }
1118
1119
1120
1121
1122 /*
1123   set the recovery mode of a remote node
1124  */
1125 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1126 {
1127         int ret;
1128         TDB_DATA data;
1129         int32_t res;
1130
1131         data.dsize = sizeof(uint32_t);
1132         data.dptr = (unsigned char *)&recmode;
1133
1134         ret = ctdb_control(ctdb, destnode, 0, 
1135                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1136                            NULL, NULL, &res, &timeout, NULL);
1137         if (ret != 0 || res != 0) {
1138                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1139                 return -1;
1140         }
1141
1142         return 0;
1143 }
1144
1145
1146
1147 /*
1148   get the recovery master of a remote node
1149  */
1150 struct ctdb_client_control_state *
1151 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1152                         struct timeval timeout, uint32_t destnode)
1153 {
1154         return ctdb_control_send(ctdb, destnode, 0, 
1155                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1156                            mem_ctx, &timeout, NULL);
1157 }
1158
1159 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1160 {
1161         int ret;
1162         int32_t res;
1163
1164         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1165         if (ret != 0) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1167                 return -1;
1168         }
1169
1170         if (recmaster) {
1171                 *recmaster = (uint32_t)res;
1172         }
1173
1174         return 0;
1175 }
1176
1177 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1178 {
1179         struct ctdb_client_control_state *state;
1180
1181         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1182         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1183 }
1184
1185
1186 /*
1187   set the recovery master of a remote node
1188  */
1189 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1190 {
1191         int ret;
1192         TDB_DATA data;
1193         int32_t res;
1194
1195         ZERO_STRUCT(data);
1196         data.dsize = sizeof(uint32_t);
1197         data.dptr = (unsigned char *)&recmaster;
1198
1199         ret = ctdb_control(ctdb, destnode, 0, 
1200                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1201                            NULL, NULL, &res, &timeout, NULL);
1202         if (ret != 0 || res != 0) {
1203                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1204                 return -1;
1205         }
1206
1207         return 0;
1208 }
1209
1210
1211 /*
1212   get a list of databases off a remote node
1213  */
1214 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1215                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1216 {
1217         int ret;
1218         TDB_DATA outdata;
1219         int32_t res;
1220
1221         ret = ctdb_control(ctdb, destnode, 0, 
1222                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1223                            mem_ctx, &outdata, &res, &timeout, NULL);
1224         if (ret != 0 || res != 0) {
1225                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1226                 return -1;
1227         }
1228
1229         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1230         talloc_free(outdata.dptr);
1231                     
1232         return 0;
1233 }
1234
1235 /*
1236   get a list of nodes (vnn and flags ) from a remote node
1237  */
1238 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1239                 struct timeval timeout, uint32_t destnode, 
1240                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1241 {
1242         int ret;
1243         TDB_DATA outdata;
1244         int32_t res;
1245
1246         ret = ctdb_control(ctdb, destnode, 0, 
1247                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1248                            mem_ctx, &outdata, &res, &timeout, NULL);
1249         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1251                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1252         }
1253         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1254                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1255                 return -1;
1256         }
1257
1258         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1259         talloc_free(outdata.dptr);
1260                     
1261         return 0;
1262 }
1263
1264 /*
1265   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1266  */
1267 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1268                 struct timeval timeout, uint32_t destnode, 
1269                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1270 {
1271         int ret, i, len;
1272         TDB_DATA outdata;
1273         struct ctdb_node_mapv4 *nodemapv4;
1274         int32_t res;
1275
1276         ret = ctdb_control(ctdb, destnode, 0, 
1277                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1278                            mem_ctx, &outdata, &res, &timeout, NULL);
1279         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1281                 return -1;
1282         }
1283
1284         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1285
1286         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1287         (*nodemap) = talloc_zero_size(mem_ctx, len);
1288         CTDB_NO_MEMORY(ctdb, (*nodemap));
1289
1290         (*nodemap)->num = nodemapv4->num;
1291         for (i=0; i<nodemapv4->num; i++) {
1292                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1293                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1294                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1295                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1296         }
1297                 
1298         talloc_free(outdata.dptr);
1299                     
1300         return 0;
1301 }
1302
1303 /*
1304   drop the transport, reload the nodes file and restart the transport
1305  */
1306 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1307                     struct timeval timeout, uint32_t destnode)
1308 {
1309         int ret;
1310         int32_t res;
1311
1312         ret = ctdb_control(ctdb, destnode, 0, 
1313                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1314                            NULL, NULL, &res, &timeout, NULL);
1315         if (ret != 0 || res != 0) {
1316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1317                 return -1;
1318         }
1319
1320         return 0;
1321 }
1322
1323
1324 /*
1325   set vnn map on a node
1326  */
1327 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1328                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1329 {
1330         int ret;
1331         TDB_DATA data;
1332         int32_t res;
1333         struct ctdb_vnn_map_wire *map;
1334         size_t len;
1335
1336         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1337         map = talloc_size(mem_ctx, len);
1338         CTDB_NO_MEMORY(ctdb, map);
1339
1340         map->generation = vnnmap->generation;
1341         map->size = vnnmap->size;
1342         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1343         
1344         data.dsize = len;
1345         data.dptr  = (uint8_t *)map;
1346
1347         ret = ctdb_control(ctdb, destnode, 0, 
1348                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1349                            NULL, NULL, &res, &timeout, NULL);
1350         if (ret != 0 || res != 0) {
1351                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1352                 return -1;
1353         }
1354
1355         talloc_free(map);
1356
1357         return 0;
1358 }
1359
1360
1361 /*
1362   async send for pull database
1363  */
1364 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1365         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1366         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1367 {
1368         TDB_DATA indata;
1369         struct ctdb_control_pulldb *pull;
1370         struct ctdb_client_control_state *state;
1371
1372         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1373         CTDB_NO_MEMORY_NULL(ctdb, pull);
1374
1375         pull->db_id   = dbid;
1376         pull->lmaster = lmaster;
1377
1378         indata.dsize = sizeof(struct ctdb_control_pulldb);
1379         indata.dptr  = (unsigned char *)pull;
1380
1381         state = ctdb_control_send(ctdb, destnode, 0, 
1382                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1383                                   mem_ctx, &timeout, NULL);
1384         talloc_free(pull);
1385
1386         return state;
1387 }
1388
1389 /*
1390   async recv for pull database
1391  */
1392 int ctdb_ctrl_pulldb_recv(
1393         struct ctdb_context *ctdb, 
1394         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1395         TDB_DATA *outdata)
1396 {
1397         int ret;
1398         int32_t res;
1399
1400         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1401         if ( (ret != 0) || (res != 0) ){
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1403                 return -1;
1404         }
1405
1406         return 0;
1407 }
1408
1409 /*
1410   pull all keys and records for a specific database on a node
1411  */
1412 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1413                 uint32_t dbid, uint32_t lmaster, 
1414                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1415                 TDB_DATA *outdata)
1416 {
1417         struct ctdb_client_control_state *state;
1418
1419         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1420                                       timeout);
1421         
1422         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1423 }
1424
1425
1426 /*
1427   change dmaster for all keys in the database to the new value
1428  */
1429 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1430                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1431 {
1432         int ret;
1433         TDB_DATA indata;
1434         int32_t res;
1435
1436         indata.dsize = 2*sizeof(uint32_t);
1437         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1438
1439         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1440         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1441
1442         ret = ctdb_control(ctdb, destnode, 0, 
1443                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1444                            NULL, NULL, &res, &timeout, NULL);
1445         if (ret != 0 || res != 0) {
1446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1447                 return -1;
1448         }
1449
1450         return 0;
1451 }
1452
1453 /*
1454   ping a node, return number of clients connected
1455  */
1456 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1457 {
1458         int ret;
1459         int32_t res;
1460
1461         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1462                            tdb_null, NULL, NULL, &res, NULL, NULL);
1463         if (ret != 0) {
1464                 return -1;
1465         }
1466         return res;
1467 }
1468
1469 /*
1470   find the real path to a ltdb 
1471  */
1472 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1473                    const char **path)
1474 {
1475         int ret;
1476         int32_t res;
1477         TDB_DATA data;
1478
1479         data.dptr = (uint8_t *)&dbid;
1480         data.dsize = sizeof(dbid);
1481
1482         ret = ctdb_control(ctdb, destnode, 0, 
1483                            CTDB_CONTROL_GETDBPATH, 0, data, 
1484                            mem_ctx, &data, &res, &timeout, NULL);
1485         if (ret != 0 || res != 0) {
1486                 return -1;
1487         }
1488
1489         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1490         if ((*path) == NULL) {
1491                 return -1;
1492         }
1493
1494         talloc_free(data.dptr);
1495
1496         return 0;
1497 }
1498
1499 /*
1500   find the name of a db 
1501  */
1502 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1503                    const char **name)
1504 {
1505         int ret;
1506         int32_t res;
1507         TDB_DATA data;
1508
1509         data.dptr = (uint8_t *)&dbid;
1510         data.dsize = sizeof(dbid);
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1514                            mem_ctx, &data, &res, &timeout, NULL);
1515         if (ret != 0 || res != 0) {
1516                 return -1;
1517         }
1518
1519         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1520         if ((*name) == NULL) {
1521                 return -1;
1522         }
1523
1524         talloc_free(data.dptr);
1525
1526         return 0;
1527 }
1528
1529 /*
1530   create a database
1531  */
1532 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1533                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1534 {
1535         int ret;
1536         int32_t res;
1537         TDB_DATA data;
1538
1539         data.dptr = discard_const(name);
1540         data.dsize = strlen(name)+1;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1544                            0, data, 
1545                            mem_ctx, &data, &res, &timeout, NULL);
1546
1547         if (ret != 0 || res != 0) {
1548                 return -1;
1549         }
1550
1551         return 0;
1552 }
1553
1554 /*
1555   get debug level on a node
1556  */
1557 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1558 {
1559         int ret;
1560         int32_t res;
1561         TDB_DATA data;
1562
1563         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1564                            ctdb, &data, &res, NULL, NULL);
1565         if (ret != 0 || res != 0) {
1566                 return -1;
1567         }
1568         if (data.dsize != sizeof(int32_t)) {
1569                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1570                          (unsigned)data.dsize));
1571                 return -1;
1572         }
1573         *level = *(int32_t *)data.dptr;
1574         talloc_free(data.dptr);
1575         return 0;
1576 }
1577
1578 /*
1579   set debug level on a node
1580  */
1581 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1582 {
1583         int ret;
1584         int32_t res;
1585         TDB_DATA data;
1586
1587         data.dptr = (uint8_t *)&level;
1588         data.dsize = sizeof(level);
1589
1590         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1591                            NULL, NULL, &res, NULL, NULL);
1592         if (ret != 0 || res != 0) {
1593                 return -1;
1594         }
1595         return 0;
1596 }
1597
1598
1599 /*
1600   get a list of connected nodes
1601  */
1602 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1603                                 struct timeval timeout,
1604                                 TALLOC_CTX *mem_ctx,
1605                                 uint32_t *num_nodes)
1606 {
1607         struct ctdb_node_map *map=NULL;
1608         int ret, i;
1609         uint32_t *nodes;
1610
1611         *num_nodes = 0;
1612
1613         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1614         if (ret != 0) {
1615                 return NULL;
1616         }
1617
1618         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1619         if (nodes == NULL) {
1620                 return NULL;
1621         }
1622
1623         for (i=0;i<map->num;i++) {
1624                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1625                         nodes[*num_nodes] = map->nodes[i].pnn;
1626                         (*num_nodes)++;
1627                 }
1628         }
1629
1630         return nodes;
1631 }
1632
1633
1634 /*
1635   reset remote status
1636  */
1637 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1638 {
1639         int ret;
1640         int32_t res;
1641
1642         ret = ctdb_control(ctdb, destnode, 0, 
1643                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1644                            NULL, NULL, &res, NULL, NULL);
1645         if (ret != 0 || res != 0) {
1646                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1647                 return -1;
1648         }
1649         return 0;
1650 }
1651
1652 /*
1653   this is the dummy null procedure that all databases support
1654 */
1655 static int ctdb_null_func(struct ctdb_call_info *call)
1656 {
1657         return 0;
1658 }
1659
1660 /*
1661   this is a plain fetch procedure that all databases support
1662 */
1663 static int ctdb_fetch_func(struct ctdb_call_info *call)
1664 {
1665         call->reply_data = &call->record_data;
1666         return 0;
1667 }
1668
1669 /*
1670   attach to a specific database - client call
1671 */
1672 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1673 {
1674         struct ctdb_db_context *ctdb_db;
1675         TDB_DATA data;
1676         int ret;
1677         int32_t res;
1678
1679         ctdb_db = ctdb_db_handle(ctdb, name);
1680         if (ctdb_db) {
1681                 return ctdb_db;
1682         }
1683
1684         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1685         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1686
1687         ctdb_db->ctdb = ctdb;
1688         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1689         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1690
1691         data.dptr = discard_const(name);
1692         data.dsize = strlen(name)+1;
1693
1694         /* tell ctdb daemon to attach */
1695         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1696                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1697                            0, data, ctdb_db, &data, &res, NULL, NULL);
1698         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1699                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1700                 talloc_free(ctdb_db);
1701                 return NULL;
1702         }
1703         
1704         ctdb_db->db_id = *(uint32_t *)data.dptr;
1705         talloc_free(data.dptr);
1706
1707         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1708         if (ret != 0) {
1709                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1710                 talloc_free(ctdb_db);
1711                 return NULL;
1712         }
1713
1714         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1715         if (!ctdb->do_setsched) {
1716                 tdb_flags |= TDB_NOMMAP;
1717         }
1718
1719         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1720         if (ctdb_db->ltdb == NULL) {
1721                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1722                 talloc_free(ctdb_db);
1723                 return NULL;
1724         }
1725
1726         ctdb_db->persistent = persistent;
1727
1728         DLIST_ADD(ctdb->db_list, ctdb_db);
1729
1730         /* add well known functions */
1731         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1732         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1733
1734         return ctdb_db;
1735 }
1736
1737
1738 /*
1739   setup a call for a database
1740  */
1741 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1742 {
1743         struct ctdb_registered_call *call;
1744
1745 #if 0
1746         TDB_DATA data;
1747         int32_t status;
1748         struct ctdb_control_set_call c;
1749         int ret;
1750
1751         /* this is no longer valid with the separate daemon architecture */
1752         c.db_id = ctdb_db->db_id;
1753         c.fn    = fn;
1754         c.id    = id;
1755
1756         data.dptr = (uint8_t *)&c;
1757         data.dsize = sizeof(c);
1758
1759         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1760                            data, NULL, NULL, &status, NULL, NULL);
1761         if (ret != 0 || status != 0) {
1762                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1763                 return -1;
1764         }
1765 #endif
1766
1767         /* also register locally */
1768         call = talloc(ctdb_db, struct ctdb_registered_call);
1769         call->fn = fn;
1770         call->id = id;
1771
1772         DLIST_ADD(ctdb_db->calls, call);        
1773         return 0;
1774 }
1775
1776
1777 struct traverse_state {
1778         bool done;
1779         uint32_t count;
1780         ctdb_traverse_func fn;
1781         void *private_data;
1782 };
1783
1784 /*
1785   called on each key during a ctdb_traverse
1786  */
1787 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1788 {
1789         struct traverse_state *state = (struct traverse_state *)p;
1790         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1791         TDB_DATA key;
1792
1793         if (data.dsize < sizeof(uint32_t) ||
1794             d->length != data.dsize) {
1795                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1796                 state->done = True;
1797                 return;
1798         }
1799
1800         key.dsize = d->keylen;
1801         key.dptr  = &d->data[0];
1802         data.dsize = d->datalen;
1803         data.dptr = &d->data[d->keylen];
1804
1805         if (key.dsize == 0 && data.dsize == 0) {
1806                 /* end of traverse */
1807                 state->done = True;
1808                 return;
1809         }
1810
1811         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1812                 /* empty records are deleted records in ctdb */
1813                 return;
1814         }
1815
1816         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1817                 state->done = True;
1818         }
1819
1820         state->count++;
1821 }
1822
1823
1824 /*
1825   start a cluster wide traverse, calling the supplied fn on each record
1826   return the number of records traversed, or -1 on error
1827  */
1828 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1829 {
1830         TDB_DATA data;
1831         struct ctdb_traverse_start t;
1832         int32_t status;
1833         int ret;
1834         uint64_t srvid = (getpid() | 0xFLL<<60);
1835         struct traverse_state state;
1836
1837         state.done = False;
1838         state.count = 0;
1839         state.private_data = private_data;
1840         state.fn = fn;
1841
1842         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1845                 return -1;
1846         }
1847
1848         t.db_id = ctdb_db->db_id;
1849         t.srvid = srvid;
1850         t.reqid = 0;
1851
1852         data.dptr = (uint8_t *)&t;
1853         data.dsize = sizeof(t);
1854
1855         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1856                            data, NULL, NULL, &status, NULL, NULL);
1857         if (ret != 0 || status != 0) {
1858                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1859                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1860                 return -1;
1861         }
1862
1863         while (!state.done) {
1864                 event_loop_once(ctdb_db->ctdb->ev);
1865         }
1866
1867         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1870                 return -1;
1871         }
1872
1873         return state.count;
1874 }
1875
1876 #define ISASCII(x) ((x>31)&&(x<128))
1877 /*
1878   called on each key during a catdb
1879  */
1880 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1881 {
1882         int i;
1883         FILE *f = (FILE *)p;
1884         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1885
1886         fprintf(f, "dmaster: %u\n", h->dmaster);
1887         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1888
1889         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1890         for (i=0;i<key.dsize;i++) {
1891                 if (ISASCII(key.dptr[i])) {
1892                         fprintf(f, "%c", key.dptr[i]);
1893                 } else {
1894                         fprintf(f, "\\%02X", key.dptr[i]);
1895                 }
1896         }
1897         fprintf(f, "\"\n");
1898
1899         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1900         for (i=sizeof(*h);i<data.dsize;i++) {
1901                 if (ISASCII(data.dptr[i])) {
1902                         fprintf(f, "%c", data.dptr[i]);
1903                 } else {
1904                         fprintf(f, "\\%02X", data.dptr[i]);
1905                 }
1906         }
1907         fprintf(f, "\"\n");
1908
1909         return 0;
1910 }
1911
1912 /*
1913   convenience function to list all keys to stdout
1914  */
1915 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1916 {
1917         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1918 }
1919
1920 /*
1921   get the pid of a ctdb daemon
1922  */
1923 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1924 {
1925         int ret;
1926         int32_t res;
1927
1928         ret = ctdb_control(ctdb, destnode, 0, 
1929                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1930                            NULL, NULL, &res, &timeout, NULL);
1931         if (ret != 0) {
1932                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1933                 return -1;
1934         }
1935
1936         *pid = res;
1937
1938         return 0;
1939 }
1940
1941
1942 /*
1943   async freeze send control
1944  */
1945 struct ctdb_client_control_state *
1946 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1947 {
1948         return ctdb_control_send(ctdb, destnode, 0, 
1949                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1950                            mem_ctx, &timeout, NULL);
1951 }
1952
1953 /* 
1954    async freeze recv control
1955 */
1956 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1957 {
1958         int ret;
1959         int32_t res;
1960
1961         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1962         if ( (ret != 0) || (res != 0) ){
1963                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1964                 return -1;
1965         }
1966
1967         return 0;
1968 }
1969
1970 /*
1971   freeze a node
1972  */
1973 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1974 {
1975         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1976         struct ctdb_client_control_state *state;
1977         int ret;
1978
1979         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1980         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1981         talloc_free(tmp_ctx);
1982
1983         return ret;
1984 }
1985
1986 /*
1987   thaw a node
1988  */
1989 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1990 {
1991         int ret;
1992         int32_t res;
1993
1994         ret = ctdb_control(ctdb, destnode, 0, 
1995                            CTDB_CONTROL_THAW, 0, tdb_null, 
1996                            NULL, NULL, &res, &timeout, NULL);
1997         if (ret != 0 || res != 0) {
1998                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1999                 return -1;
2000         }
2001
2002         return 0;
2003 }
2004
2005 /*
2006   get pnn of a node, or -1
2007  */
2008 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2009 {
2010         int ret;
2011         int32_t res;
2012
2013         ret = ctdb_control(ctdb, destnode, 0, 
2014                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2015                            NULL, NULL, &res, &timeout, NULL);
2016         if (ret != 0) {
2017                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2018                 return -1;
2019         }
2020
2021         return res;
2022 }
2023
2024 /*
2025   get the monitoring mode of a remote node
2026  */
2027 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2028 {
2029         int ret;
2030         int32_t res;
2031
2032         ret = ctdb_control(ctdb, destnode, 0, 
2033                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2034                            NULL, NULL, &res, &timeout, NULL);
2035         if (ret != 0) {
2036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2037                 return -1;
2038         }
2039
2040         *monmode = res;
2041
2042         return 0;
2043 }
2044
2045
2046 /*
2047  set the monitoring mode of a remote node to active
2048  */
2049 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2050 {
2051         int ret;
2052         
2053
2054         ret = ctdb_control(ctdb, destnode, 0, 
2055                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2056                            NULL, NULL,NULL, &timeout, NULL);
2057         if (ret != 0) {
2058                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2059                 return -1;
2060         }
2061
2062         
2063
2064         return 0;
2065 }
2066
2067 /*
2068   set the monitoring mode of a remote node to disable
2069  */
2070 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2071 {
2072         int ret;
2073         
2074
2075         ret = ctdb_control(ctdb, destnode, 0, 
2076                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2077                            NULL, NULL, NULL, &timeout, NULL);
2078         if (ret != 0) {
2079                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2080                 return -1;
2081         }
2082
2083         
2084
2085         return 0;
2086 }
2087
2088
2089
2090 /* 
2091   sent to a node to make it take over an ip address
2092 */
2093 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2094                           uint32_t destnode, struct ctdb_public_ip *ip)
2095 {
2096         TDB_DATA data;
2097         struct ctdb_public_ipv4 ipv4;
2098         int ret;
2099         int32_t res;
2100
2101         if (ip->addr.sa.sa_family == AF_INET) {
2102                 ipv4.pnn = ip->pnn;
2103                 ipv4.sin = ip->addr.ip;
2104
2105                 data.dsize = sizeof(ipv4);
2106                 data.dptr  = (uint8_t *)&ipv4;
2107
2108                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2109                            NULL, &res, &timeout, NULL);
2110         } else {
2111                 data.dsize = sizeof(*ip);
2112                 data.dptr  = (uint8_t *)ip;
2113
2114                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2115                            NULL, &res, &timeout, NULL);
2116         }
2117
2118         if (ret != 0 || res != 0) {
2119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2120                 return -1;
2121         }
2122
2123         return 0;       
2124 }
2125
2126
2127 /* 
2128   sent to a node to make it release an ip address
2129 */
2130 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2131                          uint32_t destnode, struct ctdb_public_ip *ip)
2132 {
2133         TDB_DATA data;
2134         struct ctdb_public_ipv4 ipv4;
2135         int ret;
2136         int32_t res;
2137
2138         if (ip->addr.sa.sa_family == AF_INET) {
2139                 ipv4.pnn = ip->pnn;
2140                 ipv4.sin = ip->addr.ip;
2141
2142                 data.dsize = sizeof(ipv4);
2143                 data.dptr  = (uint8_t *)&ipv4;
2144
2145                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2146                                    NULL, &res, &timeout, NULL);
2147         } else {
2148                 data.dsize = sizeof(*ip);
2149                 data.dptr  = (uint8_t *)ip;
2150
2151                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2152                                    NULL, &res, &timeout, NULL);
2153         }
2154
2155         if (ret != 0 || res != 0) {
2156                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2157                 return -1;
2158         }
2159
2160         return 0;       
2161 }
2162
2163
2164 /*
2165   get a tunable
2166  */
2167 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2168                           struct timeval timeout, 
2169                           uint32_t destnode,
2170                           const char *name, uint32_t *value)
2171 {
2172         struct ctdb_control_get_tunable *t;
2173         TDB_DATA data, outdata;
2174         int32_t res;
2175         int ret;
2176
2177         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2178         data.dptr  = talloc_size(ctdb, data.dsize);
2179         CTDB_NO_MEMORY(ctdb, data.dptr);
2180
2181         t = (struct ctdb_control_get_tunable *)data.dptr;
2182         t->length = strlen(name)+1;
2183         memcpy(t->name, name, t->length);
2184
2185         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2186                            &outdata, &res, &timeout, NULL);
2187         talloc_free(data.dptr);
2188         if (ret != 0 || res != 0) {
2189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2190                 return -1;
2191         }
2192
2193         if (outdata.dsize != sizeof(uint32_t)) {
2194                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2195                 talloc_free(outdata.dptr);
2196                 return -1;
2197         }
2198         
2199         *value = *(uint32_t *)outdata.dptr;
2200         talloc_free(outdata.dptr);
2201
2202         return 0;
2203 }
2204
2205 /*
2206   set a tunable
2207  */
2208 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2209                           struct timeval timeout, 
2210                           uint32_t destnode,
2211                           const char *name, uint32_t value)
2212 {
2213         struct ctdb_control_set_tunable *t;
2214         TDB_DATA data;
2215         int32_t res;
2216         int ret;
2217
2218         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2219         data.dptr  = talloc_size(ctdb, data.dsize);
2220         CTDB_NO_MEMORY(ctdb, data.dptr);
2221
2222         t = (struct ctdb_control_set_tunable *)data.dptr;
2223         t->length = strlen(name)+1;
2224         memcpy(t->name, name, t->length);
2225         t->value = value;
2226
2227         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2228                            NULL, &res, &timeout, NULL);
2229         talloc_free(data.dptr);
2230         if (ret != 0 || res != 0) {
2231                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2232                 return -1;
2233         }
2234
2235         return 0;
2236 }
2237
2238 /*
2239   list tunables
2240  */
2241 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2242                             struct timeval timeout, 
2243                             uint32_t destnode,
2244                             TALLOC_CTX *mem_ctx,
2245                             const char ***list, uint32_t *count)
2246 {
2247         TDB_DATA outdata;
2248         int32_t res;
2249         int ret;
2250         struct ctdb_control_list_tunable *t;
2251         char *p, *s, *ptr;
2252
2253         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2254                            mem_ctx, &outdata, &res, &timeout, NULL);
2255         if (ret != 0 || res != 0) {
2256                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2257                 return -1;
2258         }
2259
2260         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2261         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2262             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2263                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2264                 talloc_free(outdata.dptr);
2265                 return -1;              
2266         }
2267         
2268         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2269         CTDB_NO_MEMORY(ctdb, p);
2270
2271         talloc_free(outdata.dptr);
2272         
2273         (*list) = NULL;
2274         (*count) = 0;
2275
2276         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2277                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2278                 CTDB_NO_MEMORY(ctdb, *list);
2279                 (*list)[*count] = talloc_strdup(*list, s);
2280                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2281                 (*count)++;
2282         }
2283
2284         talloc_free(p);
2285
2286         return 0;
2287 }
2288
2289
2290 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2291                         struct timeval timeout, uint32_t destnode, 
2292                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2293 {
2294         int ret;
2295         TDB_DATA outdata;
2296         int32_t res;
2297
2298         ret = ctdb_control(ctdb, destnode, 0, 
2299                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2300                            mem_ctx, &outdata, &res, &timeout, NULL);
2301         if (ret == 0 && res == -1) {
2302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2303                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2304         }
2305         if (ret != 0 || res != 0) {
2306           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2307                 return -1;
2308         }
2309
2310         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2311         talloc_free(outdata.dptr);
2312                     
2313         return 0;
2314 }
2315
2316 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2317                         struct timeval timeout, uint32_t destnode, 
2318                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2319 {
2320         int ret, i, len;
2321         TDB_DATA outdata;
2322         int32_t res;
2323         struct ctdb_all_public_ipsv4 *ipsv4;
2324
2325         ret = ctdb_control(ctdb, destnode, 0, 
2326                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2327                            mem_ctx, &outdata, &res, &timeout, NULL);
2328         if (ret != 0 || res != 0) {
2329                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2330                 return -1;
2331         }
2332
2333         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2334         len = offsetof(struct ctdb_all_public_ips, ips) +
2335                 ipsv4->num*sizeof(struct ctdb_public_ip);
2336         *ips = talloc_zero_size(mem_ctx, len);
2337         (*ips)->num = ipsv4->num;
2338         for (i=0; i<ipsv4->num; i++) {
2339                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2340                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2341         }
2342
2343         talloc_free(outdata.dptr);
2344                     
2345         return 0;
2346 }
2347
2348 /*
2349   set/clear the permanent disabled bit on a remote node
2350  */
2351 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2352                        uint32_t set, uint32_t clear)
2353 {
2354         int ret;
2355         TDB_DATA data;
2356         struct ctdb_node_map *nodemap=NULL;
2357         struct ctdb_node_flag_change c;
2358         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2359         uint32_t recmaster;
2360         uint32_t *nodes;
2361
2362
2363         /* find the recovery master */
2364         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2367                 talloc_free(tmp_ctx);
2368                 return ret;
2369         }
2370
2371
2372         /* read the node flags from the recmaster */
2373         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2374         if (ret != 0) {
2375                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2376                 talloc_free(tmp_ctx);
2377                 return -1;
2378         }
2379         if (destnode >= nodemap->num) {
2380                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2381                 talloc_free(tmp_ctx);
2382                 return -1;
2383         }
2384
2385         c.pnn       = destnode;
2386         c.old_flags = nodemap->nodes[destnode].flags;
2387         c.new_flags = c.old_flags;
2388         c.new_flags |= set;
2389         c.new_flags &= ~clear;
2390
2391         data.dsize = sizeof(c);
2392         data.dptr = (unsigned char *)&c;
2393
2394         /* send the flags update to all connected nodes */
2395         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2396
2397         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2398                                         nodes,
2399                                         timeout, false, data,
2400                                         NULL, NULL,
2401                                         NULL) != 0) {
2402                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to disable node failed\n"));
2403
2404                 talloc_free(tmp_ctx);
2405                 return -1;
2406         }
2407
2408         talloc_free(tmp_ctx);
2409         return 0;
2410 }
2411
2412
2413 /*
2414   get all tunables
2415  */
2416 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2417                                struct timeval timeout, 
2418                                uint32_t destnode,
2419                                struct ctdb_tunable *tunables)
2420 {
2421         TDB_DATA outdata;
2422         int ret;
2423         int32_t res;
2424
2425         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2426                            &outdata, &res, &timeout, NULL);
2427         if (ret != 0 || res != 0) {
2428                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2429                 return -1;
2430         }
2431
2432         if (outdata.dsize != sizeof(*tunables)) {
2433                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2434                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2435                 return -1;              
2436         }
2437
2438         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2439         talloc_free(outdata.dptr);
2440         return 0;
2441 }
2442
2443 /*
2444   add a public address to a node
2445  */
2446 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2447                       struct timeval timeout, 
2448                       uint32_t destnode,
2449                       struct ctdb_control_ip_iface *pub)
2450 {
2451         TDB_DATA data;
2452         int32_t res;
2453         int ret;
2454
2455         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2456         data.dptr  = (unsigned char *)pub;
2457
2458         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2459                            NULL, &res, &timeout, NULL);
2460         if (ret != 0 || res != 0) {
2461                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2462                 return -1;
2463         }
2464
2465         return 0;
2466 }
2467
2468 /*
2469   delete a public address from a node
2470  */
2471 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2472                       struct timeval timeout, 
2473                       uint32_t destnode,
2474                       struct ctdb_control_ip_iface *pub)
2475 {
2476         TDB_DATA data;
2477         int32_t res;
2478         int ret;
2479
2480         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2481         data.dptr  = (unsigned char *)pub;
2482
2483         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2484                            NULL, &res, &timeout, NULL);
2485         if (ret != 0 || res != 0) {
2486                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2487                 return -1;
2488         }
2489
2490         return 0;
2491 }
2492
2493 /*
2494   kill a tcp connection
2495  */
2496 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2497                       struct timeval timeout, 
2498                       uint32_t destnode,
2499                       struct ctdb_control_killtcp *killtcp)
2500 {
2501         TDB_DATA data;
2502         int32_t res;
2503         int ret;
2504
2505         data.dsize = sizeof(struct ctdb_control_killtcp);
2506         data.dptr  = (unsigned char *)killtcp;
2507
2508         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2509                            NULL, &res, &timeout, NULL);
2510         if (ret != 0 || res != 0) {
2511                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2512                 return -1;
2513         }
2514
2515         return 0;
2516 }
2517
2518 /*
2519   send a gratious arp
2520  */
2521 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2522                       struct timeval timeout, 
2523                       uint32_t destnode,
2524                       ctdb_sock_addr *addr,
2525                       const char *ifname)
2526 {
2527         TDB_DATA data;
2528         int32_t res;
2529         int ret, len;
2530         struct ctdb_control_gratious_arp *gratious_arp;
2531         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2532
2533
2534         len = strlen(ifname)+1;
2535         gratious_arp = talloc_size(tmp_ctx, 
2536                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2537         CTDB_NO_MEMORY(ctdb, gratious_arp);
2538
2539         gratious_arp->addr = *addr;
2540         gratious_arp->len = len;
2541         memcpy(&gratious_arp->iface[0], ifname, len);
2542
2543
2544         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2545         data.dptr  = (unsigned char *)gratious_arp;
2546
2547         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2548                            NULL, &res, &timeout, NULL);
2549         if (ret != 0 || res != 0) {
2550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2551                 talloc_free(tmp_ctx);
2552                 return -1;
2553         }
2554
2555         talloc_free(tmp_ctx);
2556         return 0;
2557 }
2558
2559 /*
2560   get a list of all tcp tickles that a node knows about for a particular vnn
2561  */
2562 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2563                               struct timeval timeout, uint32_t destnode, 
2564                               TALLOC_CTX *mem_ctx, 
2565                               ctdb_sock_addr *addr,
2566                               struct ctdb_control_tcp_tickle_list **list)
2567 {
2568         int ret;
2569         TDB_DATA data, outdata;
2570         int32_t status;
2571
2572         data.dptr = (uint8_t*)addr;
2573         data.dsize = sizeof(ctdb_sock_addr);
2574
2575         ret = ctdb_control(ctdb, destnode, 0, 
2576                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2577                            mem_ctx, &outdata, &status, NULL, NULL);
2578         if (ret != 0 || status != 0) {
2579                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2580                 return -1;
2581         }
2582
2583         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2584
2585         return status;
2586 }
2587
2588 /*
2589   register a server id
2590  */
2591 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2592                       struct timeval timeout, 
2593                       struct ctdb_server_id *id)
2594 {
2595         TDB_DATA data;
2596         int32_t res;
2597         int ret;
2598
2599         data.dsize = sizeof(struct ctdb_server_id);
2600         data.dptr  = (unsigned char *)id;
2601
2602         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2603                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2604                         0, data, NULL,
2605                         NULL, &res, &timeout, NULL);
2606         if (ret != 0 || res != 0) {
2607                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2608                 return -1;
2609         }
2610
2611         return 0;
2612 }
2613
2614 /*
2615   unregister a server id
2616  */
2617 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2618                       struct timeval timeout, 
2619                       struct ctdb_server_id *id)
2620 {
2621         TDB_DATA data;
2622         int32_t res;
2623         int ret;
2624
2625         data.dsize = sizeof(struct ctdb_server_id);
2626         data.dptr  = (unsigned char *)id;
2627
2628         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2629                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2630                         0, data, NULL,
2631                         NULL, &res, &timeout, NULL);
2632         if (ret != 0 || res != 0) {
2633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2634                 return -1;
2635         }
2636
2637         return 0;
2638 }
2639
2640
2641 /*
2642   check if a server id exists
2643
2644   if a server id does exist, return *status == 1, otherwise *status == 0
2645  */
2646 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2647                       struct timeval timeout, 
2648                       uint32_t destnode,
2649                       struct ctdb_server_id *id,
2650                       uint32_t *status)
2651 {
2652         TDB_DATA data;
2653         int32_t res;
2654         int ret;
2655
2656         data.dsize = sizeof(struct ctdb_server_id);
2657         data.dptr  = (unsigned char *)id;
2658
2659         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2660                         0, data, NULL,
2661                         NULL, &res, &timeout, NULL);
2662         if (ret != 0) {
2663                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2664                 return -1;
2665         }
2666
2667         if (res) {
2668                 *status = 1;
2669         } else {
2670                 *status = 0;
2671         }
2672
2673         return 0;
2674 }
2675
2676 /*
2677    get the list of server ids that are registered on a node
2678 */
2679 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2680                 TALLOC_CTX *mem_ctx,
2681                 struct timeval timeout, uint32_t destnode, 
2682                 struct ctdb_server_id_list **svid_list)
2683 {
2684         int ret;
2685         TDB_DATA outdata;
2686         int32_t res;
2687
2688         ret = ctdb_control(ctdb, destnode, 0, 
2689                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2690                            mem_ctx, &outdata, &res, &timeout, NULL);
2691         if (ret != 0 || res != 0) {
2692                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2693                 return -1;
2694         }
2695
2696         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2697                     
2698         return 0;
2699 }
2700
2701 /*
2702   initialise the ctdb daemon for client applications
2703
2704   NOTE: In current code the daemon does not fork. This is for testing purposes only
2705   and to simplify the code.
2706 */
2707 struct ctdb_context *ctdb_init(struct event_context *ev)
2708 {
2709         struct ctdb_context *ctdb;
2710
2711         ctdb = talloc_zero(ev, struct ctdb_context);
2712         ctdb->ev  = ev;
2713         ctdb->idr = idr_init(ctdb);
2714         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2715
2716         ctdb_set_socketname(ctdb, CTDB_PATH);
2717
2718         return ctdb;
2719 }
2720
2721
2722 /*
2723   set some ctdb flags
2724 */
2725 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2726 {
2727         ctdb->flags |= flags;
2728 }
2729
2730 /*
2731   setup the local socket name
2732 */
2733 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2734 {
2735         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2736         return 0;
2737 }
2738
2739 /*
2740   return the pnn of this node
2741 */
2742 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2743 {
2744         return ctdb->pnn;
2745 }
2746
2747
2748 /*
2749   get the uptime of a remote node
2750  */
2751 struct ctdb_client_control_state *
2752 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2753 {
2754         return ctdb_control_send(ctdb, destnode, 0, 
2755                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2756                            mem_ctx, &timeout, NULL);
2757 }
2758
2759 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2760 {
2761         int ret;
2762         int32_t res;
2763         TDB_DATA outdata;
2764
2765         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2766         if (ret != 0 || res != 0) {
2767                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2768                 return -1;
2769         }
2770
2771         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2772
2773         return 0;
2774 }
2775
2776 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2777 {
2778         struct ctdb_client_control_state *state;
2779
2780         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2781         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2782 }
2783
2784 /*
2785   send a control to execute the "recovered" event script on a node
2786  */
2787 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2788 {
2789         int ret;
2790         int32_t status;
2791
2792         ret = ctdb_control(ctdb, destnode, 0, 
2793                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2794                            NULL, NULL, &status, &timeout, NULL);
2795         if (ret != 0 || status != 0) {
2796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2797                 return -1;
2798         }
2799
2800         return 0;
2801 }
2802
2803 /* 
2804   callback for the async helpers used when sending the same control
2805   to multiple nodes in parallell.
2806 */
2807 static void async_callback(struct ctdb_client_control_state *state)
2808 {
2809         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2810         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2811         int ret;
2812         TDB_DATA outdata;
2813         int32_t res;
2814         uint32_t destnode = state->c->hdr.destnode;
2815
2816         /* one more node has responded with recmode data */
2817         data->count--;
2818
2819         /* if we failed to push the db, then return an error and let
2820            the main loop try again.
2821         */
2822         if (state->state != CTDB_CONTROL_DONE) {
2823                 if ( !data->dont_log_errors) {
2824                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2825                 }
2826                 data->fail_count++;
2827                 if (data->fail_callback) {
2828                         data->fail_callback(ctdb, destnode, res, outdata,
2829                                         data->callback_data);
2830                 }
2831                 return;
2832         }
2833         
2834         state->async.fn = NULL;
2835
2836         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2837         if ((ret != 0) || (res != 0)) {
2838                 if ( !data->dont_log_errors) {
2839                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2840                 }
2841                 data->fail_count++;
2842                 if (data->fail_callback) {
2843                         data->fail_callback(ctdb, destnode, res, outdata,
2844                                         data->callback_data);
2845                 }
2846         }
2847         if ((ret == 0) && (data->callback != NULL)) {
2848                 data->callback(ctdb, destnode, res, outdata,
2849                                         data->callback_data);
2850         }
2851 }
2852
2853
2854 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2855 {
2856         /* set up the callback functions */
2857         state->async.fn = async_callback;
2858         state->async.private_data = data;
2859         
2860         /* one more control to wait for to complete */
2861         data->count++;
2862 }
2863
2864
2865 /* wait for up to the maximum number of seconds allowed
2866    or until all nodes we expect a response from has replied
2867 */
2868 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2869 {
2870         while (data->count > 0) {
2871                 event_loop_once(ctdb->ev);
2872         }
2873         if (data->fail_count != 0) {
2874                 if (!data->dont_log_errors) {
2875                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2876                                  data->fail_count));
2877                 }
2878                 return -1;
2879         }
2880         return 0;
2881 }
2882
2883
2884 /* 
2885    perform a simple control on the listed nodes
2886    The control cannot return data
2887  */
2888 int ctdb_client_async_control(struct ctdb_context *ctdb,
2889                                 enum ctdb_controls opcode,
2890                                 uint32_t *nodes,
2891                                 struct timeval timeout,
2892                                 bool dont_log_errors,
2893                                 TDB_DATA data,
2894                                 client_async_callback client_callback,
2895                                 client_async_callback fail_callback,
2896                                 void *callback_data)
2897 {
2898         struct client_async_data *async_data;
2899         struct ctdb_client_control_state *state;
2900         int j, num_nodes;
2901
2902         async_data = talloc_zero(ctdb, struct client_async_data);
2903         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2904         async_data->dont_log_errors = dont_log_errors;
2905         async_data->callback = client_callback;
2906         async_data->fail_callback = fail_callback;
2907         async_data->callback_data = callback_data;
2908         async_data->opcode        = opcode;
2909
2910         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2911
2912         /* loop over all nodes and send an async control to each of them */
2913         for (j=0; j<num_nodes; j++) {
2914                 uint32_t pnn = nodes[j];
2915
2916                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2917                                           0, data, async_data, &timeout, NULL);
2918                 if (state == NULL) {
2919                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2920                         talloc_free(async_data);
2921                         return -1;
2922                 }
2923                 
2924                 ctdb_client_async_add(async_data, state);
2925         }
2926
2927         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2928                 talloc_free(async_data);
2929                 return -1;
2930         }
2931
2932         talloc_free(async_data);
2933         return 0;
2934 }
2935
2936 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2937                                 struct ctdb_vnn_map *vnn_map,
2938                                 TALLOC_CTX *mem_ctx,
2939                                 bool include_self)
2940 {
2941         int i, j, num_nodes;
2942         uint32_t *nodes;
2943
2944         for (i=num_nodes=0;i<vnn_map->size;i++) {
2945                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2946                         continue;
2947                 }
2948                 num_nodes++;
2949         } 
2950
2951         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2952         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2953
2954         for (i=j=0;i<vnn_map->size;i++) {
2955                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2956                         continue;
2957                 }
2958                 nodes[j++] = vnn_map->map[i];
2959         } 
2960
2961         return nodes;
2962 }
2963
2964 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2965                                 struct ctdb_node_map *node_map,
2966                                 TALLOC_CTX *mem_ctx,
2967                                 bool include_self)
2968 {
2969         int i, j, num_nodes;
2970         uint32_t *nodes;
2971
2972         for (i=num_nodes=0;i<node_map->num;i++) {
2973                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2974                         continue;
2975                 }
2976                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2977                         continue;
2978                 }
2979                 num_nodes++;
2980         } 
2981
2982         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2983         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2984
2985         for (i=j=0;i<node_map->num;i++) {
2986                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2987                         continue;
2988                 }
2989                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2990                         continue;
2991                 }
2992                 nodes[j++] = node_map->nodes[i].pnn;
2993         } 
2994
2995         return nodes;
2996 }
2997
2998 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2999                                 struct ctdb_node_map *node_map,
3000                                 TALLOC_CTX *mem_ctx,
3001                                 bool include_self)
3002 {
3003         int i, j, num_nodes;
3004         uint32_t *nodes;
3005
3006         for (i=num_nodes=0;i<node_map->num;i++) {
3007                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3008                         continue;
3009                 }
3010                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3011                         continue;
3012                 }
3013                 num_nodes++;
3014         } 
3015
3016         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3017         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3018
3019         for (i=j=0;i<node_map->num;i++) {
3020                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3021                         continue;
3022                 }
3023                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3024                         continue;
3025                 }
3026                 nodes[j++] = node_map->nodes[i].pnn;
3027         } 
3028
3029         return nodes;
3030 }
3031
3032 /* 
3033   this is used to test if a pnn lock exists and if it exists will return
3034   the number of connections that pnn has reported or -1 if that recovery
3035   daemon is not running.
3036 */
3037 int
3038 ctdb_read_pnn_lock(int fd, int32_t pnn)
3039 {
3040         struct flock lock;
3041         char c;
3042
3043         lock.l_type = F_WRLCK;
3044         lock.l_whence = SEEK_SET;
3045         lock.l_start = pnn;
3046         lock.l_len = 1;
3047         lock.l_pid = 0;
3048
3049         if (fcntl(fd, F_GETLK, &lock) != 0) {
3050                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3051                 return -1;
3052         }
3053
3054         if (lock.l_type == F_UNLCK) {
3055                 return -1;
3056         }
3057
3058         if (pread(fd, &c, 1, pnn) == -1) {
3059                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3060                 return -1;
3061         }
3062
3063         return c;
3064 }
3065
3066 /*
3067   get capabilities of a remote node
3068  */
3069 struct ctdb_client_control_state *
3070 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3071 {
3072         return ctdb_control_send(ctdb, destnode, 0, 
3073                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3074                            mem_ctx, &timeout, NULL);
3075 }
3076
3077 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3078 {
3079         int ret;
3080         int32_t res;
3081         TDB_DATA outdata;
3082
3083         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3084         if ( (ret != 0) || (res != 0) ) {
3085                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3086                 return -1;
3087         }
3088
3089         if (capabilities) {
3090                 *capabilities = *((uint32_t *)outdata.dptr);
3091         }
3092
3093         return 0;
3094 }
3095
3096 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3097 {
3098         struct ctdb_client_control_state *state;
3099         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3100         int ret;
3101
3102         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3103         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3104         talloc_free(tmp_ctx);
3105         return ret;
3106 }
3107
3108 struct ctdb_transaction_handle {
3109         struct ctdb_db_context *ctdb_db;
3110         bool in_replay;
3111         /* we store the reads and writes done under a transaction one
3112            list stores both reads and writes, the other just writes
3113         */
3114         struct ctdb_marshall_buffer *m_all;
3115         struct ctdb_marshall_buffer *m_write;
3116 };
3117
3118 /* start a transaction on a database */
3119 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3120 {
3121         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3122         return 0;
3123 }
3124
3125 /* start a transaction on a database */
3126 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3127 {
3128         struct ctdb_record_handle *rh;
3129         TDB_DATA key;
3130         struct ctdb_ltdb_header header;
3131         TALLOC_CTX *tmp_ctx;
3132         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3133         int ret;
3134         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3135
3136         key.dptr = discard_const(keyname);
3137         key.dsize = strlen(keyname);
3138
3139         if (!ctdb_db->persistent) {
3140                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3141                 return -1;
3142         }
3143
3144 again:
3145         tmp_ctx = talloc_new(h);
3146
3147         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3148         if (rh == NULL) {
3149                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3150                 talloc_free(tmp_ctx);
3151                 return -1;
3152         }
3153         talloc_free(rh);
3154
3155         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3156         if (ret != 0) {
3157                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3158                 talloc_free(tmp_ctx);
3159                 return -1;
3160         }
3161
3162         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
3163         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3164                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3165                 talloc_free(tmp_ctx);
3166                 goto again;
3167         }
3168
3169         talloc_free(tmp_ctx);
3170
3171         return 0;
3172 }
3173
3174
3175 /* start a transaction on a database */
3176 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3177                                                        TALLOC_CTX *mem_ctx)
3178 {
3179         struct ctdb_transaction_handle *h;
3180         int ret;
3181
3182         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3183         if (h == NULL) {
3184                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3185                 return NULL;
3186         }
3187
3188         h->ctdb_db = ctdb_db;
3189
3190         ret = ctdb_transaction_fetch_start(h);
3191         if (ret != 0) {
3192                 talloc_free(h);
3193                 return NULL;
3194         }
3195
3196         talloc_set_destructor(h, ctdb_transaction_destructor);
3197
3198         return h;
3199 }
3200
3201
3202
3203 /*
3204   fetch a record inside a transaction
3205  */
3206 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3207                            TALLOC_CTX *mem_ctx, 
3208                            TDB_DATA key, TDB_DATA *data)
3209 {
3210         struct ctdb_ltdb_header header;
3211         int ret;
3212
3213         ZERO_STRUCT(header);
3214
3215         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3216         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3217                 /* record doesn't exist yet */
3218                 *data = tdb_null;
3219                 ret = 0;
3220         }
3221         
3222         if (ret != 0) {
3223                 return ret;
3224         }
3225
3226         if (!h->in_replay) {
3227                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3228                 if (h->m_all == NULL) {
3229                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3230                         return -1;
3231                 }
3232         }
3233
3234         return 0;
3235 }
3236
3237 /*
3238   stores a record inside a transaction
3239  */
3240 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3241                            TDB_DATA key, TDB_DATA data)
3242 {
3243         TALLOC_CTX *tmp_ctx = talloc_new(h);
3244         struct ctdb_ltdb_header header;
3245         TDB_DATA olddata;
3246         int ret;
3247
3248         ZERO_STRUCT(header);
3249
3250         /* we need the header so we can update the RSN */
3251         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3252         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3253                 /* the record doesn't exist - create one with us as dmaster.
3254                    This is only safe because we are in a transaction and this
3255                    is a persistent database */
3256                 header.dmaster = h->ctdb_db->ctdb->pnn;
3257                 header.rsn = 0;
3258         } else if (ret != 0) {
3259                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3260                 talloc_free(tmp_ctx);
3261                 return ret;
3262         }
3263
3264         if (data.dsize == olddata.dsize &&
3265             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3266                 /* save writing the same data */
3267                 talloc_free(tmp_ctx);
3268                 return 0;
3269         }
3270
3271         header.rsn++;
3272
3273         if (!h->in_replay) {
3274                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3275                 if (h->m_all == NULL) {
3276                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3277                         talloc_free(tmp_ctx);
3278                         return -1;
3279                 }
3280         }               
3281
3282         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3283         if (h->m_write == NULL) {
3284                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3285                 talloc_free(tmp_ctx);
3286                 return -1;
3287         }
3288         
3289         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3290
3291         talloc_free(tmp_ctx);
3292         
3293         return ret;
3294 }
3295
3296 /*
3297   replay a transaction
3298  */
3299 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3300 {
3301         int ret, i;
3302         struct ctdb_rec_data *rec = NULL;
3303
3304         h->in_replay = true;
3305         talloc_free(h->m_write);
3306         h->m_write = NULL;
3307
3308         ret = ctdb_transaction_fetch_start(h);
3309         if (ret != 0) {
3310                 return ret;
3311         }
3312
3313         for (i=0;i<h->m_all->count;i++) {
3314                 TDB_DATA key, data;
3315
3316                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3317                 if (rec == NULL) {
3318                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3319                         goto failed;
3320                 }
3321
3322                 if (rec->reqid == 0) {
3323                         /* its a store */
3324                         if (ctdb_transaction_store(h, key, data) != 0) {
3325                                 goto failed;
3326                         }
3327                 } else {
3328                         TDB_DATA data2;
3329                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3330
3331                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3332                                 talloc_free(tmp_ctx);
3333                                 goto failed;
3334                         }
3335                         if (data2.dsize != data.dsize ||
3336                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3337                                 /* the record has changed on us - we have to give up */
3338                                 talloc_free(tmp_ctx);
3339                                 goto failed;
3340                         }
3341                         talloc_free(tmp_ctx);
3342                 }
3343         }
3344         
3345         return 0;
3346
3347 failed:
3348         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3349         return -1;
3350 }
3351
3352
3353 /*
3354   commit a transaction
3355  */
3356 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3357 {
3358         int ret, retries=0;
3359         int32_t status;
3360         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3361         struct timeval timeout;
3362         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3363
3364         talloc_set_destructor(h, NULL);
3365
3366         /* our commit strategy is quite complex.
3367
3368            - we first try to commit the changes to all other nodes
3369
3370            - if that works, then we commit locally and we are done
3371
3372            - if a commit on another node fails, then we need to cancel
3373              the transaction, then restart the transaction (thus
3374              opening a window of time for a pending recovery to
3375              complete), then replay the transaction, checking all the
3376              reads and writes (checking that reads give the same data,
3377              and writes succeed). Then we retry the transaction to the
3378              other nodes
3379         */
3380
3381 again:
3382         if (h->m_write == NULL) {
3383                 /* no changes were made */
3384                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3385                 talloc_free(h);
3386                 return 0;
3387         }
3388
3389         /* tell ctdbd to commit to the other nodes */
3390         timeout = timeval_current_ofs(1, 0);
3391         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3392                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3393                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3394                            &timeout, NULL);
3395         if (ret != 0 || status != 0) {
3396                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3397                 sleep(1);
3398
3399                 if (ret != 0) {
3400                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3401                 } else {
3402                         /* work out what error code we will give if we 
3403                            have to fail the operation */
3404                         switch ((enum ctdb_trans2_commit_error)status) {
3405                         case CTDB_TRANS2_COMMIT_SUCCESS:
3406                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3407                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3408                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3409                                 break;
3410                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3411                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3412                                 break;
3413                         }
3414                 }
3415
3416                 if (++retries == 10) {
3417                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3418                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3419                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3420                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3421                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3422                         talloc_free(h);
3423                         return -1;
3424                 }               
3425
3426                 if (ctdb_replay_transaction(h) != 0) {
3427                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3428                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3429                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3430                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3431                         talloc_free(h);
3432                         return -1;
3433                 }
3434                 goto again;
3435         } else {
3436                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3437         }
3438
3439         /* do the real commit locally */
3440         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3441         if (ret != 0) {
3442                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3443                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3444                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3445                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3446                 talloc_free(h);
3447                 return ret;
3448         }
3449
3450         /* tell ctdbd that we are finished with our local commit */
3451         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3452                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3453                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3454         talloc_free(h);
3455         return 0;
3456 }
3457
3458 /*
3459   recovery daemon ping to main daemon
3460  */
3461 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3462 {
3463         int ret;
3464         int32_t res;
3465
3466         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3467                            ctdb, NULL, &res, NULL, NULL);
3468         if (ret != 0 || res != 0) {
3469                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3470                 return -1;
3471         }
3472
3473         return 0;
3474 }
3475
3476 /* when forking the main daemon and the child process needs to connect back
3477  * to the daemon as a client process, this function can be used to change
3478  * the ctdb context from daemon into client mode
3479  */
3480 int switch_from_server_to_client(struct ctdb_context *ctdb)
3481 {
3482         int ret;
3483
3484         /* shutdown the transport */
3485         if (ctdb->methods) {
3486                 ctdb->methods->shutdown(ctdb);
3487         }
3488
3489         /* get a new event context */
3490         talloc_free(ctdb->ev);
3491         ctdb->ev = event_context_init(ctdb);
3492
3493         close(ctdb->daemon.sd);
3494         ctdb->daemon.sd = -1;
3495
3496         /* the client does not need to be realtime */
3497         if (ctdb->do_setsched) {
3498                 ctdb_restore_scheduler(ctdb);
3499         }
3500
3501         /* initialise ctdb */
3502         ret = ctdb_socket_connect(ctdb);
3503         if (ret != 0) {
3504                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3505                 return -1;
3506         }
3507
3508          return 0;
3509 }
3510
3511 /*
3512   tell the main daemon we are starting a new monitor event script
3513  */
3514 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3515 {
3516         int ret;
3517         int32_t res;
3518
3519         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3520                            ctdb, NULL, &res, NULL, NULL);
3521         if (ret != 0 || res != 0) {
3522                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3523                 return -1;
3524         }
3525
3526         return 0;
3527 }
3528
3529 /*
3530   tell the main daemon we are starting a new monitor event script
3531  */
3532 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3533 {
3534         int ret;
3535         int32_t res;
3536
3537         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3538                            ctdb, NULL, &res, NULL, NULL);
3539         if (ret != 0 || res != 0) {
3540                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3541                 return -1;
3542         }
3543
3544         return 0;
3545 }
3546
3547 /*
3548   tell the main daemon we are starting to run an eventscript
3549  */
3550 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3551 {
3552         int ret;
3553         int32_t res;
3554         TDB_DATA data;
3555
3556         data.dptr = discard_const(name);
3557         data.dsize = strlen(name)+1;
3558
3559         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3560                            ctdb, NULL, &res, NULL, NULL);
3561         if (ret != 0 || res != 0) {
3562                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3563                 return -1;
3564         }
3565
3566         return 0;
3567 }
3568
3569 /*
3570   tell the main daemon the status of the script we ran
3571  */
3572 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3573 {
3574         int ret;
3575         int32_t res;
3576         TDB_DATA data;
3577
3578         data.dptr = (uint8_t *)&result;
3579         data.dsize = sizeof(result);
3580
3581         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3582                            ctdb, NULL, &res, NULL, NULL);
3583         if (ret != 0 || res != 0) {
3584                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3585                 return -1;
3586         }
3587
3588         return 0;
3589 }
3590
3591
3592 /*
3593   get the status of running the monitor eventscripts
3594  */
3595 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3596                 struct timeval timeout, uint32_t destnode, 
3597                 TALLOC_CTX *mem_ctx,
3598                 struct ctdb_monitoring_wire **script_status)
3599 {
3600         int ret;
3601         TDB_DATA outdata;
3602         int32_t res;
3603
3604         ret = ctdb_control(ctdb, destnode, 0, 
3605                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3606                            mem_ctx, &outdata, &res, &timeout, NULL);
3607         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3609                 return -1;
3610         }
3611
3612         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3613         talloc_free(outdata.dptr);
3614                     
3615         return 0;
3616 }
3617
3618 /*
3619   tell the main daemon how long it took to lock the reclock file
3620  */
3621 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3622 {
3623         int ret;
3624         int32_t res;
3625         TDB_DATA data;
3626
3627         data.dptr = (uint8_t *)&latency;
3628         data.dsize = sizeof(latency);
3629
3630         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3631                            ctdb, NULL, &res, NULL, NULL);
3632         if (ret != 0 || res != 0) {
3633                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3634                 return -1;
3635         }
3636
3637         return 0;
3638 }