prevent valgrind errors where we print unitialised values on control errors
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         memset(hdr, 0, slength);
55         hdr->length       = length;
56         hdr->operation    = operation;
57         hdr->ctdb_magic   = CTDB_MAGIC;
58         hdr->ctdb_version = CTDB_VERSION;
59         hdr->srcnode      = ctdb->pnn;
60         if (ctdb->vnn_map) {
61                 hdr->generation = ctdb->vnn_map->generation;
62         }
63
64         return hdr;
65 }
66
67 /*
68   local version of ctdb_call
69 */
70 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
71                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
72                     TDB_DATA *data, uint32_t caller)
73 {
74         struct ctdb_call_info *c;
75         struct ctdb_registered_call *fn;
76         struct ctdb_context *ctdb = ctdb_db->ctdb;
77         
78         c = talloc(ctdb, struct ctdb_call_info);
79         CTDB_NO_MEMORY(ctdb, c);
80
81         c->key = call->key;
82         c->call_data = &call->call_data;
83         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
84         c->record_data.dsize = data->dsize;
85         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
86         c->new_data = NULL;
87         c->reply_data = NULL;
88         c->status = 0;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         if (header->laccessor != caller) {
106                 header->lacount = 0;
107         }
108         header->laccessor = caller;
109         header->lacount++;
110
111         /* we need to force the record to be written out if this was a remote access,
112            so that the lacount is updated */
113         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
114                 c->new_data = &c->record_data;
115         }
116
117         if (c->new_data) {
118                 /* XXX check that we always have the lock here? */
119                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
120                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
121                         talloc_free(c);
122                         return -1;
123                 }
124         }
125
126         if (c->reply_data) {
127                 call->reply_data = *c->reply_data;
128
129                 talloc_steal(call, call->reply_data.dptr);
130                 talloc_set_name_const(call->reply_data.dptr, __location__);
131         } else {
132                 call->reply_data.dptr = NULL;
133                 call->reply_data.dsize = 0;
134         }
135         call->status = c->status;
136
137         talloc_free(c);
138
139         return 0;
140 }
141
142
143 /*
144   queue a packet for sending from client to daemon
145 */
146 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
147 {
148         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
149 }
150
151
152 /*
153   called when a CTDB_REPLY_CALL packet comes in in the client
154
155   This packet comes in response to a CTDB_REQ_CALL request packet. It
156   contains any reply data from the call
157 */
158 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
159 {
160         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
161         struct ctdb_client_call_state *state;
162
163         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
164         if (state == NULL) {
165                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
166                 return;
167         }
168
169         if (hdr->reqid != state->reqid) {
170                 /* we found a record  but it was the wrong one */
171                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
172                 return;
173         }
174
175         state->call->reply_data.dptr = c->data;
176         state->call->reply_data.dsize = c->datalen;
177         state->call->status = c->status;
178
179         talloc_steal(state, c);
180
181         state->state = CTDB_CALL_DONE;
182
183         if (state->async.fn) {
184                 state->async.fn(state);
185         }
186 }
187
188 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
189
190 /*
191   this is called in the client, when data comes in from the daemon
192  */
193 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
194 {
195         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
196         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
197         TALLOC_CTX *tmp_ctx;
198
199         /* place the packet as a child of a tmp_ctx. We then use
200            talloc_free() below to free it. If any of the calls want
201            to keep it, then they will steal it somewhere else, and the
202            talloc_free() will be a no-op */
203         tmp_ctx = talloc_new(ctdb);
204         talloc_steal(tmp_ctx, hdr);
205
206         if (cnt == 0) {
207                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
208                 exit(0);
209         }
210
211         if (cnt < sizeof(*hdr)) {
212                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
213                 goto done;
214         }
215         if (cnt != hdr->length) {
216                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
217                                (unsigned)hdr->length, (unsigned)cnt);
218                 goto done;
219         }
220
221         if (hdr->ctdb_magic != CTDB_MAGIC) {
222                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
223                 goto done;
224         }
225
226         if (hdr->ctdb_version != CTDB_VERSION) {
227                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
228                 goto done;
229         }
230
231         switch (hdr->operation) {
232         case CTDB_REPLY_CALL:
233                 ctdb_client_reply_call(ctdb, hdr);
234                 break;
235
236         case CTDB_REQ_MESSAGE:
237                 ctdb_request_message(ctdb, hdr);
238                 break;
239
240         case CTDB_REPLY_CONTROL:
241                 ctdb_client_reply_control(ctdb, hdr);
242                 break;
243
244         default:
245                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
246         }
247
248 done:
249         talloc_free(tmp_ctx);
250 }
251
252 /*
253   connect to a unix domain socket
254 */
255 int ctdb_socket_connect(struct ctdb_context *ctdb)
256 {
257         struct sockaddr_un addr;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
264         if (ctdb->daemon.sd == -1) {
265                 return -1;
266         }
267
268         set_nonblocking(ctdb->daemon.sd);
269         set_close_on_exec(ctdb->daemon.sd);
270         
271         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
272                 close(ctdb->daemon.sd);
273                 ctdb->daemon.sd = -1;
274                 return -1;
275         }
276
277         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
278                                               CTDB_DS_ALIGNMENT, 
279                                               ctdb_client_read_cb, ctdb);
280         return 0;
281 }
282
283
284 struct ctdb_record_handle {
285         struct ctdb_db_context *ctdb_db;
286         TDB_DATA key;
287         TDB_DATA *data;
288         struct ctdb_ltdb_header header;
289 };
290
291
292 /*
293   make a recv call to the local ctdb daemon - called from client context
294
295   This is called when the program wants to wait for a ctdb_call to complete and get the 
296   results. This call will block unless the call has already completed.
297 */
298 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
299 {
300         if (state == NULL) {
301                 return -1;
302         }
303
304         while (state->state < CTDB_CALL_DONE) {
305                 event_loop_once(state->ctdb_db->ctdb->ev);
306         }
307         if (state->state != CTDB_CALL_DONE) {
308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
309                 talloc_free(state);
310                 return -1;
311         }
312
313         if (state->call->reply_data.dsize) {
314                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315                                                       state->call->reply_data.dptr,
316                                                       state->call->reply_data.dsize);
317                 call->reply_data.dsize = state->call->reply_data.dsize;
318         } else {
319                 call->reply_data.dptr = NULL;
320                 call->reply_data.dsize = 0;
321         }
322         call->status = state->call->status;
323         talloc_free(state);
324
325         return 0;
326 }
327
328
329
330
331 /*
332   destroy a ctdb_call in client
333 */
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
335 {
336         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
337         return 0;
338 }
339
340 /*
341   construct an event driven local ctdb_call
342
343   this is used so that locally processed ctdb_call requests are processed
344   in an event driven manner
345 */
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
347                                                                   struct ctdb_call *call,
348                                                                   struct ctdb_ltdb_header *header,
349                                                                   TDB_DATA *data)
350 {
351         struct ctdb_client_call_state *state;
352         struct ctdb_context *ctdb = ctdb_db->ctdb;
353         int ret;
354
355         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356         CTDB_NO_MEMORY_NULL(ctdb, state);
357         state->call = talloc_zero(state, struct ctdb_call);
358         CTDB_NO_MEMORY_NULL(ctdb, state->call);
359
360         talloc_steal(state, data->dptr);
361
362         state->state   = CTDB_CALL_DONE;
363         *(state->call) = *call;
364         state->ctdb_db = ctdb_db;
365
366         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if (ret == 0 && header.dmaster == ctdb->pnn) {
402                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
403                 talloc_free(data.dptr);
404                 ctdb_ltdb_unlock(ctdb_db, call->key);
405                 return state;
406         }
407
408         ctdb_ltdb_unlock(ctdb_db, call->key);
409         talloc_free(data.dptr);
410
411         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
412         if (state == NULL) {
413                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
414                 return NULL;
415         }
416         state->call = talloc_zero(state, struct ctdb_call);
417         if (state->call == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
419                 return NULL;
420         }
421
422         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
423         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
424         if (c == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
426                 return NULL;
427         }
428
429         state->reqid     = ctdb_reqid_new(ctdb, state);
430         state->ctdb_db = ctdb_db;
431         talloc_set_destructor(state, ctdb_client_call_destructor);
432
433         c->hdr.reqid     = state->reqid;
434         c->flags         = call->flags;
435         c->db_id         = ctdb_db->db_id;
436         c->callid        = call->call_id;
437         c->hopcount      = 0;
438         c->keylen        = call->key.dsize;
439         c->calldatalen   = call->call_data.dsize;
440         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
441         memcpy(&c->data[call->key.dsize], 
442                call->call_data.dptr, call->call_data.dsize);
443         *(state->call)              = *call;
444         state->call->call_data.dptr = &c->data[call->key.dsize];
445         state->call->key.dptr       = &c->data[0];
446
447         state->state  = CTDB_CALL_WAIT;
448
449
450         ctdb_client_queue_pkt(ctdb, &c->hdr);
451
452         return state;
453 }
454
455
456 /*
457   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
458 */
459 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
460 {
461         struct ctdb_client_call_state *state;
462
463         state = ctdb_call_send(ctdb_db, call);
464         return ctdb_call_recv(state, call);
465 }
466
467
468 /*
469   tell the daemon what messaging srvid we will use, and register the message
470   handler function in the client
471 */
472 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
473                              ctdb_message_fn_t handler,
474                              void *private_data)
475                                     
476 {
477         int res;
478         int32_t status;
479         
480         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
481                            tdb_null, NULL, NULL, &status, NULL, NULL);
482         if (res != 0 || status != 0) {
483                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
484                 return -1;
485         }
486
487         /* also need to register the handler with our own ctdb structure */
488         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
489 }
490
491 /*
492   tell the daemon we no longer want a srvid
493 */
494 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
495 {
496         int res;
497         int32_t status;
498         
499         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
500                            tdb_null, NULL, NULL, &status, NULL, NULL);
501         if (res != 0 || status != 0) {
502                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
503                 return -1;
504         }
505
506         /* also need to register the handler with our own ctdb structure */
507         ctdb_deregister_message_handler(ctdb, srvid, private_data);
508         return 0;
509 }
510
511
512 /*
513   send a message - from client context
514  */
515 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
516                       uint64_t srvid, TDB_DATA data)
517 {
518         struct ctdb_req_message *r;
519         int len, res;
520
521         len = offsetof(struct ctdb_req_message, data) + data.dsize;
522         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
523                                len, struct ctdb_req_message);
524         CTDB_NO_MEMORY(ctdb, r);
525
526         r->hdr.destnode  = pnn;
527         r->srvid         = srvid;
528         r->datalen       = data.dsize;
529         memcpy(&r->data[0], data.dptr, data.dsize);
530         
531         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
532         if (res != 0) {
533                 return res;
534         }
535
536         talloc_free(r);
537         return 0;
538 }
539
540
541 /*
542   cancel a ctdb_fetch_lock operation, releasing the lock
543  */
544 static int fetch_lock_destructor(struct ctdb_record_handle *h)
545 {
546         ctdb_ltdb_unlock(h->ctdb_db, h->key);
547         return 0;
548 }
549
550 /*
551   force the migration of a record to this node
552  */
553 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
554 {
555         struct ctdb_call call;
556         ZERO_STRUCT(call);
557         call.call_id = CTDB_NULL_FUNC;
558         call.key = key;
559         call.flags = CTDB_IMMEDIATE_MIGRATION;
560         return ctdb_call(ctdb_db, &call);
561 }
562
563 /*
564   get a lock on a record, and return the records data. Blocks until it gets the lock
565  */
566 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
567                                            TDB_DATA key, TDB_DATA *data)
568 {
569         int ret;
570         struct ctdb_record_handle *h;
571
572         /*
573           procedure is as follows:
574
575           1) get the chain lock. 
576           2) check if we are dmaster
577           3) if we are the dmaster then return handle 
578           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
579              reply from ctdbd
580           5) when we get the reply, goto (1)
581          */
582
583         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
584         if (h == NULL) {
585                 return NULL;
586         }
587
588         h->ctdb_db = ctdb_db;
589         h->key     = key;
590         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
591         if (h->key.dptr == NULL) {
592                 talloc_free(h);
593                 return NULL;
594         }
595         h->data    = data;
596
597         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
598                  (const char *)key.dptr));
599
600 again:
601         /* step 1 - get the chain lock */
602         ret = ctdb_ltdb_lock(ctdb_db, key);
603         if (ret != 0) {
604                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
605                 talloc_free(h);
606                 return NULL;
607         }
608
609         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
610
611         talloc_set_destructor(h, fetch_lock_destructor);
612
613         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
614
615         /* when torturing, ensure we test the remote path */
616         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
617             random() % 5 == 0) {
618                 h->header.dmaster = (uint32_t)-1;
619         }
620
621
622         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
623
624         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
625                 ctdb_ltdb_unlock(ctdb_db, key);
626                 ret = ctdb_client_force_migration(ctdb_db, key);
627                 if (ret != 0) {
628                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
629                         talloc_free(h);
630                         return NULL;
631                 }
632                 goto again;
633         }
634
635         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
636         return h;
637 }
638
639 /*
640   store some data to the record that was locked with ctdb_fetch_lock()
641 */
642 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
643 {
644         int ret;
645         int32_t status;
646         struct ctdb_rec_data *rec;
647         TDB_DATA recdata;
648
649         if (h->ctdb_db->persistent) {
650                 h->header.rsn++;
651         }
652
653         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
654         if (ret != 0) {
655                 return ret;
656         }
657
658         /* don't need the persistent_store control for non-persistent databases */
659         if (!h->ctdb_db->persistent) {
660                 return 0;
661         }
662
663         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
664         if (rec == NULL) {
665                 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
666                 return -1;
667         }
668
669         recdata.dptr = (uint8_t *)rec;
670         recdata.dsize = rec->length;
671
672         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
673                            CTDB_CONTROL_PERSISTENT_STORE, 0,
674                            recdata, NULL, NULL, &status, NULL, NULL);
675
676         talloc_free(rec);
677
678         if (ret != 0 || status != 0) {
679                 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
680                 return -1;
681         }
682
683         return 0;
684 }
685
686 /*
687   non-locking fetch of a record
688  */
689 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
690                TDB_DATA key, TDB_DATA *data)
691 {
692         struct ctdb_call call;
693         int ret;
694
695         call.call_id = CTDB_FETCH_FUNC;
696         call.call_data.dptr = NULL;
697         call.call_data.dsize = 0;
698
699         ret = ctdb_call(ctdb_db, &call);
700
701         if (ret == 0) {
702                 *data = call.reply_data;
703                 talloc_steal(mem_ctx, data->dptr);
704         }
705
706         return ret;
707 }
708
709
710
711 /*
712    called when a control completes or timesout to invoke the callback
713    function the user provided
714 */
715 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
716         struct timeval t, void *private_data)
717 {
718         struct ctdb_client_control_state *state;
719         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
720         int ret;
721
722         state = talloc_get_type(private_data, struct ctdb_client_control_state);
723         talloc_steal(tmp_ctx, state);
724
725         ret = ctdb_control_recv(state->ctdb, state, state,
726                         NULL, 
727                         NULL, 
728                         NULL);
729
730         talloc_free(tmp_ctx);
731 }
732
733 /*
734   called when a CTDB_REPLY_CONTROL packet comes in in the client
735
736   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
737   contains any reply data from the control
738 */
739 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
740                                       struct ctdb_req_header *hdr)
741 {
742         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
743         struct ctdb_client_control_state *state;
744
745         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
746         if (state == NULL) {
747                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
748                 return;
749         }
750
751         if (hdr->reqid != state->reqid) {
752                 /* we found a record  but it was the wrong one */
753                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
754                 return;
755         }
756
757         state->outdata.dptr = c->data;
758         state->outdata.dsize = c->datalen;
759         state->status = c->status;
760         if (c->errorlen) {
761                 state->errormsg = talloc_strndup(state, 
762                                                  (char *)&c->data[c->datalen], 
763                                                  c->errorlen);
764         }
765
766         /* state->outdata now uses resources from c so we dont want c
767            to just dissappear from under us while state is still alive
768         */
769         talloc_steal(state, c);
770
771         state->state = CTDB_CONTROL_DONE;
772
773         /* if we had a callback registered for this control, pull the response
774            and call the callback.
775         */
776         if (state->async.fn) {
777                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
778         }
779 }
780
781
782 /*
783   destroy a ctdb_control in client
784 */
785 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
786 {
787         ctdb_reqid_remove(state->ctdb, state->reqid);
788         return 0;
789 }
790
791
792 /* time out handler for ctdb_control */
793 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
794         struct timeval t, void *private_data)
795 {
796         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
797
798         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
799
800         state->state = CTDB_CONTROL_TIMEOUT;
801
802         /* if we had a callback registered for this control, pull the response
803            and call the callback.
804         */
805         if (state->async.fn) {
806                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
807         }
808 }
809
810 /* async version of send control request */
811 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
812                 uint32_t destnode, uint64_t srvid, 
813                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
814                 TALLOC_CTX *mem_ctx,
815                 struct timeval *timeout,
816                 char **errormsg)
817 {
818         struct ctdb_client_control_state *state;
819         size_t len;
820         struct ctdb_req_control *c;
821         int ret;
822
823         if (errormsg) {
824                 *errormsg = NULL;
825         }
826
827         /* if the domain socket is not yet open, open it */
828         if (ctdb->daemon.sd==-1) {
829                 ctdb_socket_connect(ctdb);
830         }
831
832         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
833         CTDB_NO_MEMORY_NULL(ctdb, state);
834
835         state->ctdb       = ctdb;
836         state->reqid      = ctdb_reqid_new(ctdb, state);
837         state->state      = CTDB_CONTROL_WAIT;
838         state->errormsg   = NULL;
839
840         talloc_set_destructor(state, ctdb_control_destructor);
841
842         len = offsetof(struct ctdb_req_control, data) + data.dsize;
843         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
844                                len, struct ctdb_req_control);
845         state->c            = c;        
846         CTDB_NO_MEMORY_NULL(ctdb, c);
847         c->hdr.reqid        = state->reqid;
848         c->hdr.destnode     = destnode;
849         c->hdr.reqid        = state->reqid;
850         c->opcode           = opcode;
851         c->client_id        = 0;
852         c->flags            = flags;
853         c->srvid            = srvid;
854         c->datalen          = data.dsize;
855         if (data.dsize) {
856                 memcpy(&c->data[0], data.dptr, data.dsize);
857         }
858
859         /* timeout */
860         if (timeout && !timeval_is_zero(timeout)) {
861                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
862         }
863
864         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
865         if (ret != 0) {
866                 talloc_free(state);
867                 return NULL;
868         }
869
870         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
871                 talloc_free(state);
872                 return NULL;
873         }
874
875         return state;
876 }
877
878
879 /* async version of receive control reply */
880 int ctdb_control_recv(struct ctdb_context *ctdb, 
881                 struct ctdb_client_control_state *state, 
882                 TALLOC_CTX *mem_ctx,
883                 TDB_DATA *outdata, int32_t *status, char **errormsg)
884 {
885         TALLOC_CTX *tmp_ctx;
886
887         if (status != NULL) {
888                 *status = -1;
889         }
890         if (errormsg != NULL) {
891                 *errormsg = NULL;
892         }
893
894         if (state == NULL) {
895                 return -1;
896         }
897
898         /* prevent double free of state */
899         tmp_ctx = talloc_new(ctdb);
900         talloc_steal(tmp_ctx, state);
901
902         /* loop one event at a time until we either timeout or the control
903            completes.
904         */
905         while (state->state == CTDB_CONTROL_WAIT) {
906                 event_loop_once(ctdb->ev);
907         }
908
909         if (state->state != CTDB_CONTROL_DONE) {
910                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
911                 if (state->async.fn) {
912                         state->async.fn(state);
913                 }
914                 talloc_free(tmp_ctx);
915                 return -1;
916         }
917
918         if (state->errormsg) {
919                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
920                 if (errormsg) {
921                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
922                 }
923                 if (state->async.fn) {
924                         state->async.fn(state);
925                 }
926                 talloc_free(tmp_ctx);
927                 return -1;
928         }
929
930         if (outdata) {
931                 *outdata = state->outdata;
932                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
933         }
934
935         if (status) {
936                 *status = state->status;
937         }
938
939         if (state->async.fn) {
940                 state->async.fn(state);
941         }
942
943         talloc_free(tmp_ctx);
944         return 0;
945 }
946
947
948
949 /*
950   send a ctdb control message
951   timeout specifies how long we should wait for a reply.
952   if timeout is NULL we wait indefinitely
953  */
954 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
955                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
956                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
957                  struct timeval *timeout,
958                  char **errormsg)
959 {
960         struct ctdb_client_control_state *state;
961
962         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
963                         flags, data, mem_ctx,
964                         timeout, errormsg);
965         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
966                         errormsg);
967 }
968
969
970
971
972 /*
973   a process exists call. Returns 0 if process exists, -1 otherwise
974  */
975 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
976 {
977         int ret;
978         TDB_DATA data;
979         int32_t status;
980
981         data.dptr = (uint8_t*)&pid;
982         data.dsize = sizeof(pid);
983
984         ret = ctdb_control(ctdb, destnode, 0, 
985                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
986                            NULL, NULL, &status, NULL, NULL);
987         if (ret != 0) {
988                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
989                 return -1;
990         }
991
992         return status;
993 }
994
995 /*
996   get remote statistics
997  */
998 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
999 {
1000         int ret;
1001         TDB_DATA data;
1002         int32_t res;
1003
1004         ret = ctdb_control(ctdb, destnode, 0, 
1005                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1006                            ctdb, &data, &res, NULL, NULL);
1007         if (ret != 0 || res != 0) {
1008                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1009                 return -1;
1010         }
1011
1012         if (data.dsize != sizeof(struct ctdb_statistics)) {
1013                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1014                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1015                       return -1;
1016         }
1017
1018         *status = *(struct ctdb_statistics *)data.dptr;
1019         talloc_free(data.dptr);
1020                         
1021         return 0;
1022 }
1023
1024 /*
1025   shutdown a remote ctdb node
1026  */
1027 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1028 {
1029         struct ctdb_client_control_state *state;
1030
1031         state = ctdb_control_send(ctdb, destnode, 0, 
1032                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1033                            NULL, &timeout, NULL);
1034         if (state == NULL) {
1035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1036                 return -1;
1037         }
1038
1039         return 0;
1040 }
1041
1042 /*
1043   get vnn map from a remote node
1044  */
1045 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1046 {
1047         int ret;
1048         TDB_DATA outdata;
1049         int32_t res;
1050         struct ctdb_vnn_map_wire *map;
1051
1052         ret = ctdb_control(ctdb, destnode, 0, 
1053                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1054                            mem_ctx, &outdata, &res, &timeout, NULL);
1055         if (ret != 0 || res != 0) {
1056                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1057                 return -1;
1058         }
1059         
1060         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1061         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1062             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1063                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1064                 return -1;
1065         }
1066
1067         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1068         CTDB_NO_MEMORY(ctdb, *vnnmap);
1069         (*vnnmap)->generation = map->generation;
1070         (*vnnmap)->size       = map->size;
1071         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1072
1073         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1074         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1075         talloc_free(outdata.dptr);
1076                     
1077         return 0;
1078 }
1079
1080
1081 /*
1082   get the recovery mode of a remote node
1083  */
1084 struct ctdb_client_control_state *
1085 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1086 {
1087         return ctdb_control_send(ctdb, destnode, 0, 
1088                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1089                            mem_ctx, &timeout, NULL);
1090 }
1091
1092 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1093 {
1094         int ret;
1095         int32_t res;
1096
1097         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1098         if (ret != 0) {
1099                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1100                 return -1;
1101         }
1102
1103         if (recmode) {
1104                 *recmode = (uint32_t)res;
1105         }
1106
1107         return 0;
1108 }
1109
1110 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1111 {
1112         struct ctdb_client_control_state *state;
1113
1114         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1115         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1116 }
1117
1118
1119
1120
1121 /*
1122   set the recovery mode of a remote node
1123  */
1124 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1125 {
1126         int ret;
1127         TDB_DATA data;
1128         int32_t res;
1129
1130         data.dsize = sizeof(uint32_t);
1131         data.dptr = (unsigned char *)&recmode;
1132
1133         ret = ctdb_control(ctdb, destnode, 0, 
1134                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1135                            NULL, NULL, &res, &timeout, NULL);
1136         if (ret != 0 || res != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1138                 return -1;
1139         }
1140
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   get the recovery master of a remote node
1148  */
1149 struct ctdb_client_control_state *
1150 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1151                         struct timeval timeout, uint32_t destnode)
1152 {
1153         return ctdb_control_send(ctdb, destnode, 0, 
1154                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1155                            mem_ctx, &timeout, NULL);
1156 }
1157
1158 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1159 {
1160         int ret;
1161         int32_t res;
1162
1163         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1164         if (ret != 0) {
1165                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1166                 return -1;
1167         }
1168
1169         if (recmaster) {
1170                 *recmaster = (uint32_t)res;
1171         }
1172
1173         return 0;
1174 }
1175
1176 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1177 {
1178         struct ctdb_client_control_state *state;
1179
1180         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1181         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1182 }
1183
1184
1185 /*
1186   set the recovery master of a remote node
1187  */
1188 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1189 {
1190         int ret;
1191         TDB_DATA data;
1192         int32_t res;
1193
1194         ZERO_STRUCT(data);
1195         data.dsize = sizeof(uint32_t);
1196         data.dptr = (unsigned char *)&recmaster;
1197
1198         ret = ctdb_control(ctdb, destnode, 0, 
1199                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1200                            NULL, NULL, &res, &timeout, NULL);
1201         if (ret != 0 || res != 0) {
1202                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1203                 return -1;
1204         }
1205
1206         return 0;
1207 }
1208
1209
1210 /*
1211   get a list of databases off a remote node
1212  */
1213 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1214                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1215 {
1216         int ret;
1217         TDB_DATA outdata;
1218         int32_t res;
1219
1220         ret = ctdb_control(ctdb, destnode, 0, 
1221                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1222                            mem_ctx, &outdata, &res, &timeout, NULL);
1223         if (ret != 0 || res != 0) {
1224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed\n"));
1225                 return -1;
1226         }
1227
1228         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1229         talloc_free(outdata.dptr);
1230                     
1231         return 0;
1232 }
1233
1234 /*
1235   get the reclock filename
1236  */
1237 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1238                        TALLOC_CTX *mem_ctx, const char **reclock)
1239 {
1240         int ret;
1241         TDB_DATA outdata;
1242         int32_t res;
1243
1244         ret = ctdb_control(ctdb, destnode, 0, 
1245                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
1246                            mem_ctx, &outdata, &res, &timeout, NULL);
1247         if (ret != 0 || res != 0) {
1248                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getreclock failed\n"));
1249                 return -1;
1250         }
1251
1252         *reclock = (const char *)talloc_steal(mem_ctx, outdata.dptr);
1253
1254         return 0;
1255 }
1256
1257 /*
1258   get a list of nodes (vnn and flags ) from a remote node
1259  */
1260 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1261                 struct timeval timeout, uint32_t destnode, 
1262                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1263 {
1264         int ret;
1265         TDB_DATA outdata;
1266         int32_t res;
1267
1268         ret = ctdb_control(ctdb, destnode, 0, 
1269                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1270                            mem_ctx, &outdata, &res, &timeout, NULL);
1271         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed\n"));
1273                 return -1;
1274         }
1275
1276         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1277         talloc_free(outdata.dptr);
1278                     
1279         return 0;
1280 }
1281
1282 /*
1283   drop the transport, reload the nodes file and restart the transport
1284  */
1285 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1286                     struct timeval timeout, uint32_t destnode)
1287 {
1288         int ret;
1289         int32_t res;
1290
1291         ret = ctdb_control(ctdb, destnode, 0, 
1292                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1293                            NULL, NULL, &res, &timeout, NULL);
1294         if (ret != 0 || res != 0) {
1295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1296                 return -1;
1297         }
1298
1299         return 0;
1300 }
1301
1302
1303 /*
1304   set vnn map on a node
1305  */
1306 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1307                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1308 {
1309         int ret;
1310         TDB_DATA data;
1311         int32_t res;
1312         struct ctdb_vnn_map_wire *map;
1313         size_t len;
1314
1315         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1316         map = talloc_size(mem_ctx, len);
1317         CTDB_NO_MEMORY(ctdb, map);
1318
1319         map->generation = vnnmap->generation;
1320         map->size = vnnmap->size;
1321         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1322         
1323         data.dsize = len;
1324         data.dptr  = (uint8_t *)map;
1325
1326         ret = ctdb_control(ctdb, destnode, 0, 
1327                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1328                            NULL, NULL, &res, &timeout, NULL);
1329         if (ret != 0 || res != 0) {
1330                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1331                 return -1;
1332         }
1333
1334         talloc_free(map);
1335
1336         return 0;
1337 }
1338
1339
1340 /*
1341   async send for pull database
1342  */
1343 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1344         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1345         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1346 {
1347         TDB_DATA indata;
1348         struct ctdb_control_pulldb *pull;
1349         struct ctdb_client_control_state *state;
1350
1351         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1352         CTDB_NO_MEMORY_NULL(ctdb, pull);
1353
1354         pull->db_id   = dbid;
1355         pull->lmaster = lmaster;
1356
1357         indata.dsize = sizeof(struct ctdb_control_pulldb);
1358         indata.dptr  = (unsigned char *)pull;
1359
1360         state = ctdb_control_send(ctdb, destnode, 0, 
1361                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1362                                   mem_ctx, &timeout, NULL);
1363         talloc_free(pull);
1364
1365         return state;
1366 }
1367
1368 /*
1369   async recv for pull database
1370  */
1371 int ctdb_ctrl_pulldb_recv(
1372         struct ctdb_context *ctdb, 
1373         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1374         TDB_DATA *outdata)
1375 {
1376         int ret;
1377         int32_t res;
1378
1379         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1380         if ( (ret != 0) || (res != 0) ){
1381                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1382                 return -1;
1383         }
1384
1385         return 0;
1386 }
1387
1388 /*
1389   pull all keys and records for a specific database on a node
1390  */
1391 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1392                 uint32_t dbid, uint32_t lmaster, 
1393                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1394                 TDB_DATA *outdata)
1395 {
1396         struct ctdb_client_control_state *state;
1397
1398         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1399                                       timeout);
1400         
1401         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1402 }
1403
1404
1405 /*
1406   change dmaster for all keys in the database to the new value
1407  */
1408 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1409                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1410 {
1411         int ret;
1412         TDB_DATA indata;
1413         int32_t res;
1414
1415         indata.dsize = 2*sizeof(uint32_t);
1416         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1417
1418         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1419         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1420
1421         ret = ctdb_control(ctdb, destnode, 0, 
1422                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1423                            NULL, NULL, &res, &timeout, NULL);
1424         if (ret != 0 || res != 0) {
1425                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1426                 return -1;
1427         }
1428
1429         return 0;
1430 }
1431
1432 /*
1433   ping a node, return number of clients connected
1434  */
1435 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1436 {
1437         int ret;
1438         int32_t res;
1439
1440         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1441                            tdb_null, NULL, NULL, &res, NULL, NULL);
1442         if (ret != 0) {
1443                 return -1;
1444         }
1445         return res;
1446 }
1447
1448 /*
1449   find the real path to a ltdb 
1450  */
1451 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1452                    const char **path)
1453 {
1454         int ret;
1455         int32_t res;
1456         TDB_DATA data;
1457
1458         data.dptr = (uint8_t *)&dbid;
1459         data.dsize = sizeof(dbid);
1460
1461         ret = ctdb_control(ctdb, destnode, 0, 
1462                            CTDB_CONTROL_GETDBPATH, 0, data, 
1463                            mem_ctx, &data, &res, &timeout, NULL);
1464         if (ret != 0 || res != 0) {
1465                 return -1;
1466         }
1467
1468         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1469         if ((*path) == NULL) {
1470                 return -1;
1471         }
1472
1473         talloc_free(data.dptr);
1474
1475         return 0;
1476 }
1477
1478 /*
1479   find the name of a db 
1480  */
1481 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1482                    const char **name)
1483 {
1484         int ret;
1485         int32_t res;
1486         TDB_DATA data;
1487
1488         data.dptr = (uint8_t *)&dbid;
1489         data.dsize = sizeof(dbid);
1490
1491         ret = ctdb_control(ctdb, destnode, 0, 
1492                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1493                            mem_ctx, &data, &res, &timeout, NULL);
1494         if (ret != 0 || res != 0) {
1495                 return -1;
1496         }
1497
1498         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1499         if ((*name) == NULL) {
1500                 return -1;
1501         }
1502
1503         talloc_free(data.dptr);
1504
1505         return 0;
1506 }
1507
1508 /*
1509   create a database
1510  */
1511 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1512                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1513 {
1514         int ret;
1515         int32_t res;
1516         TDB_DATA data;
1517
1518         data.dptr = discard_const(name);
1519         data.dsize = strlen(name)+1;
1520
1521         ret = ctdb_control(ctdb, destnode, 0, 
1522                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1523                            0, data, 
1524                            mem_ctx, &data, &res, &timeout, NULL);
1525
1526         if (ret != 0 || res != 0) {
1527                 return -1;
1528         }
1529
1530         return 0;
1531 }
1532
1533 /*
1534   get debug level on a node
1535  */
1536 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1537 {
1538         int ret;
1539         int32_t res;
1540         TDB_DATA data;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1543                            ctdb, &data, &res, NULL, NULL);
1544         if (ret != 0 || res != 0) {
1545                 return -1;
1546         }
1547         if (data.dsize != sizeof(int32_t)) {
1548                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1549                          (unsigned)data.dsize));
1550                 return -1;
1551         }
1552         *level = *(int32_t *)data.dptr;
1553         talloc_free(data.dptr);
1554         return 0;
1555 }
1556
1557 /*
1558   set debug level on a node
1559  */
1560 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1561 {
1562         int ret;
1563         int32_t res;
1564         TDB_DATA data;
1565
1566         data.dptr = (uint8_t *)&level;
1567         data.dsize = sizeof(level);
1568
1569         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1570                            NULL, NULL, &res, NULL, NULL);
1571         if (ret != 0 || res != 0) {
1572                 return -1;
1573         }
1574         return 0;
1575 }
1576
1577
1578 /*
1579   get a list of connected nodes
1580  */
1581 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1582                                 struct timeval timeout,
1583                                 TALLOC_CTX *mem_ctx,
1584                                 uint32_t *num_nodes)
1585 {
1586         struct ctdb_node_map *map=NULL;
1587         int ret, i;
1588         uint32_t *nodes;
1589
1590         *num_nodes = 0;
1591
1592         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1593         if (ret != 0) {
1594                 return NULL;
1595         }
1596
1597         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1598         if (nodes == NULL) {
1599                 return NULL;
1600         }
1601
1602         for (i=0;i<map->num;i++) {
1603                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1604                         nodes[*num_nodes] = map->nodes[i].pnn;
1605                         (*num_nodes)++;
1606                 }
1607         }
1608
1609         return nodes;
1610 }
1611
1612
1613 /*
1614   reset remote status
1615  */
1616 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1617 {
1618         int ret;
1619         int32_t res;
1620
1621         ret = ctdb_control(ctdb, destnode, 0, 
1622                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1623                            NULL, NULL, &res, NULL, NULL);
1624         if (ret != 0 || res != 0) {
1625                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1626                 return -1;
1627         }
1628         return 0;
1629 }
1630
1631 /*
1632   this is the dummy null procedure that all databases support
1633 */
1634 static int ctdb_null_func(struct ctdb_call_info *call)
1635 {
1636         return 0;
1637 }
1638
1639 /*
1640   this is a plain fetch procedure that all databases support
1641 */
1642 static int ctdb_fetch_func(struct ctdb_call_info *call)
1643 {
1644         call->reply_data = &call->record_data;
1645         return 0;
1646 }
1647
1648 /*
1649   attach to a specific database - client call
1650 */
1651 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1652 {
1653         struct ctdb_db_context *ctdb_db;
1654         TDB_DATA data;
1655         int ret;
1656         int32_t res;
1657
1658         ctdb_db = ctdb_db_handle(ctdb, name);
1659         if (ctdb_db) {
1660                 return ctdb_db;
1661         }
1662
1663         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1664         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1665
1666         ctdb_db->ctdb = ctdb;
1667         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1668         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1669
1670         data.dptr = discard_const(name);
1671         data.dsize = strlen(name)+1;
1672
1673         /* tell ctdb daemon to attach */
1674         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1675                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1676                            0, data, ctdb_db, &data, &res, NULL, NULL);
1677         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1678                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1679                 talloc_free(ctdb_db);
1680                 return NULL;
1681         }
1682         
1683         ctdb_db->db_id = *(uint32_t *)data.dptr;
1684         talloc_free(data.dptr);
1685
1686         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1687         if (ret != 0) {
1688                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1689                 talloc_free(ctdb_db);
1690                 return NULL;
1691         }
1692
1693         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, persistent?TDB_DEFAULT:TDB_NOSYNC, O_RDWR, 0);
1694         if (ctdb_db->ltdb == NULL) {
1695                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1696                 talloc_free(ctdb_db);
1697                 return NULL;
1698         }
1699
1700         ctdb_db->persistent = persistent;
1701
1702         DLIST_ADD(ctdb->db_list, ctdb_db);
1703
1704         /* add well known functions */
1705         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1706         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1707
1708         return ctdb_db;
1709 }
1710
1711
1712 /*
1713   setup a call for a database
1714  */
1715 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1716 {
1717         struct ctdb_registered_call *call;
1718
1719 #if 0
1720         TDB_DATA data;
1721         int32_t status;
1722         struct ctdb_control_set_call c;
1723         int ret;
1724
1725         /* this is no longer valid with the separate daemon architecture */
1726         c.db_id = ctdb_db->db_id;
1727         c.fn    = fn;
1728         c.id    = id;
1729
1730         data.dptr = (uint8_t *)&c;
1731         data.dsize = sizeof(c);
1732
1733         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1734                            data, NULL, NULL, &status, NULL, NULL);
1735         if (ret != 0 || status != 0) {
1736                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1737                 return -1;
1738         }
1739 #endif
1740
1741         /* also register locally */
1742         call = talloc(ctdb_db, struct ctdb_registered_call);
1743         call->fn = fn;
1744         call->id = id;
1745
1746         DLIST_ADD(ctdb_db->calls, call);        
1747         return 0;
1748 }
1749
1750
1751 struct traverse_state {
1752         bool done;
1753         uint32_t count;
1754         ctdb_traverse_func fn;
1755         void *private_data;
1756 };
1757
1758 /*
1759   called on each key during a ctdb_traverse
1760  */
1761 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1762 {
1763         struct traverse_state *state = (struct traverse_state *)p;
1764         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1765         TDB_DATA key;
1766
1767         if (data.dsize < sizeof(uint32_t) ||
1768             d->length != data.dsize) {
1769                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1770                 state->done = True;
1771                 return;
1772         }
1773
1774         key.dsize = d->keylen;
1775         key.dptr  = &d->data[0];
1776         data.dsize = d->datalen;
1777         data.dptr = &d->data[d->keylen];
1778
1779         if (key.dsize == 0 && data.dsize == 0) {
1780                 /* end of traverse */
1781                 state->done = True;
1782                 return;
1783         }
1784
1785         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1786                 state->done = True;
1787         }
1788
1789         state->count++;
1790 }
1791
1792
1793 /*
1794   start a cluster wide traverse, calling the supplied fn on each record
1795   return the number of records traversed, or -1 on error
1796  */
1797 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1798 {
1799         TDB_DATA data;
1800         struct ctdb_traverse_start t;
1801         int32_t status;
1802         int ret;
1803         uint64_t srvid = (getpid() | 0xFLL<<60);
1804         struct traverse_state state;
1805
1806         state.done = False;
1807         state.count = 0;
1808         state.private_data = private_data;
1809         state.fn = fn;
1810
1811         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1812         if (ret != 0) {
1813                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1814                 return -1;
1815         }
1816
1817         t.db_id = ctdb_db->db_id;
1818         t.srvid = srvid;
1819         t.reqid = 0;
1820
1821         data.dptr = (uint8_t *)&t;
1822         data.dsize = sizeof(t);
1823
1824         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1825                            data, NULL, NULL, &status, NULL, NULL);
1826         if (ret != 0 || status != 0) {
1827                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1828                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1829                 return -1;
1830         }
1831
1832         while (!state.done) {
1833                 event_loop_once(ctdb_db->ctdb->ev);
1834         }
1835
1836         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1837         if (ret != 0) {
1838                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1839                 return -1;
1840         }
1841
1842         return state.count;
1843 }
1844
1845 /*
1846   called on each key during a catdb
1847  */
1848 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1849 {
1850         int i;
1851         FILE *f = (FILE *)p;
1852         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1853
1854         fprintf(f, "dmaster: %u\n", h->dmaster);
1855         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1856
1857         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1858         for (i=0;i<key.dsize;i++) {
1859                 if (isascii(key.dptr[i])) {
1860                         fprintf(f, "%c", key.dptr[i]);
1861                 } else {
1862                         fprintf(f, "\\%02X", key.dptr[i]);
1863                 }
1864         }
1865         fprintf(f, "\"\n");
1866
1867         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1868         for (i=sizeof(*h);i<data.dsize;i++) {
1869                 if (isascii(data.dptr[i])) {
1870                         fprintf(f, "%c", data.dptr[i]);
1871                 } else {
1872                         fprintf(f, "\\%02X", data.dptr[i]);
1873                 }
1874         }
1875         fprintf(f, "\"\n");
1876
1877         return 0;
1878 }
1879
1880 /*
1881   convenience function to list all keys to stdout
1882  */
1883 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1884 {
1885         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1886 }
1887
1888 /*
1889   get the pid of a ctdb daemon
1890  */
1891 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1892 {
1893         int ret;
1894         int32_t res;
1895
1896         ret = ctdb_control(ctdb, destnode, 0, 
1897                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1898                            NULL, NULL, &res, &timeout, NULL);
1899         if (ret != 0) {
1900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1901                 return -1;
1902         }
1903
1904         *pid = res;
1905
1906         return 0;
1907 }
1908
1909
1910 /*
1911   async freeze send control
1912  */
1913 struct ctdb_client_control_state *
1914 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1915 {
1916         return ctdb_control_send(ctdb, destnode, 0, 
1917                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1918                            mem_ctx, &timeout, NULL);
1919 }
1920
1921 /* 
1922    async freeze recv control
1923 */
1924 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1925 {
1926         int ret;
1927         int32_t res;
1928
1929         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1930         if ( (ret != 0) || (res != 0) ){
1931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1932                 return -1;
1933         }
1934
1935         return 0;
1936 }
1937
1938 /*
1939   freeze a node
1940  */
1941 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1942 {
1943         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1944         struct ctdb_client_control_state *state;
1945         int ret;
1946
1947         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1948         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1949         talloc_free(tmp_ctx);
1950
1951         return ret;
1952 }
1953
1954 /*
1955   thaw a node
1956  */
1957 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1958 {
1959         int ret;
1960         int32_t res;
1961
1962         ret = ctdb_control(ctdb, destnode, 0, 
1963                            CTDB_CONTROL_THAW, 0, tdb_null, 
1964                            NULL, NULL, &res, &timeout, NULL);
1965         if (ret != 0 || res != 0) {
1966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1967                 return -1;
1968         }
1969
1970         return 0;
1971 }
1972
1973 /*
1974   get pnn of a node, or -1
1975  */
1976 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1977 {
1978         int ret;
1979         int32_t res;
1980
1981         ret = ctdb_control(ctdb, destnode, 0, 
1982                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1983                            NULL, NULL, &res, &timeout, NULL);
1984         if (ret != 0) {
1985                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1986                 return -1;
1987         }
1988
1989         return res;
1990 }
1991
1992 /*
1993   get the monitoring mode of a remote node
1994  */
1995 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1996 {
1997         int ret;
1998         int32_t res;
1999
2000         ret = ctdb_control(ctdb, destnode, 0, 
2001                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2002                            NULL, NULL, &res, &timeout, NULL);
2003         if (ret != 0) {
2004                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2005                 return -1;
2006         }
2007
2008         *monmode = res;
2009
2010         return 0;
2011 }
2012
2013
2014 /*
2015  set the monitoring mode of a remote node to active
2016  */
2017 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2018 {
2019         int ret;
2020         
2021
2022         ret = ctdb_control(ctdb, destnode, 0, 
2023                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2024                            NULL, NULL,NULL, &timeout, NULL);
2025         if (ret != 0) {
2026                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2027                 return -1;
2028         }
2029
2030         
2031
2032         return 0;
2033 }
2034
2035 /*
2036   set the monitoring mode of a remote node to disable
2037  */
2038 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2039 {
2040         int ret;
2041         
2042
2043         ret = ctdb_control(ctdb, destnode, 0, 
2044                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2045                            NULL, NULL, NULL, &timeout, NULL);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2048                 return -1;
2049         }
2050
2051         
2052
2053         return 0;
2054 }
2055
2056
2057
2058 /* 
2059   sent to a node to make it take over an ip address
2060 */
2061 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2062                           uint32_t destnode, struct ctdb_public_ip *ip)
2063 {
2064         TDB_DATA data;
2065         int ret;
2066         int32_t res;
2067
2068         data.dsize = sizeof(*ip);
2069         data.dptr  = (uint8_t *)ip;
2070
2071         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2072                            NULL, &res, &timeout, NULL);
2073
2074         if (ret != 0 || res != 0) {
2075                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2076                 return -1;
2077         }
2078
2079         return 0;       
2080 }
2081
2082
2083 /* 
2084   sent to a node to make it release an ip address
2085 */
2086 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2087                          uint32_t destnode, struct ctdb_public_ip *ip)
2088 {
2089         TDB_DATA data;
2090         int ret;
2091         int32_t res;
2092
2093         data.dsize = sizeof(*ip);
2094         data.dptr  = (uint8_t *)ip;
2095
2096         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2097                            NULL, &res, &timeout, NULL);
2098
2099         if (ret != 0 || res != 0) {
2100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2101                 return -1;
2102         }
2103
2104         return 0;       
2105 }
2106
2107
2108 /*
2109   get a tunable
2110  */
2111 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2112                           struct timeval timeout, 
2113                           uint32_t destnode,
2114                           const char *name, uint32_t *value)
2115 {
2116         struct ctdb_control_get_tunable *t;
2117         TDB_DATA data, outdata;
2118         int32_t res;
2119         int ret;
2120
2121         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2122         data.dptr  = talloc_size(ctdb, data.dsize);
2123         CTDB_NO_MEMORY(ctdb, data.dptr);
2124
2125         t = (struct ctdb_control_get_tunable *)data.dptr;
2126         t->length = strlen(name)+1;
2127         memcpy(t->name, name, t->length);
2128
2129         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2130                            &outdata, &res, &timeout, NULL);
2131         talloc_free(data.dptr);
2132         if (ret != 0 || res != 0) {
2133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2134                 return -1;
2135         }
2136
2137         if (outdata.dsize != sizeof(uint32_t)) {
2138                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2139                 talloc_free(outdata.dptr);
2140                 return -1;
2141         }
2142         
2143         *value = *(uint32_t *)outdata.dptr;
2144         talloc_free(outdata.dptr);
2145
2146         return 0;
2147 }
2148
2149 /*
2150   set a tunable
2151  */
2152 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2153                           struct timeval timeout, 
2154                           uint32_t destnode,
2155                           const char *name, uint32_t value)
2156 {
2157         struct ctdb_control_set_tunable *t;
2158         TDB_DATA data;
2159         int32_t res;
2160         int ret;
2161
2162         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2163         data.dptr  = talloc_size(ctdb, data.dsize);
2164         CTDB_NO_MEMORY(ctdb, data.dptr);
2165
2166         t = (struct ctdb_control_set_tunable *)data.dptr;
2167         t->length = strlen(name)+1;
2168         memcpy(t->name, name, t->length);
2169         t->value = value;
2170
2171         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2172                            NULL, &res, &timeout, NULL);
2173         talloc_free(data.dptr);
2174         if (ret != 0 || res != 0) {
2175                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2176                 return -1;
2177         }
2178
2179         return 0;
2180 }
2181
2182 /*
2183   list tunables
2184  */
2185 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2186                             struct timeval timeout, 
2187                             uint32_t destnode,
2188                             TALLOC_CTX *mem_ctx,
2189                             const char ***list, uint32_t *count)
2190 {
2191         TDB_DATA outdata;
2192         int32_t res;
2193         int ret;
2194         struct ctdb_control_list_tunable *t;
2195         char *p, *s, *ptr;
2196
2197         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2198                            mem_ctx, &outdata, &res, &timeout, NULL);
2199         if (ret != 0 || res != 0) {
2200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2201                 return -1;
2202         }
2203
2204         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2205         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2206             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2207                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2208                 talloc_free(outdata.dptr);
2209                 return -1;              
2210         }
2211         
2212         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2213         CTDB_NO_MEMORY(ctdb, p);
2214
2215         talloc_free(outdata.dptr);
2216         
2217         (*list) = NULL;
2218         (*count) = 0;
2219
2220         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2221                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2222                 CTDB_NO_MEMORY(ctdb, *list);
2223                 (*list)[*count] = talloc_strdup(*list, s);
2224                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2225                 (*count)++;
2226         }
2227
2228         talloc_free(p);
2229
2230         return 0;
2231 }
2232
2233
2234 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2235                         struct timeval timeout, uint32_t destnode, 
2236                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2237 {
2238         int ret;
2239         TDB_DATA outdata;
2240         int32_t res;
2241
2242         ret = ctdb_control(ctdb, destnode, 0, 
2243                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2244                            mem_ctx, &outdata, &res, &timeout, NULL);
2245         if (ret != 0 || res != 0) {
2246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2247                 return -1;
2248         }
2249
2250         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2251         talloc_free(outdata.dptr);
2252                     
2253         return 0;
2254 }
2255
2256 /*
2257   set/clear the permanent disabled bit on a remote node
2258  */
2259 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2260                        uint32_t set, uint32_t clear)
2261 {
2262         int ret;
2263         TDB_DATA data;
2264         struct ctdb_node_modflags m;
2265         int32_t res;
2266
2267         m.set = set;
2268         m.clear = clear;
2269
2270         data.dsize = sizeof(m);
2271         data.dptr = (unsigned char *)&m;
2272
2273         ret = ctdb_control(ctdb, destnode, 0, 
2274                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2275                            NULL, NULL, &res, &timeout, NULL);
2276         if (ret != 0 || res != 0) {
2277                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for modflags failed\n"));
2278                 return -1;
2279         }
2280
2281         return 0;
2282 }
2283
2284
2285 /*
2286   get all tunables
2287  */
2288 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2289                                struct timeval timeout, 
2290                                uint32_t destnode,
2291                                struct ctdb_tunable *tunables)
2292 {
2293         TDB_DATA outdata;
2294         int ret;
2295         int32_t res;
2296
2297         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2298                            &outdata, &res, &timeout, NULL);
2299         if (ret != 0 || res != 0) {
2300                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2301                 return -1;
2302         }
2303
2304         if (outdata.dsize != sizeof(*tunables)) {
2305                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2306                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2307                 return -1;              
2308         }
2309
2310         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2311         talloc_free(outdata.dptr);
2312         return 0;
2313 }
2314
2315 /*
2316   add a public address to a node
2317  */
2318 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2319                       struct timeval timeout, 
2320                       uint32_t destnode,
2321                       struct ctdb_control_ip_iface *pub)
2322 {
2323         TDB_DATA data;
2324         int32_t res;
2325         int ret;
2326
2327         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2328         data.dptr  = (unsigned char *)pub;
2329
2330         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2331                            NULL, &res, &timeout, NULL);
2332         if (ret != 0 || res != 0) {
2333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2334                 return -1;
2335         }
2336
2337         return 0;
2338 }
2339
2340 /*
2341   delete a public address from a node
2342  */
2343 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2344                       struct timeval timeout, 
2345                       uint32_t destnode,
2346                       struct ctdb_control_ip_iface *pub)
2347 {
2348         TDB_DATA data;
2349         int32_t res;
2350         int ret;
2351
2352         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2353         data.dptr  = (unsigned char *)pub;
2354
2355         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2356                            NULL, &res, &timeout, NULL);
2357         if (ret != 0 || res != 0) {
2358                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2359                 return -1;
2360         }
2361
2362         return 0;
2363 }
2364
2365 /*
2366   kill a tcp connection
2367  */
2368 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2369                       struct timeval timeout, 
2370                       uint32_t destnode,
2371                       struct ctdb_control_killtcp *killtcp)
2372 {
2373         TDB_DATA data;
2374         int32_t res;
2375         int ret;
2376
2377         data.dsize = sizeof(struct ctdb_control_killtcp);
2378         data.dptr  = (unsigned char *)killtcp;
2379
2380         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2381                            NULL, &res, &timeout, NULL);
2382         if (ret != 0 || res != 0) {
2383                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2384                 return -1;
2385         }
2386
2387         return 0;
2388 }
2389
2390 /*
2391   send a gratious arp
2392  */
2393 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2394                       struct timeval timeout, 
2395                       uint32_t destnode,
2396                       ctdb_sock_addr *addr,
2397                       const char *ifname)
2398 {
2399         TDB_DATA data;
2400         int32_t res;
2401         int ret, len;
2402         struct ctdb_control_gratious_arp *gratious_arp;
2403         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2404
2405
2406         len = strlen(ifname)+1;
2407         gratious_arp = talloc_size(tmp_ctx, 
2408                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2409         CTDB_NO_MEMORY(ctdb, gratious_arp);
2410
2411         gratious_arp->addr = *addr;
2412         gratious_arp->len = len;
2413         memcpy(&gratious_arp->iface[0], ifname, len);
2414
2415
2416         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2417         data.dptr  = (unsigned char *)gratious_arp;
2418
2419         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2420                            NULL, &res, &timeout, NULL);
2421         if (ret != 0 || res != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2423                 talloc_free(tmp_ctx);
2424                 return -1;
2425         }
2426
2427         talloc_free(tmp_ctx);
2428         return 0;
2429 }
2430
2431 /*
2432   get a list of all tcp tickles that a node knows about for a particular vnn
2433  */
2434 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2435                               struct timeval timeout, uint32_t destnode, 
2436                               TALLOC_CTX *mem_ctx, 
2437                               struct sockaddr_in *ip,
2438                               struct ctdb_control_tcp_tickle_list **list)
2439 {
2440         int ret;
2441         TDB_DATA data, outdata;
2442         int32_t status;
2443
2444         data.dptr = (uint8_t*)ip;
2445         data.dsize = sizeof(struct sockaddr_in);
2446
2447         ret = ctdb_control(ctdb, destnode, 0, 
2448                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2449                            mem_ctx, &outdata, &status, NULL, NULL);
2450         if (ret != 0) {
2451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2452                 return -1;
2453         }
2454
2455         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2456
2457         return status;
2458 }
2459
2460 /*
2461   register a server id
2462  */
2463 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2464                       struct timeval timeout, 
2465                       struct ctdb_server_id *id)
2466 {
2467         TDB_DATA data;
2468         int32_t res;
2469         int ret;
2470
2471         data.dsize = sizeof(struct ctdb_server_id);
2472         data.dptr  = (unsigned char *)id;
2473
2474         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2475                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2476                         0, data, NULL,
2477                         NULL, &res, &timeout, NULL);
2478         if (ret != 0 || res != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2480                 return -1;
2481         }
2482
2483         return 0;
2484 }
2485
2486 /*
2487   unregister a server id
2488  */
2489 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2490                       struct timeval timeout, 
2491                       struct ctdb_server_id *id)
2492 {
2493         TDB_DATA data;
2494         int32_t res;
2495         int ret;
2496
2497         data.dsize = sizeof(struct ctdb_server_id);
2498         data.dptr  = (unsigned char *)id;
2499
2500         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2501                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2502                         0, data, NULL,
2503                         NULL, &res, &timeout, NULL);
2504         if (ret != 0 || res != 0) {
2505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2506                 return -1;
2507         }
2508
2509         return 0;
2510 }
2511
2512
2513 /*
2514   check if a server id exists
2515
2516   if a server id does exist, return *status == 1, otherwise *status == 0
2517  */
2518 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2519                       struct timeval timeout, 
2520                       uint32_t destnode,
2521                       struct ctdb_server_id *id,
2522                       uint32_t *status)
2523 {
2524         TDB_DATA data;
2525         int32_t res;
2526         int ret;
2527
2528         data.dsize = sizeof(struct ctdb_server_id);
2529         data.dptr  = (unsigned char *)id;
2530
2531         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2532                         0, data, NULL,
2533                         NULL, &res, &timeout, NULL);
2534         if (ret != 0) {
2535                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2536                 return -1;
2537         }
2538
2539         if (res) {
2540                 *status = 1;
2541         } else {
2542                 *status = 0;
2543         }
2544
2545         return 0;
2546 }
2547
2548 /*
2549    get the list of server ids that are registered on a node
2550 */
2551 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2552                 TALLOC_CTX *mem_ctx,
2553                 struct timeval timeout, uint32_t destnode, 
2554                 struct ctdb_server_id_list **svid_list)
2555 {
2556         int ret;
2557         TDB_DATA outdata;
2558         int32_t res;
2559
2560         ret = ctdb_control(ctdb, destnode, 0, 
2561                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2562                            mem_ctx, &outdata, &res, &timeout, NULL);
2563         if (ret != 0 || res != 0) {
2564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2565                 return -1;
2566         }
2567
2568         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2569                     
2570         return 0;
2571 }
2572
2573 /*
2574   initialise the ctdb daemon for client applications
2575
2576   NOTE: In current code the daemon does not fork. This is for testing purposes only
2577   and to simplify the code.
2578 */
2579 struct ctdb_context *ctdb_init(struct event_context *ev)
2580 {
2581         struct ctdb_context *ctdb;
2582
2583         ctdb = talloc_zero(ev, struct ctdb_context);
2584         ctdb->ev  = ev;
2585         ctdb->idr = idr_init(ctdb);
2586         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2587
2588         ctdb_set_socketname(ctdb, CTDB_PATH);
2589
2590         return ctdb;
2591 }
2592
2593
2594 /*
2595   set some ctdb flags
2596 */
2597 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2598 {
2599         ctdb->flags |= flags;
2600 }
2601
2602 /*
2603   setup the local socket name
2604 */
2605 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2606 {
2607         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2608         return 0;
2609 }
2610
2611 /*
2612   return the pnn of this node
2613 */
2614 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2615 {
2616         return ctdb->pnn;
2617 }
2618
2619
2620 /*
2621   get the uptime of a remote node
2622  */
2623 struct ctdb_client_control_state *
2624 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2625 {
2626         return ctdb_control_send(ctdb, destnode, 0, 
2627                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2628                            mem_ctx, &timeout, NULL);
2629 }
2630
2631 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2632 {
2633         int ret;
2634         int32_t res;
2635         TDB_DATA outdata;
2636
2637         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2638         if (ret != 0 || res != 0) {
2639                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2640                 return -1;
2641         }
2642
2643         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2644
2645         return 0;
2646 }
2647
2648 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2649 {
2650         struct ctdb_client_control_state *state;
2651
2652         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2653         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2654 }
2655
2656 /*
2657   send a control to execute the "recovered" event script on a node
2658  */
2659 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2660 {
2661         int ret;
2662         int32_t status;
2663
2664         ret = ctdb_control(ctdb, destnode, 0, 
2665                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2666                            NULL, NULL, &status, &timeout, NULL);
2667         if (ret != 0 || status != 0) {
2668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2669                 return -1;
2670         }
2671
2672         return 0;
2673 }
2674
2675 /* 
2676   callback for the async helpers used when sending the same control
2677   to multiple nodes in parallell.
2678 */
2679 static void async_callback(struct ctdb_client_control_state *state)
2680 {
2681         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2682         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2683         int ret;
2684         TDB_DATA outdata;
2685         int32_t res;
2686         uint32_t destnode = state->c->hdr.destnode;
2687
2688         /* one more node has responded with recmode data */
2689         data->count--;
2690
2691         /* if we failed to push the db, then return an error and let
2692            the main loop try again.
2693         */
2694         if (state->state != CTDB_CONTROL_DONE) {
2695                 if ( !data->dont_log_errors) {
2696                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2697                 }
2698                 data->fail_count++;
2699                 if (data->fail_callback) {
2700                         data->fail_callback(ctdb, destnode, res, outdata,
2701                                         data->callback_data);
2702                 }
2703                 return;
2704         }
2705         
2706         state->async.fn = NULL;
2707
2708         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2709         if ((ret != 0) || (res != 0)) {
2710                 if ( !data->dont_log_errors) {
2711                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2712                 }
2713                 data->fail_count++;
2714                 if (data->fail_callback) {
2715                         data->fail_callback(ctdb, destnode, res, outdata,
2716                                         data->callback_data);
2717                 }
2718         }
2719         if ((ret == 0) && (data->callback != NULL)) {
2720                 data->callback(ctdb, destnode, res, outdata,
2721                                         data->callback_data);
2722         }
2723 }
2724
2725
2726 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2727 {
2728         /* set up the callback functions */
2729         state->async.fn = async_callback;
2730         state->async.private_data = data;
2731         
2732         /* one more control to wait for to complete */
2733         data->count++;
2734 }
2735
2736
2737 /* wait for up to the maximum number of seconds allowed
2738    or until all nodes we expect a response from has replied
2739 */
2740 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2741 {
2742         while (data->count > 0) {
2743                 event_loop_once(ctdb->ev);
2744         }
2745         if (data->fail_count != 0) {
2746                 if (!data->dont_log_errors) {
2747                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2748                                  data->fail_count));
2749                 }
2750                 return -1;
2751         }
2752         return 0;
2753 }
2754
2755
2756 /* 
2757    perform a simple control on the listed nodes
2758    The control cannot return data
2759  */
2760 int ctdb_client_async_control(struct ctdb_context *ctdb,
2761                                 enum ctdb_controls opcode,
2762                                 uint32_t *nodes,
2763                                 struct timeval timeout,
2764                                 bool dont_log_errors,
2765                                 TDB_DATA data,
2766                                 client_async_callback client_callback,
2767                                 client_async_callback fail_callback,
2768                                 void *callback_data)
2769 {
2770         struct client_async_data *async_data;
2771         struct ctdb_client_control_state *state;
2772         int j, num_nodes;
2773
2774         async_data = talloc_zero(ctdb, struct client_async_data);
2775         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2776         async_data->dont_log_errors = dont_log_errors;
2777         async_data->callback = client_callback;
2778         async_data->fail_callback = fail_callback;
2779         async_data->callback_data = callback_data;
2780         async_data->opcode        = opcode;
2781
2782         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2783
2784         /* loop over all nodes and send an async control to each of them */
2785         for (j=0; j<num_nodes; j++) {
2786                 uint32_t pnn = nodes[j];
2787
2788                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2789                                           0, data, async_data, &timeout, NULL);
2790                 if (state == NULL) {
2791                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2792                         talloc_free(async_data);
2793                         return -1;
2794                 }
2795                 
2796                 ctdb_client_async_add(async_data, state);
2797         }
2798
2799         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2800                 talloc_free(async_data);
2801                 return -1;
2802         }
2803
2804         talloc_free(async_data);
2805         return 0;
2806 }
2807
2808 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2809                                 struct ctdb_vnn_map *vnn_map,
2810                                 TALLOC_CTX *mem_ctx,
2811                                 bool include_self)
2812 {
2813         int i, j, num_nodes;
2814         uint32_t *nodes;
2815
2816         for (i=num_nodes=0;i<vnn_map->size;i++) {
2817                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2818                         continue;
2819                 }
2820                 num_nodes++;
2821         } 
2822
2823         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2824         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2825
2826         for (i=j=0;i<vnn_map->size;i++) {
2827                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2828                         continue;
2829                 }
2830                 nodes[j++] = vnn_map->map[i];
2831         } 
2832
2833         return nodes;
2834 }
2835
2836 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2837                                 struct ctdb_node_map *node_map,
2838                                 TALLOC_CTX *mem_ctx,
2839                                 bool include_self)
2840 {
2841         int i, j, num_nodes;
2842         uint32_t *nodes;
2843
2844         for (i=num_nodes=0;i<node_map->num;i++) {
2845                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2846                         continue;
2847                 }
2848                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2849                         continue;
2850                 }
2851                 num_nodes++;
2852         } 
2853
2854         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2855         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2856
2857         for (i=j=0;i<node_map->num;i++) {
2858                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2859                         continue;
2860                 }
2861                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2862                         continue;
2863                 }
2864                 nodes[j++] = node_map->nodes[i].pnn;
2865         } 
2866
2867         return nodes;
2868 }
2869
2870 /* 
2871   this is used to test if a pnn lock exists and if it exists will return
2872   the number of connections that pnn has reported or -1 if that recovery
2873   daemon is not running.
2874 */
2875 int
2876 ctdb_read_pnn_lock(int fd, int32_t pnn)
2877 {
2878         struct flock lock;
2879         char c;
2880
2881         lock.l_type = F_WRLCK;
2882         lock.l_whence = SEEK_SET;
2883         lock.l_start = pnn;
2884         lock.l_len = 1;
2885         lock.l_pid = 0;
2886
2887         if (fcntl(fd, F_GETLK, &lock) != 0) {
2888                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2889                 return -1;
2890         }
2891
2892         if (lock.l_type == F_UNLCK) {
2893                 return -1;
2894         }
2895
2896         if (pread(fd, &c, 1, pnn) == -1) {
2897                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2898                 return -1;
2899         }
2900
2901         return c;
2902 }
2903
2904 /*
2905   get capabilities of a remote node
2906  */
2907 struct ctdb_client_control_state *
2908 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2909 {
2910         return ctdb_control_send(ctdb, destnode, 0, 
2911                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2912                            mem_ctx, &timeout, NULL);
2913 }
2914
2915 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2916 {
2917         int ret;
2918         int32_t res;
2919         TDB_DATA outdata;
2920
2921         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2922         if ( (ret != 0) || (res != 0) ) {
2923                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2924                 return -1;
2925         }
2926
2927         if (capabilities) {
2928                 *capabilities = *((uint32_t *)outdata.dptr);
2929         }
2930
2931         return 0;
2932 }
2933
2934 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2935 {
2936         struct ctdb_client_control_state *state;
2937         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2938         int ret;
2939
2940         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2941         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2942         talloc_free(tmp_ctx);
2943         return ret;
2944 }