2cdec005e9cc2ca8489977815b9855d0602a1e9f
[rusty/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
30
31 /*
32   allocate a packet for use in client<->daemon communication
33  */
34 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
35                                             TALLOC_CTX *mem_ctx, 
36                                             enum ctdb_operation operation, 
37                                             size_t length, size_t slength,
38                                             const char *type)
39 {
40         int size;
41         struct ctdb_req_header *hdr;
42
43         length = MAX(length, slength);
44         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
45
46         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
47         if (hdr == NULL) {
48                 DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
49                          operation, (unsigned)length));
50                 return NULL;
51         }
52         talloc_set_name_const(hdr, type);
53         memset(hdr, 0, slength);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, uint32_t caller)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88
89         for (fn=ctdb_db->calls;fn;fn=fn->next) {
90                 if (fn->id == call->call_id) break;
91         }
92         if (fn == NULL) {
93                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
94                 talloc_free(c);
95                 return -1;
96         }
97
98         if (fn->fn(c) != 0) {
99                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
100                 talloc_free(c);
101                 return -1;
102         }
103
104         if (header->laccessor != caller) {
105                 header->lacount = 0;
106         }
107         header->laccessor = caller;
108         header->lacount++;
109
110         /* we need to force the record to be written out if this was a remote access,
111            so that the lacount is updated */
112         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
113                 c->new_data = &c->record_data;
114         }
115
116         if (c->new_data) {
117                 /* XXX check that we always have the lock here? */
118                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
119                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
120                         talloc_free(c);
121                         return -1;
122                 }
123         }
124
125         if (c->reply_data) {
126                 call->reply_data = *c->reply_data;
127                 talloc_steal(ctdb, call->reply_data.dptr);
128                 talloc_set_name_const(call->reply_data.dptr, __location__);
129         } else {
130                 call->reply_data.dptr = NULL;
131                 call->reply_data.dsize = 0;
132         }
133         call->status = c->status;
134
135         talloc_free(c);
136
137         return 0;
138 }
139
140
141 /*
142   queue a packet for sending from client to daemon
143 */
144 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
145 {
146         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
147 }
148
149
150 /*
151   called when a CTDB_REPLY_CALL packet comes in in the client
152
153   This packet comes in response to a CTDB_REQ_CALL request packet. It
154   contains any reply data from the call
155 */
156 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
157 {
158         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
159         struct ctdb_client_call_state *state;
160
161         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
162         if (state == NULL) {
163                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
164                 return;
165         }
166
167         if (hdr->reqid != state->reqid) {
168                 /* we found a record  but it was the wrong one */
169                 DEBUG(0, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
170                 return;
171         }
172
173         state->call.reply_data.dptr = c->data;
174         state->call.reply_data.dsize = c->datalen;
175         state->call.status = c->status;
176
177         talloc_steal(state, c);
178
179         state->state = CTDB_CALL_DONE;
180
181         if (state->async.fn) {
182                 state->async.fn(state);
183         }
184 }
185
186 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
187
188 /*
189   this is called in the client, when data comes in from the daemon
190  */
191 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
192 {
193         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
194         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
195         TALLOC_CTX *tmp_ctx;
196
197         /* place the packet as a child of a tmp_ctx. We then use
198            talloc_free() below to free it. If any of the calls want
199            to keep it, then they will steal it somewhere else, and the
200            talloc_free() will be a no-op */
201         tmp_ctx = talloc_new(ctdb);
202         talloc_steal(tmp_ctx, hdr);
203
204         if (cnt == 0) {
205                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
206                 exit(0);
207         }
208
209         if (cnt < sizeof(*hdr)) {
210                 DEBUG(0,("Bad packet length %u in client\n", (unsigned)cnt));
211                 goto done;
212         }
213         if (cnt != hdr->length) {
214                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
215                                (unsigned)hdr->length, (unsigned)cnt);
216                 goto done;
217         }
218
219         if (hdr->ctdb_magic != CTDB_MAGIC) {
220                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
221                 goto done;
222         }
223
224         if (hdr->ctdb_version != CTDB_VERSION) {
225                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
226                 goto done;
227         }
228
229         switch (hdr->operation) {
230         case CTDB_REPLY_CALL:
231                 ctdb_client_reply_call(ctdb, hdr);
232                 break;
233
234         case CTDB_REQ_MESSAGE:
235                 ctdb_request_message(ctdb, hdr);
236                 break;
237
238         case CTDB_REPLY_CONTROL:
239                 ctdb_client_reply_control(ctdb, hdr);
240                 break;
241
242         default:
243                 DEBUG(0,("bogus operation code:%u\n",hdr->operation));
244         }
245
246 done:
247         talloc_free(tmp_ctx);
248 }
249
250 /*
251   connect to a unix domain socket
252 */
253 int ctdb_socket_connect(struct ctdb_context *ctdb)
254 {
255         struct sockaddr_un addr;
256
257         memset(&addr, 0, sizeof(addr));
258         addr.sun_family = AF_UNIX;
259         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
260
261         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
262         if (ctdb->daemon.sd == -1) {
263                 return -1;
264         }
265
266         set_nonblocking(ctdb->daemon.sd);
267         set_close_on_exec(ctdb->daemon.sd);
268         
269         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
270                 close(ctdb->daemon.sd);
271                 ctdb->daemon.sd = -1;
272                 return -1;
273         }
274
275         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
276                                               CTDB_DS_ALIGNMENT, 
277                                               ctdb_client_read_cb, ctdb);
278         return 0;
279 }
280
281
282 struct ctdb_record_handle {
283         struct ctdb_db_context *ctdb_db;
284         TDB_DATA key;
285         TDB_DATA *data;
286         struct ctdb_ltdb_header header;
287 };
288
289
290 /*
291   make a recv call to the local ctdb daemon - called from client context
292
293   This is called when the program wants to wait for a ctdb_call to complete and get the 
294   results. This call will block unless the call has already completed.
295 */
296 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
297 {
298         if (state == NULL) {
299                 return -1;
300         }
301
302         while (state->state < CTDB_CALL_DONE) {
303                 event_loop_once(state->ctdb_db->ctdb->ev);
304         }
305         if (state->state != CTDB_CALL_DONE) {
306                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
307                 talloc_free(state);
308                 return -1;
309         }
310
311         if (state->call.reply_data.dsize) {
312                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
313                                                       state->call.reply_data.dptr,
314                                                       state->call.reply_data.dsize);
315                 call->reply_data.dsize = state->call.reply_data.dsize;
316         } else {
317                 call->reply_data.dptr = NULL;
318                 call->reply_data.dsize = 0;
319         }
320         call->status = state->call.status;
321         talloc_free(state);
322
323         return 0;
324 }
325
326
327
328
329 /*
330   destroy a ctdb_call in client
331 */
332 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
333 {
334         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
335         return 0;
336 }
337
338 /*
339   construct an event driven local ctdb_call
340
341   this is used so that locally processed ctdb_call requests are processed
342   in an event driven manner
343 */
344 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
345                                                                   struct ctdb_call *call,
346                                                                   struct ctdb_ltdb_header *header,
347                                                                   TDB_DATA *data)
348 {
349         struct ctdb_client_call_state *state;
350         struct ctdb_context *ctdb = ctdb_db->ctdb;
351         int ret;
352
353         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
354         CTDB_NO_MEMORY_NULL(ctdb, state);
355
356         talloc_steal(state, data->dptr);
357
358         state->state = CTDB_CALL_DONE;
359         state->call = *call;
360         state->ctdb_db = ctdb_db;
361
362         ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->pnn);
363
364         return state;
365 }
366
367 /*
368   make a ctdb call to the local daemon - async send. Called from client context.
369
370   This constructs a ctdb_call request and queues it for processing. 
371   This call never blocks.
372 */
373 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
374                                               struct ctdb_call *call)
375 {
376         struct ctdb_client_call_state *state;
377         struct ctdb_context *ctdb = ctdb_db->ctdb;
378         struct ctdb_ltdb_header header;
379         TDB_DATA data;
380         int ret;
381         size_t len;
382         struct ctdb_req_call *c;
383
384         /* if the domain socket is not yet open, open it */
385         if (ctdb->daemon.sd==-1) {
386                 ctdb_socket_connect(ctdb);
387         }
388
389         ret = ctdb_ltdb_lock(ctdb_db, call->key);
390         if (ret != 0) {
391                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
392                 return NULL;
393         }
394
395         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
396
397         if (ret == 0 && header.dmaster == ctdb->pnn) {
398                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
399                 talloc_free(data.dptr);
400                 ctdb_ltdb_unlock(ctdb_db, call->key);
401                 return state;
402         }
403
404         ctdb_ltdb_unlock(ctdb_db, call->key);
405         talloc_free(data.dptr);
406
407         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
408         if (state == NULL) {
409                 DEBUG(0, (__location__ " failed to allocate state\n"));
410                 return NULL;
411         }
412
413         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
414         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
415         if (c == NULL) {
416                 DEBUG(0, (__location__ " failed to allocate packet\n"));
417                 return NULL;
418         }
419
420         state->reqid     = ctdb_reqid_new(ctdb, state);
421         state->ctdb_db = ctdb_db;
422         talloc_set_destructor(state, ctdb_client_call_destructor);
423
424         c->hdr.reqid     = state->reqid;
425         c->flags         = call->flags;
426         c->db_id         = ctdb_db->db_id;
427         c->callid        = call->call_id;
428         c->hopcount      = 0;
429         c->keylen        = call->key.dsize;
430         c->calldatalen   = call->call_data.dsize;
431         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
432         memcpy(&c->data[call->key.dsize], 
433                call->call_data.dptr, call->call_data.dsize);
434         state->call                = *call;
435         state->call.call_data.dptr = &c->data[call->key.dsize];
436         state->call.key.dptr       = &c->data[0];
437
438         state->state  = CTDB_CALL_WAIT;
439
440
441         ctdb_client_queue_pkt(ctdb, &c->hdr);
442
443         return state;
444 }
445
446
447 /*
448   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
449 */
450 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
451 {
452         struct ctdb_client_call_state *state;
453
454         state = ctdb_call_send(ctdb_db, call);
455         return ctdb_call_recv(state, call);
456 }
457
458
459 /*
460   tell the daemon what messaging srvid we will use, and register the message
461   handler function in the client
462 */
463 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
464                              ctdb_message_fn_t handler,
465                              void *private_data)
466                                     
467 {
468         int res;
469         int32_t status;
470         
471         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
472                            tdb_null, NULL, NULL, &status, NULL, NULL);
473         if (res != 0 || status != 0) {
474                 DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
475                 return -1;
476         }
477
478         /* also need to register the handler with our own ctdb structure */
479         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
480 }
481
482 /*
483   tell the daemon we no longer want a srvid
484 */
485 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
486 {
487         int res;
488         int32_t status;
489         
490         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
491                            tdb_null, NULL, NULL, &status, NULL, NULL);
492         if (res != 0 || status != 0) {
493                 DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
494                 return -1;
495         }
496
497         /* also need to register the handler with our own ctdb structure */
498         ctdb_deregister_message_handler(ctdb, srvid, private_data);
499         return 0;
500 }
501
502
503 /*
504   send a message - from client context
505  */
506 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
507                       uint64_t srvid, TDB_DATA data)
508 {
509         struct ctdb_req_message *r;
510         int len, res;
511
512         len = offsetof(struct ctdb_req_message, data) + data.dsize;
513         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
514                                len, struct ctdb_req_message);
515         CTDB_NO_MEMORY(ctdb, r);
516
517         r->hdr.destnode  = pnn;
518         r->srvid         = srvid;
519         r->datalen       = data.dsize;
520         memcpy(&r->data[0], data.dptr, data.dsize);
521         
522         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
523         if (res != 0) {
524                 return res;
525         }
526
527         talloc_free(r);
528         return 0;
529 }
530
531
532 /*
533   cancel a ctdb_fetch_lock operation, releasing the lock
534  */
535 static int fetch_lock_destructor(struct ctdb_record_handle *h)
536 {
537         ctdb_ltdb_unlock(h->ctdb_db, h->key);
538         return 0;
539 }
540
541 /*
542   force the migration of a record to this node
543  */
544 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
545 {
546         struct ctdb_call call;
547         ZERO_STRUCT(call);
548         call.call_id = CTDB_NULL_FUNC;
549         call.key = key;
550         call.flags = CTDB_IMMEDIATE_MIGRATION;
551         return ctdb_call(ctdb_db, &call);
552 }
553
554 /*
555   get a lock on a record, and return the records data. Blocks until it gets the lock
556  */
557 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
558                                            TDB_DATA key, TDB_DATA *data)
559 {
560         int ret;
561         struct ctdb_record_handle *h;
562
563         /*
564           procedure is as follows:
565
566           1) get the chain lock. 
567           2) check if we are dmaster
568           3) if we are the dmaster then return handle 
569           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
570              reply from ctdbd
571           5) when we get the reply, goto (1)
572          */
573
574         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
575         if (h == NULL) {
576                 return NULL;
577         }
578
579         h->ctdb_db = ctdb_db;
580         h->key     = key;
581         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
582         if (h->key.dptr == NULL) {
583                 talloc_free(h);
584                 return NULL;
585         }
586         h->data    = data;
587
588         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
589                  (const char *)key.dptr));
590
591 again:
592         /* step 1 - get the chain lock */
593         ret = ctdb_ltdb_lock(ctdb_db, key);
594         if (ret != 0) {
595                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
596                 talloc_free(h);
597                 return NULL;
598         }
599
600         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
601
602         talloc_set_destructor(h, fetch_lock_destructor);
603
604         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
605
606         /* when torturing, ensure we test the remote path */
607         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
608             random() % 5 == 0) {
609                 h->header.dmaster = (uint32_t)-1;
610         }
611
612
613         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
614
615         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
616                 ctdb_ltdb_unlock(ctdb_db, key);
617                 ret = ctdb_client_force_migration(ctdb_db, key);
618                 if (ret != 0) {
619                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
620                         talloc_free(h);
621                         return NULL;
622                 }
623                 goto again;
624         }
625
626         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
627         return h;
628 }
629
630 /*
631   store some data to the record that was locked with ctdb_fetch_lock()
632 */
633 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
634 {
635         int ret;
636         int32_t status;
637         struct ctdb_rec_data *rec;
638         TDB_DATA recdata;
639
640         if (h->ctdb_db->persistent) {
641                 h->header.rsn++;
642         }
643
644         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
645         if (ret != 0) {
646                 return ret;
647         }
648
649         /* don't need the persistent_store control for non-persistent databases */
650         if (!h->ctdb_db->persistent) {
651                 return 0;
652         }
653
654         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
655         if (rec == NULL) {
656                 DEBUG(0,("Unable to marshall record in ctdb_record_store\n"));
657                 return -1;
658         }
659
660         recdata.dptr = (uint8_t *)rec;
661         recdata.dsize = rec->length;
662
663         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
664                            CTDB_CONTROL_PERSISTENT_STORE, 0,
665                            recdata, NULL, NULL, &status, NULL, NULL);
666
667         talloc_free(rec);
668
669         if (ret != 0 || status != 0) {
670                 DEBUG(0,("Failed persistent store in ctdb_record_store\n"));
671                 return -1;
672         }
673
674         return 0;
675 }
676
677 /*
678   non-locking fetch of a record
679  */
680 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
681                TDB_DATA key, TDB_DATA *data)
682 {
683         struct ctdb_call call;
684         int ret;
685
686         call.call_id = CTDB_FETCH_FUNC;
687         call.call_data.dptr = NULL;
688         call.call_data.dsize = 0;
689
690         ret = ctdb_call(ctdb_db, &call);
691
692         if (ret == 0) {
693                 *data = call.reply_data;
694                 talloc_steal(mem_ctx, data->dptr);
695         }
696
697         return ret;
698 }
699
700
701
702 /*
703    called when a control completes or timesout to invoke the callback
704    function the user provided
705 */
706 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
707         struct timeval t, void *private_data)
708 {
709         struct ctdb_client_control_state *state;
710         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
711         int ret;
712
713         state = talloc_get_type(private_data, struct ctdb_client_control_state);
714         talloc_steal(tmp_ctx, state);
715
716         ret = ctdb_control_recv(state->ctdb, state, state,
717                         NULL, 
718                         NULL, 
719                         NULL);
720
721         talloc_free(tmp_ctx);
722 }
723
724 /*
725   called when a CTDB_REPLY_CONTROL packet comes in in the client
726
727   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
728   contains any reply data from the control
729 */
730 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
731                                       struct ctdb_req_header *hdr)
732 {
733         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
734         struct ctdb_client_control_state *state;
735
736         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
737         if (state == NULL) {
738                 DEBUG(0,(__location__ " reqid %u not found\n", hdr->reqid));
739                 return;
740         }
741
742         if (hdr->reqid != state->reqid) {
743                 /* we found a record  but it was the wrong one */
744                 DEBUG(0, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
745                 return;
746         }
747
748         state->outdata.dptr = c->data;
749         state->outdata.dsize = c->datalen;
750         state->status = c->status;
751         if (c->errorlen) {
752                 state->errormsg = talloc_strndup(state, 
753                                                  (char *)&c->data[c->datalen], 
754                                                  c->errorlen);
755         }
756
757         /* state->outdata now uses resources from c so we dont want c
758            to just dissappear from under us while state is still alive
759         */
760         talloc_steal(state, c);
761
762         state->state = CTDB_CONTROL_DONE;
763
764         /* if we had a callback registered for this control, pull the response
765            and call the callback.
766         */
767         if (state->async.fn) {
768                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
769         }
770 }
771
772
773 /*
774   destroy a ctdb_control in client
775 */
776 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
777 {
778         ctdb_reqid_remove(state->ctdb, state->reqid);
779         return 0;
780 }
781
782
783 /* time out handler for ctdb_control */
784 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
785         struct timeval t, void *private_data)
786 {
787         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
788
789         DEBUG(0,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
790
791         state->state = CTDB_CONTROL_TIMEOUT;
792
793         /* if we had a callback registered for this control, pull the response
794            and call the callback.
795         */
796         if (state->async.fn) {
797                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
798         }
799 }
800
801 /* async version of send control request */
802 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
803                 uint32_t destnode, uint64_t srvid, 
804                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
805                 TALLOC_CTX *mem_ctx,
806                 struct timeval *timeout,
807                 char **errormsg)
808 {
809         struct ctdb_client_control_state *state;
810         size_t len;
811         struct ctdb_req_control *c;
812         int ret;
813
814         if (errormsg) {
815                 *errormsg = NULL;
816         }
817
818         /* if the domain socket is not yet open, open it */
819         if (ctdb->daemon.sd==-1) {
820                 ctdb_socket_connect(ctdb);
821         }
822
823         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
824         CTDB_NO_MEMORY_NULL(ctdb, state);
825
826         state->ctdb       = ctdb;
827         state->reqid      = ctdb_reqid_new(ctdb, state);
828         state->state      = CTDB_CONTROL_WAIT;
829         state->errormsg   = NULL;
830
831         talloc_set_destructor(state, ctdb_control_destructor);
832
833         len = offsetof(struct ctdb_req_control, data) + data.dsize;
834         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
835                                len, struct ctdb_req_control);
836         state->c            = c;        
837         CTDB_NO_MEMORY_NULL(ctdb, c);
838         c->hdr.reqid        = state->reqid;
839         c->hdr.destnode     = destnode;
840         c->hdr.reqid        = state->reqid;
841         c->opcode           = opcode;
842         c->client_id        = 0;
843         c->flags            = flags;
844         c->srvid            = srvid;
845         c->datalen          = data.dsize;
846         if (data.dsize) {
847                 memcpy(&c->data[0], data.dptr, data.dsize);
848         }
849
850         /* timeout */
851         if (timeout && !timeval_is_zero(timeout)) {
852                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
853         }
854
855         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
856         if (ret != 0) {
857                 talloc_free(state);
858                 return NULL;
859         }
860
861         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
862                 talloc_free(state);
863                 return NULL;
864         }
865
866         return state;
867 }
868
869
870 /* async version of receive control reply */
871 int ctdb_control_recv(struct ctdb_context *ctdb, 
872                 struct ctdb_client_control_state *state, 
873                 TALLOC_CTX *mem_ctx,
874                 TDB_DATA *outdata, int32_t *status, char **errormsg)
875 {
876         TALLOC_CTX *tmp_ctx;
877
878         if (state == NULL) {
879                 return -1;
880         }
881
882         /* prevent double free of state */
883         tmp_ctx = talloc_new(ctdb);
884         talloc_steal(tmp_ctx, state);
885
886         /* loop one event at a time until we either timeout or the control
887            completes.
888         */
889         while (state->state == CTDB_CONTROL_WAIT) {
890                 event_loop_once(ctdb->ev);
891         }
892
893         if (state->state != CTDB_CONTROL_DONE) {
894                 DEBUG(0,(__location__ " ctdb_control_recv failed\n"));
895                 if (state->async.fn) {
896                         state->async.fn(state);
897                 }
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (state->errormsg) {
903                 DEBUG(0,("ctdb_control error: '%s'\n", state->errormsg));
904                 if (errormsg) {
905                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
906                 }
907                 if (state->async.fn) {
908                         state->async.fn(state);
909                 }
910                 talloc_free(tmp_ctx);
911                 return -1;
912         }
913
914         if (outdata) {
915                 *outdata = state->outdata;
916                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
917         }
918
919         if (status) {
920                 *status = state->status;
921         }
922
923         if (state->async.fn) {
924                 state->async.fn(state);
925         }
926
927         talloc_free(tmp_ctx);
928         return 0;
929 }
930
931
932
933 /*
934   send a ctdb control message
935   timeout specifies how long we should wait for a reply.
936   if timeout is NULL we wait indefinitely
937  */
938 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
939                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
940                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
941                  struct timeval *timeout,
942                  char **errormsg)
943 {
944         struct ctdb_client_control_state *state;
945
946         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
947                         flags, data, mem_ctx,
948                         timeout, errormsg);
949         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
950                         errormsg);
951 }
952
953
954
955
956 /*
957   a process exists call. Returns 0 if process exists, -1 otherwise
958  */
959 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
960 {
961         int ret;
962         TDB_DATA data;
963         int32_t status;
964
965         data.dptr = (uint8_t*)&pid;
966         data.dsize = sizeof(pid);
967
968         ret = ctdb_control(ctdb, destnode, 0, 
969                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
970                            NULL, NULL, &status, NULL, NULL);
971         if (ret != 0) {
972                 DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
973                 return -1;
974         }
975
976         return status;
977 }
978
979 /*
980   get remote statistics
981  */
982 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
983 {
984         int ret;
985         TDB_DATA data;
986         int32_t res;
987
988         ret = ctdb_control(ctdb, destnode, 0, 
989                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
990                            ctdb, &data, &res, NULL, NULL);
991         if (ret != 0 || res != 0) {
992                 DEBUG(0,(__location__ " ctdb_control for statistics failed\n"));
993                 return -1;
994         }
995
996         if (data.dsize != sizeof(struct ctdb_statistics)) {
997                 DEBUG(0,(__location__ " Wrong statistics size %u - expected %u\n",
998                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
999                       return -1;
1000         }
1001
1002         *status = *(struct ctdb_statistics *)data.dptr;
1003         talloc_free(data.dptr);
1004                         
1005         return 0;
1006 }
1007
1008 /*
1009   shutdown a remote ctdb node
1010  */
1011 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1012 {
1013         struct ctdb_client_control_state *state;
1014
1015         state = ctdb_control_send(ctdb, destnode, 0, 
1016                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1017                            NULL, &timeout, NULL);
1018         if (state == NULL) {
1019                 DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
1020                 return -1;
1021         }
1022
1023         return 0;
1024 }
1025
1026 /*
1027   get vnn map from a remote node
1028  */
1029 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1030 {
1031         int ret;
1032         TDB_DATA outdata;
1033         int32_t res;
1034         struct ctdb_vnn_map_wire *map;
1035
1036         ret = ctdb_control(ctdb, destnode, 0, 
1037                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1038                            mem_ctx, &outdata, &res, &timeout, NULL);
1039         if (ret != 0 || res != 0) {
1040                 DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
1041                 return -1;
1042         }
1043         
1044         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1045         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1046             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1047                 DEBUG(0,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1048                 return -1;
1049         }
1050
1051         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1052         CTDB_NO_MEMORY(ctdb, *vnnmap);
1053         (*vnnmap)->generation = map->generation;
1054         (*vnnmap)->size       = map->size;
1055         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1056
1057         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1058         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1059         talloc_free(outdata.dptr);
1060                     
1061         return 0;
1062 }
1063
1064
1065 /*
1066   get the recovery mode of a remote node
1067  */
1068 struct ctdb_client_control_state *
1069 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1070 {
1071         return ctdb_control_send(ctdb, destnode, 0, 
1072                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1073                            mem_ctx, &timeout, NULL);
1074 }
1075
1076 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1077 {
1078         int ret;
1079         int32_t res;
1080
1081         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1082         if (ret != 0) {
1083                 DEBUG(0,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1084                 return -1;
1085         }
1086
1087         if (recmode) {
1088                 *recmode = (uint32_t)res;
1089         }
1090
1091         return 0;
1092 }
1093
1094 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1095 {
1096         struct ctdb_client_control_state *state;
1097
1098         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1099         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1100 }
1101
1102
1103
1104
1105 /*
1106   set the recovery mode of a remote node
1107  */
1108 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1109 {
1110         int ret;
1111         TDB_DATA data;
1112         int32_t res;
1113
1114         data.dsize = sizeof(uint32_t);
1115         data.dptr = (unsigned char *)&recmode;
1116
1117         ret = ctdb_control(ctdb, destnode, 0, 
1118                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1119                            NULL, NULL, &res, &timeout, NULL);
1120         if (ret != 0 || res != 0) {
1121                 DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
1122                 return -1;
1123         }
1124
1125         return 0;
1126 }
1127
1128
1129
1130 /*
1131   get the recovery master of a remote node
1132  */
1133 struct ctdb_client_control_state *
1134 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1135                         struct timeval timeout, uint32_t destnode)
1136 {
1137         return ctdb_control_send(ctdb, destnode, 0, 
1138                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1139                            mem_ctx, &timeout, NULL);
1140 }
1141
1142 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1143 {
1144         int ret;
1145         int32_t res;
1146
1147         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1148         if (ret != 0) {
1149                 DEBUG(0,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1150                 return -1;
1151         }
1152
1153         if (recmaster) {
1154                 *recmaster = (uint32_t)res;
1155         }
1156
1157         return 0;
1158 }
1159
1160 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1161 {
1162         struct ctdb_client_control_state *state;
1163
1164         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1165         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1166 }
1167
1168
1169 /*
1170   set the recovery master of a remote node
1171  */
1172 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1173 {
1174         int ret;
1175         TDB_DATA data;
1176         int32_t res;
1177
1178         ZERO_STRUCT(data);
1179         data.dsize = sizeof(uint32_t);
1180         data.dptr = (unsigned char *)&recmaster;
1181
1182         ret = ctdb_control(ctdb, destnode, 0, 
1183                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1184                            NULL, NULL, &res, &timeout, NULL);
1185         if (ret != 0 || res != 0) {
1186                 DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
1187                 return -1;
1188         }
1189
1190         return 0;
1191 }
1192
1193
1194 /*
1195   get a list of databases off a remote node
1196  */
1197 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1198                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1199 {
1200         int ret;
1201         TDB_DATA outdata;
1202         int32_t res;
1203
1204         ret = ctdb_control(ctdb, destnode, 0, 
1205                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1206                            mem_ctx, &outdata, &res, &timeout, NULL);
1207         if (ret != 0 || res != 0) {
1208                 DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
1209                 return -1;
1210         }
1211
1212         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1213         talloc_free(outdata.dptr);
1214                     
1215         return 0;
1216 }
1217
1218
1219 /*
1220   get a list of nodes (vnn and flags ) from a remote node
1221  */
1222 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1223                 struct timeval timeout, uint32_t destnode, 
1224                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1225 {
1226         int ret;
1227         TDB_DATA outdata;
1228         int32_t res;
1229
1230         ret = ctdb_control(ctdb, destnode, 0, 
1231                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1232                            mem_ctx, &outdata, &res, &timeout, NULL);
1233         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1234                 DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
1235                 return -1;
1236         }
1237
1238         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1239         talloc_free(outdata.dptr);
1240                     
1241         return 0;
1242 }
1243
1244 /*
1245   set vnn map on a node
1246  */
1247 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1248                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1249 {
1250         int ret;
1251         TDB_DATA data;
1252         int32_t res;
1253         struct ctdb_vnn_map_wire *map;
1254         size_t len;
1255
1256         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1257         map = talloc_size(mem_ctx, len);
1258         CTDB_NO_MEMORY_VOID(ctdb, map);
1259
1260         map->generation = vnnmap->generation;
1261         map->size = vnnmap->size;
1262         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1263         
1264         data.dsize = len;
1265         data.dptr  = (uint8_t *)map;
1266
1267         ret = ctdb_control(ctdb, destnode, 0, 
1268                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1269                            NULL, NULL, &res, &timeout, NULL);
1270         if (ret != 0 || res != 0) {
1271                 DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
1272                 return -1;
1273         }
1274
1275         talloc_free(map);
1276
1277         return 0;
1278 }
1279
1280
1281 /*
1282   async send for pull database
1283  */
1284 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1285         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1286         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1287 {
1288         TDB_DATA indata;
1289         struct ctdb_control_pulldb *pull;
1290         struct ctdb_client_control_state *state;
1291
1292         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1293         CTDB_NO_MEMORY_NULL(ctdb, pull);
1294
1295         pull->db_id   = dbid;
1296         pull->lmaster = lmaster;
1297
1298         indata.dsize = sizeof(struct ctdb_control_pulldb);
1299         indata.dptr  = (unsigned char *)pull;
1300
1301         state = ctdb_control_send(ctdb, destnode, 0, 
1302                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1303                                   mem_ctx, &timeout, NULL);
1304         talloc_free(pull);
1305
1306         return state;
1307 }
1308
1309 /*
1310   async recv for pull database
1311  */
1312 int ctdb_ctrl_pulldb_recv(
1313         struct ctdb_context *ctdb, 
1314         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1315         TDB_DATA *outdata)
1316 {
1317         int ret;
1318         int32_t res;
1319
1320         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1321         if ( (ret != 0) || (res != 0) ){
1322                 DEBUG(0,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1323                 return -1;
1324         }
1325
1326         return 0;
1327 }
1328
1329 /*
1330   pull all keys and records for a specific database on a node
1331  */
1332 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1333                 uint32_t dbid, uint32_t lmaster, 
1334                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1335                 TDB_DATA *outdata)
1336 {
1337         struct ctdb_client_control_state *state;
1338
1339         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1340                                       timeout);
1341         
1342         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1343 }
1344
1345
1346 /*
1347   change dmaster for all keys in the database to the new value
1348  */
1349 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1350                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1351 {
1352         int ret;
1353         TDB_DATA indata;
1354         int32_t res;
1355
1356         indata.dsize = 2*sizeof(uint32_t);
1357         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1358
1359         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1360         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1361
1362         ret = ctdb_control(ctdb, destnode, 0, 
1363                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1364                            NULL, NULL, &res, &timeout, NULL);
1365         if (ret != 0 || res != 0) {
1366                 DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
1367                 return -1;
1368         }
1369
1370         return 0;
1371 }
1372
1373 /*
1374   ping a node, return number of clients connected
1375  */
1376 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1377 {
1378         int ret;
1379         int32_t res;
1380
1381         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1382                            tdb_null, NULL, NULL, &res, NULL, NULL);
1383         if (ret != 0) {
1384                 return -1;
1385         }
1386         return res;
1387 }
1388
1389 /*
1390   find the real path to a ltdb 
1391  */
1392 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1393                    const char **path)
1394 {
1395         int ret;
1396         int32_t res;
1397         TDB_DATA data;
1398
1399         data.dptr = (uint8_t *)&dbid;
1400         data.dsize = sizeof(dbid);
1401
1402         ret = ctdb_control(ctdb, destnode, 0, 
1403                            CTDB_CONTROL_GETDBPATH, 0, data, 
1404                            mem_ctx, &data, &res, &timeout, NULL);
1405         if (ret != 0 || res != 0) {
1406                 return -1;
1407         }
1408
1409         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1410         if ((*path) == NULL) {
1411                 return -1;
1412         }
1413
1414         talloc_free(data.dptr);
1415
1416         return 0;
1417 }
1418
1419 /*
1420   find the name of a db 
1421  */
1422 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1423                    const char **name)
1424 {
1425         int ret;
1426         int32_t res;
1427         TDB_DATA data;
1428
1429         data.dptr = (uint8_t *)&dbid;
1430         data.dsize = sizeof(dbid);
1431
1432         ret = ctdb_control(ctdb, destnode, 0, 
1433                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1434                            mem_ctx, &data, &res, &timeout, NULL);
1435         if (ret != 0 || res != 0) {
1436                 return -1;
1437         }
1438
1439         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1440         if ((*name) == NULL) {
1441                 return -1;
1442         }
1443
1444         talloc_free(data.dptr);
1445
1446         return 0;
1447 }
1448
1449 /*
1450   create a database
1451  */
1452 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1453                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1454 {
1455         int ret;
1456         int32_t res;
1457         TDB_DATA data;
1458
1459         data.dptr = discard_const(name);
1460         data.dsize = strlen(name)+1;
1461
1462         ret = ctdb_control(ctdb, destnode, 0, 
1463                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1464                            0, data, 
1465                            mem_ctx, &data, &res, &timeout, NULL);
1466
1467         if (ret != 0 || res != 0) {
1468                 return -1;
1469         }
1470
1471         return 0;
1472 }
1473
1474 /*
1475   get debug level on a node
1476  */
1477 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *level)
1478 {
1479         int ret;
1480         int32_t res;
1481         TDB_DATA data;
1482
1483         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1484                            ctdb, &data, &res, NULL, NULL);
1485         if (ret != 0 || res != 0) {
1486                 return -1;
1487         }
1488         if (data.dsize != sizeof(uint32_t)) {
1489                 DEBUG(0,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1490                          (unsigned)data.dsize));
1491                 return -1;
1492         }
1493         *level = *(uint32_t *)data.dptr;
1494         talloc_free(data.dptr);
1495         return 0;
1496 }
1497
1498 /*
1499   set debug level on a node
1500  */
1501 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t level)
1502 {
1503         int ret;
1504         int32_t res;
1505         TDB_DATA data;
1506
1507         data.dptr = (uint8_t *)&level;
1508         data.dsize = sizeof(level);
1509
1510         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1511                            NULL, NULL, &res, NULL, NULL);
1512         if (ret != 0 || res != 0) {
1513                 return -1;
1514         }
1515         return 0;
1516 }
1517
1518
1519 /*
1520   get a list of connected nodes
1521  */
1522 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1523                                 struct timeval timeout,
1524                                 TALLOC_CTX *mem_ctx,
1525                                 uint32_t *num_nodes)
1526 {
1527         struct ctdb_node_map *map=NULL;
1528         int ret, i;
1529         uint32_t *nodes;
1530
1531         *num_nodes = 0;
1532
1533         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1534         if (ret != 0) {
1535                 return NULL;
1536         }
1537
1538         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1539         if (nodes == NULL) {
1540                 return NULL;
1541         }
1542
1543         for (i=0;i<map->num;i++) {
1544                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1545                         nodes[*num_nodes] = map->nodes[i].pnn;
1546                         (*num_nodes)++;
1547                 }
1548         }
1549
1550         return nodes;
1551 }
1552
1553
1554 /*
1555   reset remote status
1556  */
1557 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1558 {
1559         int ret;
1560         int32_t res;
1561
1562         ret = ctdb_control(ctdb, destnode, 0, 
1563                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1564                            NULL, NULL, &res, NULL, NULL);
1565         if (ret != 0 || res != 0) {
1566                 DEBUG(0,(__location__ " ctdb_control for reset statistics failed\n"));
1567                 return -1;
1568         }
1569         return 0;
1570 }
1571
1572 /*
1573   this is the dummy null procedure that all databases support
1574 */
1575 static int ctdb_null_func(struct ctdb_call_info *call)
1576 {
1577         return 0;
1578 }
1579
1580 /*
1581   this is a plain fetch procedure that all databases support
1582 */
1583 static int ctdb_fetch_func(struct ctdb_call_info *call)
1584 {
1585         call->reply_data = &call->record_data;
1586         return 0;
1587 }
1588
1589 /*
1590   attach to a specific database - client call
1591 */
1592 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent)
1593 {
1594         struct ctdb_db_context *ctdb_db;
1595         TDB_DATA data;
1596         int ret;
1597         int32_t res;
1598
1599         ctdb_db = ctdb_db_handle(ctdb, name);
1600         if (ctdb_db) {
1601                 return ctdb_db;
1602         }
1603
1604         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1605         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1606
1607         ctdb_db->ctdb = ctdb;
1608         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1609         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1610
1611         data.dptr = discard_const(name);
1612         data.dsize = strlen(name)+1;
1613
1614         /* tell ctdb daemon to attach */
1615         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
1616                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1617                            0, data, ctdb_db, &data, &res, NULL, NULL);
1618         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1619                 DEBUG(0,("Failed to attach to database '%s'\n", name));
1620                 talloc_free(ctdb_db);
1621                 return NULL;
1622         }
1623         
1624         ctdb_db->db_id = *(uint32_t *)data.dptr;
1625         talloc_free(data.dptr);
1626
1627         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1628         if (ret != 0) {
1629                 DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
1630                 talloc_free(ctdb_db);
1631                 return NULL;
1632         }
1633
1634         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, persistent?TDB_DEFAULT:TDB_NOSYNC, O_RDWR, 0);
1635         if (ctdb_db->ltdb == NULL) {
1636                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1637                 talloc_free(ctdb_db);
1638                 return NULL;
1639         }
1640
1641         ctdb_db->persistent = persistent;
1642
1643         DLIST_ADD(ctdb->db_list, ctdb_db);
1644
1645         /* add well known functions */
1646         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1647         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1648
1649         return ctdb_db;
1650 }
1651
1652
1653 /*
1654   setup a call for a database
1655  */
1656 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1657 {
1658         struct ctdb_registered_call *call;
1659
1660 #if 0
1661         TDB_DATA data;
1662         int32_t status;
1663         struct ctdb_control_set_call c;
1664         int ret;
1665
1666         /* this is no longer valid with the separate daemon architecture */
1667         c.db_id = ctdb_db->db_id;
1668         c.fn    = fn;
1669         c.id    = id;
1670
1671         data.dptr = (uint8_t *)&c;
1672         data.dsize = sizeof(c);
1673
1674         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1675                            data, NULL, NULL, &status, NULL, NULL);
1676         if (ret != 0 || status != 0) {
1677                 DEBUG(0,("ctdb_set_call failed for call %u\n", id));
1678                 return -1;
1679         }
1680 #endif
1681
1682         /* also register locally */
1683         call = talloc(ctdb_db, struct ctdb_registered_call);
1684         call->fn = fn;
1685         call->id = id;
1686
1687         DLIST_ADD(ctdb_db->calls, call);        
1688         return 0;
1689 }
1690
1691
1692 struct traverse_state {
1693         bool done;
1694         uint32_t count;
1695         ctdb_traverse_func fn;
1696         void *private_data;
1697 };
1698
1699 /*
1700   called on each key during a ctdb_traverse
1701  */
1702 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1703 {
1704         struct traverse_state *state = (struct traverse_state *)p;
1705         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1706         TDB_DATA key;
1707
1708         if (data.dsize < sizeof(uint32_t) ||
1709             d->length != data.dsize) {
1710                 DEBUG(0,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1711                 state->done = True;
1712                 return;
1713         }
1714
1715         key.dsize = d->keylen;
1716         key.dptr  = &d->data[0];
1717         data.dsize = d->datalen;
1718         data.dptr = &d->data[d->keylen];
1719
1720         if (key.dsize == 0 && data.dsize == 0) {
1721                 /* end of traverse */
1722                 state->done = True;
1723                 return;
1724         }
1725
1726         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1727                 state->done = True;
1728         }
1729
1730         state->count++;
1731 }
1732
1733
1734 /*
1735   start a cluster wide traverse, calling the supplied fn on each record
1736   return the number of records traversed, or -1 on error
1737  */
1738 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1739 {
1740         TDB_DATA data;
1741         struct ctdb_traverse_start t;
1742         int32_t status;
1743         int ret;
1744         uint64_t srvid = (getpid() | 0xFLL<<60);
1745         struct traverse_state state;
1746
1747         state.done = False;
1748         state.count = 0;
1749         state.private_data = private_data;
1750         state.fn = fn;
1751
1752         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1753         if (ret != 0) {
1754                 DEBUG(0,("Failed to setup traverse handler\n"));
1755                 return -1;
1756         }
1757
1758         t.db_id = ctdb_db->db_id;
1759         t.srvid = srvid;
1760         t.reqid = 0;
1761
1762         data.dptr = (uint8_t *)&t;
1763         data.dsize = sizeof(t);
1764
1765         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1766                            data, NULL, NULL, &status, NULL, NULL);
1767         if (ret != 0 || status != 0) {
1768                 DEBUG(0,("ctdb_traverse_all failed\n"));
1769                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1770                 return -1;
1771         }
1772
1773         while (!state.done) {
1774                 event_loop_once(ctdb_db->ctdb->ev);
1775         }
1776
1777         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1778         if (ret != 0) {
1779                 DEBUG(0,("Failed to remove ctdb_traverse handler\n"));
1780                 return -1;
1781         }
1782
1783         return state.count;
1784 }
1785
1786 /*
1787   called on each key during a catdb
1788  */
1789 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1790 {
1791         FILE *f = (FILE *)p;
1792         char *keystr, *datastr;
1793         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1794
1795         keystr  = hex_encode_talloc(ctdb, key.dptr, key.dsize);
1796         datastr = hex_encode_talloc(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
1797
1798         fprintf(f, "dmaster: %u\n", h->dmaster);
1799         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1800         fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
1801
1802         talloc_free(keystr);
1803         talloc_free(datastr);
1804         return 0;
1805 }
1806
1807 /*
1808   convenience function to list all keys to stdout
1809  */
1810 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1811 {
1812         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1813 }
1814
1815 /*
1816   get the pid of a ctdb daemon
1817  */
1818 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1819 {
1820         int ret;
1821         int32_t res;
1822
1823         ret = ctdb_control(ctdb, destnode, 0, 
1824                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1825                            NULL, NULL, &res, &timeout, NULL);
1826         if (ret != 0) {
1827                 DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
1828                 return -1;
1829         }
1830
1831         *pid = res;
1832
1833         return 0;
1834 }
1835
1836
1837 /*
1838   async freeze send control
1839  */
1840 struct ctdb_client_control_state *
1841 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1842 {
1843         return ctdb_control_send(ctdb, destnode, 0, 
1844                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1845                            mem_ctx, &timeout, NULL);
1846 }
1847
1848 /* 
1849    async freeze recv control
1850 */
1851 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1852 {
1853         int ret;
1854         int32_t res;
1855
1856         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1857         if ( (ret != 0) || (res != 0) ){
1858                 DEBUG(0,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1859                 return -1;
1860         }
1861
1862         return 0;
1863 }
1864
1865 /*
1866   freeze a node
1867  */
1868 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1869 {
1870         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1871         struct ctdb_client_control_state *state;
1872         int ret;
1873
1874         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1875         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1876         talloc_free(tmp_ctx);
1877
1878         return ret;
1879 }
1880
1881 /*
1882   thaw a node
1883  */
1884 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1885 {
1886         int ret;
1887         int32_t res;
1888
1889         ret = ctdb_control(ctdb, destnode, 0, 
1890                            CTDB_CONTROL_THAW, 0, tdb_null, 
1891                            NULL, NULL, &res, &timeout, NULL);
1892         if (ret != 0 || res != 0) {
1893                 DEBUG(0,(__location__ " ctdb_control thaw failed\n"));
1894                 return -1;
1895         }
1896
1897         return 0;
1898 }
1899
1900 /*
1901   get pnn of a node, or -1
1902  */
1903 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1904 {
1905         int ret;
1906         int32_t res;
1907
1908         ret = ctdb_control(ctdb, destnode, 0, 
1909                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1910                            NULL, NULL, &res, &timeout, NULL);
1911         if (ret != 0) {
1912                 DEBUG(0,(__location__ " ctdb_control for getpnn failed\n"));
1913                 return -1;
1914         }
1915
1916         return res;
1917 }
1918
1919 /*
1920   get the monitoring mode of a remote node
1921  */
1922 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1923 {
1924         int ret;
1925         int32_t res;
1926
1927         ret = ctdb_control(ctdb, destnode, 0, 
1928                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1929                            NULL, NULL, &res, &timeout, NULL);
1930         if (ret != 0) {
1931                 DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
1932                 return -1;
1933         }
1934
1935         *monmode = res;
1936
1937         return 0;
1938 }
1939
1940 /* 
1941   sent to a node to make it take over an ip address
1942 */
1943 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1944                           uint32_t destnode, struct ctdb_public_ip *ip)
1945 {
1946         TDB_DATA data;
1947         int ret;
1948         int32_t res;
1949
1950         data.dsize = sizeof(*ip);
1951         data.dptr  = (uint8_t *)ip;
1952
1953         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1954                            NULL, &res, &timeout, NULL);
1955
1956         if (ret != 0 || res != 0) {
1957                 DEBUG(0,(__location__ " ctdb_control for takeover_ip failed\n"));
1958                 return -1;
1959         }
1960
1961         return 0;       
1962 }
1963
1964
1965 /* 
1966   sent to a node to make it release an ip address
1967 */
1968 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1969                          uint32_t destnode, struct ctdb_public_ip *ip)
1970 {
1971         TDB_DATA data;
1972         int ret;
1973         int32_t res;
1974
1975         data.dsize = sizeof(*ip);
1976         data.dptr  = (uint8_t *)ip;
1977
1978         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1979                            NULL, &res, &timeout, NULL);
1980
1981         if (ret != 0 || res != 0) {
1982                 DEBUG(0,(__location__ " ctdb_control for release_ip failed\n"));
1983                 return -1;
1984         }
1985
1986         return 0;       
1987 }
1988
1989
1990 /*
1991   get a tunable
1992  */
1993 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1994                           struct timeval timeout, 
1995                           uint32_t destnode,
1996                           const char *name, uint32_t *value)
1997 {
1998         struct ctdb_control_get_tunable *t;
1999         TDB_DATA data, outdata;
2000         int32_t res;
2001         int ret;
2002
2003         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2004         data.dptr  = talloc_size(ctdb, data.dsize);
2005         CTDB_NO_MEMORY(ctdb, data.dptr);
2006
2007         t = (struct ctdb_control_get_tunable *)data.dptr;
2008         t->length = strlen(name)+1;
2009         memcpy(t->name, name, t->length);
2010
2011         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2012                            &outdata, &res, &timeout, NULL);
2013         talloc_free(data.dptr);
2014         if (ret != 0 || res != 0) {
2015                 DEBUG(0,(__location__ " ctdb_control for get_tunable failed\n"));
2016                 return -1;
2017         }
2018
2019         if (outdata.dsize != sizeof(uint32_t)) {
2020                 DEBUG(0,("Invalid return data in get_tunable\n"));
2021                 talloc_free(outdata.dptr);
2022                 return -1;
2023         }
2024         
2025         *value = *(uint32_t *)outdata.dptr;
2026         talloc_free(outdata.dptr);
2027
2028         return 0;
2029 }
2030
2031 /*
2032   set a tunable
2033  */
2034 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2035                           struct timeval timeout, 
2036                           uint32_t destnode,
2037                           const char *name, uint32_t value)
2038 {
2039         struct ctdb_control_set_tunable *t;
2040         TDB_DATA data;
2041         int32_t res;
2042         int ret;
2043
2044         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2045         data.dptr  = talloc_size(ctdb, data.dsize);
2046         CTDB_NO_MEMORY(ctdb, data.dptr);
2047
2048         t = (struct ctdb_control_set_tunable *)data.dptr;
2049         t->length = strlen(name)+1;
2050         memcpy(t->name, name, t->length);
2051         t->value = value;
2052
2053         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2054                            NULL, &res, &timeout, NULL);
2055         talloc_free(data.dptr);
2056         if (ret != 0 || res != 0) {
2057                 DEBUG(0,(__location__ " ctdb_control for set_tunable failed\n"));
2058                 return -1;
2059         }
2060
2061         return 0;
2062 }
2063
2064 /*
2065   list tunables
2066  */
2067 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2068                             struct timeval timeout, 
2069                             uint32_t destnode,
2070                             TALLOC_CTX *mem_ctx,
2071                             const char ***list, uint32_t *count)
2072 {
2073         TDB_DATA outdata;
2074         int32_t res;
2075         int ret;
2076         struct ctdb_control_list_tunable *t;
2077         char *p, *s, *ptr;
2078
2079         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2080                            mem_ctx, &outdata, &res, &timeout, NULL);
2081         if (ret != 0 || res != 0) {
2082                 DEBUG(0,(__location__ " ctdb_control for list_tunables failed\n"));
2083                 return -1;
2084         }
2085
2086         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2087         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2088             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2089                 DEBUG(0,("Invalid data in list_tunables reply\n"));
2090                 talloc_free(outdata.dptr);
2091                 return -1;              
2092         }
2093         
2094         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2095         CTDB_NO_MEMORY(ctdb, p);
2096
2097         talloc_free(outdata.dptr);
2098         
2099         (*list) = NULL;
2100         (*count) = 0;
2101
2102         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2103                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2104                 CTDB_NO_MEMORY(ctdb, *list);
2105                 (*list)[*count] = talloc_strdup(*list, s);
2106                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2107                 (*count)++;
2108         }
2109
2110         talloc_free(p);
2111
2112         return 0;
2113 }
2114
2115
2116 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2117                         struct timeval timeout, uint32_t destnode, 
2118                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2119 {
2120         int ret;
2121         TDB_DATA outdata;
2122         int32_t res;
2123
2124         ret = ctdb_control(ctdb, destnode, 0, 
2125                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2126                            mem_ctx, &outdata, &res, &timeout, NULL);
2127         if (ret != 0 || res != 0) {
2128                 DEBUG(0,(__location__ " ctdb_control for getpublicips failed\n"));
2129                 return -1;
2130         }
2131
2132         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2133         talloc_free(outdata.dptr);
2134                     
2135         return 0;
2136 }
2137
2138 /*
2139   set/clear the permanent disabled bit on a remote node
2140  */
2141 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2142                        uint32_t set, uint32_t clear)
2143 {
2144         int ret;
2145         TDB_DATA data;
2146         struct ctdb_node_modflags m;
2147         int32_t res;
2148
2149         m.set = set;
2150         m.clear = clear;
2151
2152         data.dsize = sizeof(m);
2153         data.dptr = (unsigned char *)&m;
2154
2155         ret = ctdb_control(ctdb, destnode, 0, 
2156                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2157                            NULL, NULL, &res, &timeout, NULL);
2158         if (ret != 0 || res != 0) {
2159                 DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
2160                 return -1;
2161         }
2162
2163         return 0;
2164 }
2165
2166
2167 /*
2168   get all tunables
2169  */
2170 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2171                                struct timeval timeout, 
2172                                uint32_t destnode,
2173                                struct ctdb_tunable *tunables)
2174 {
2175         TDB_DATA outdata;
2176         int ret;
2177         int32_t res;
2178
2179         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2180                            &outdata, &res, &timeout, NULL);
2181         if (ret != 0 || res != 0) {
2182                 DEBUG(0,(__location__ " ctdb_control for get all tunables failed\n"));
2183                 return -1;
2184         }
2185
2186         if (outdata.dsize != sizeof(*tunables)) {
2187                 DEBUG(0,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2188                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2189                 return -1;              
2190         }
2191
2192         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2193         talloc_free(outdata.dptr);
2194         return 0;
2195 }
2196
2197
2198 /*
2199   kill a tcp connection
2200  */
2201 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2202                       struct timeval timeout, 
2203                       uint32_t destnode,
2204                       struct ctdb_control_killtcp *killtcp)
2205 {
2206         TDB_DATA data;
2207         int32_t res;
2208         int ret;
2209
2210         data.dsize = sizeof(struct ctdb_control_killtcp);
2211         data.dptr  = (unsigned char *)killtcp;
2212
2213         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2214                            NULL, &res, &timeout, NULL);
2215         if (ret != 0 || res != 0) {
2216                 DEBUG(0,(__location__ " ctdb_control for killtcp failed\n"));
2217                 return -1;
2218         }
2219
2220         return 0;
2221 }
2222
2223 /*
2224   send a gratious arp
2225  */
2226 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2227                       struct timeval timeout, 
2228                       uint32_t destnode,
2229                       struct sockaddr_in *sin,
2230                       const char *ifname)
2231 {
2232         TDB_DATA data;
2233         int32_t res;
2234         int ret, len;
2235         struct ctdb_control_gratious_arp *gratious_arp;
2236         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2237
2238
2239         len = strlen(ifname)+1;
2240         gratious_arp = talloc_size(tmp_ctx, 
2241                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2242         CTDB_NO_MEMORY(ctdb, gratious_arp);
2243
2244         gratious_arp->sin = *sin;
2245         gratious_arp->len = len;
2246         memcpy(&gratious_arp->iface[0], ifname, len);
2247
2248
2249         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2250         data.dptr  = (unsigned char *)gratious_arp;
2251
2252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2253                            NULL, &res, &timeout, NULL);
2254         if (ret != 0 || res != 0) {
2255                 DEBUG(0,(__location__ " ctdb_control for gratious_arp failed\n"));
2256                 talloc_free(tmp_ctx);
2257                 return -1;
2258         }
2259
2260         talloc_free(tmp_ctx);
2261         return 0;
2262 }
2263
2264 /*
2265   get a list of all tcp tickles that a node knows about for a particular vnn
2266  */
2267 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2268                               struct timeval timeout, uint32_t destnode, 
2269                               TALLOC_CTX *mem_ctx, 
2270                               struct sockaddr_in *ip,
2271                               struct ctdb_control_tcp_tickle_list **list)
2272 {
2273         int ret;
2274         TDB_DATA data, outdata;
2275         int32_t status;
2276
2277         data.dptr = (uint8_t*)ip;
2278         data.dsize = sizeof(struct sockaddr_in);
2279
2280         ret = ctdb_control(ctdb, destnode, 0, 
2281                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2282                            mem_ctx, &outdata, &status, NULL, NULL);
2283         if (ret != 0) {
2284                 DEBUG(0,(__location__ " ctdb_control for get tcp tickles failed\n"));
2285                 return -1;
2286         }
2287
2288         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2289
2290         return status;
2291 }
2292
2293 /*
2294   register a server id
2295  */
2296 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2297                       struct timeval timeout, 
2298                       struct ctdb_server_id *id)
2299 {
2300         TDB_DATA data;
2301         int32_t res;
2302         int ret;
2303
2304         data.dsize = sizeof(struct ctdb_server_id);
2305         data.dptr  = (unsigned char *)id;
2306
2307         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2308                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2309                         0, data, NULL,
2310                         NULL, &res, &timeout, NULL);
2311         if (ret != 0 || res != 0) {
2312                 DEBUG(0,(__location__ " ctdb_control for register server id failed\n"));
2313                 return -1;
2314         }
2315
2316         return 0;
2317 }
2318
2319 /*
2320   unregister a server id
2321  */
2322 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2323                       struct timeval timeout, 
2324                       struct ctdb_server_id *id)
2325 {
2326         TDB_DATA data;
2327         int32_t res;
2328         int ret;
2329
2330         data.dsize = sizeof(struct ctdb_server_id);
2331         data.dptr  = (unsigned char *)id;
2332
2333         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2334                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2335                         0, data, NULL,
2336                         NULL, &res, &timeout, NULL);
2337         if (ret != 0 || res != 0) {
2338                 DEBUG(0,(__location__ " ctdb_control for unregister server id failed\n"));
2339                 return -1;
2340         }
2341
2342         return 0;
2343 }
2344
2345
2346 /*
2347   check if a server id exists
2348
2349   if a server id does exist, return *status == 1, otherwise *status == 0
2350  */
2351 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2352                       struct timeval timeout, 
2353                       uint32_t destnode,
2354                       struct ctdb_server_id *id,
2355                       uint32_t *status)
2356 {
2357         TDB_DATA data;
2358         int32_t res;
2359         int ret;
2360
2361         data.dsize = sizeof(struct ctdb_server_id);
2362         data.dptr  = (unsigned char *)id;
2363
2364         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2365                         0, data, NULL,
2366                         NULL, &res, &timeout, NULL);
2367         if (ret != 0) {
2368                 DEBUG(0,(__location__ " ctdb_control for check server id failed\n"));
2369                 return -1;
2370         }
2371
2372         if (res) {
2373                 *status = 1;
2374         } else {
2375                 *status = 0;
2376         }
2377
2378         return 0;
2379 }
2380
2381 /*
2382    get the list of server ids that are registered on a node
2383 */
2384 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2385                 TALLOC_CTX *mem_ctx,
2386                 struct timeval timeout, uint32_t destnode, 
2387                 struct ctdb_server_id_list **svid_list)
2388 {
2389         int ret;
2390         TDB_DATA outdata;
2391         int32_t res;
2392
2393         ret = ctdb_control(ctdb, destnode, 0, 
2394                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2395                            mem_ctx, &outdata, &res, &timeout, NULL);
2396         if (ret != 0 || res != 0) {
2397                 DEBUG(0,(__location__ " ctdb_control for get_server_id_list failed\n"));
2398                 return -1;
2399         }
2400
2401         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2402                     
2403         return 0;
2404 }
2405
2406 /*
2407   initialise the ctdb daemon for client applications
2408
2409   NOTE: In current code the daemon does not fork. This is for testing purposes only
2410   and to simplify the code.
2411 */
2412 struct ctdb_context *ctdb_init(struct event_context *ev)
2413 {
2414         struct ctdb_context *ctdb;
2415
2416         ctdb = talloc_zero(ev, struct ctdb_context);
2417         ctdb->ev  = ev;
2418         ctdb->idr = idr_init(ctdb);
2419         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2420
2421         ctdb_set_socketname(ctdb, CTDB_PATH);
2422
2423         return ctdb;
2424 }
2425
2426
2427 /*
2428   set some ctdb flags
2429 */
2430 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2431 {
2432         ctdb->flags |= flags;
2433 }
2434
2435 /*
2436   setup the local socket name
2437 */
2438 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2439 {
2440         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2441         return 0;
2442 }
2443
2444 /*
2445   return the pnn of this node
2446 */
2447 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2448 {
2449         return ctdb->pnn;
2450 }
2451
2452
2453 /*
2454   get the uptime of a remote node
2455  */
2456 struct ctdb_client_control_state *
2457 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2458 {
2459         return ctdb_control_send(ctdb, destnode, 0, 
2460                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2461                            mem_ctx, &timeout, NULL);
2462 }
2463
2464 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2465 {
2466         int ret;
2467         int32_t res;
2468         TDB_DATA outdata;
2469
2470         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2471         if (ret != 0 || res != 0) {
2472                 DEBUG(0,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2473                 return -1;
2474         }
2475
2476         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2477
2478         return 0;
2479 }
2480
2481 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2482 {
2483         struct ctdb_client_control_state *state;
2484
2485         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2486         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2487 }
2488
2489 /*
2490   send a control to execute the "recovered" event script on a node
2491  */
2492 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2493 {
2494         int ret;
2495         int32_t status;
2496
2497         ret = ctdb_control(ctdb, destnode, 0, 
2498                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2499                            NULL, NULL, &status, &timeout, NULL);
2500         if (ret != 0 || status != 0) {
2501                 DEBUG(0,(__location__ " ctdb_control for end_recovery failed\n"));
2502                 return -1;
2503         }
2504
2505         return 0;
2506 }
2507
2508 /* 
2509   callback for the async helpers used when sending the same control
2510   to multiple nodes in parallell.
2511 */
2512 static void async_callback(struct ctdb_client_control_state *state)
2513 {
2514         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2515         int ret;
2516         int32_t res;
2517
2518         /* one more node has responded with recmode data */
2519         data->count--;
2520
2521         /* if we failed to push the db, then return an error and let
2522            the main loop try again.
2523         */
2524         if (state->state != CTDB_CONTROL_DONE) {
2525                 if ( !data->dont_log_errors) {
2526                         DEBUG(0,("Async operation failed with state %d\n", state->state));
2527                 }
2528                 data->fail_count++;
2529                 return;
2530         }
2531         
2532         state->async.fn = NULL;
2533
2534         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
2535         if ((ret != 0) || (res != 0)) {
2536                 if ( !data->dont_log_errors) {
2537                         DEBUG(0,("Async operation failed with ret=%d res=%d\n", ret, (int)res));
2538                 }
2539                 data->fail_count++;
2540         }
2541 }
2542
2543
2544 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2545 {
2546         /* set up the callback functions */
2547         state->async.fn = async_callback;
2548         state->async.private_data = data;
2549         
2550         /* one more control to wait for to complete */
2551         data->count++;
2552 }
2553
2554
2555 /* wait for up to the maximum number of seconds allowed
2556    or until all nodes we expect a response from has replied
2557 */
2558 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2559 {
2560         while (data->count > 0) {
2561                 event_loop_once(ctdb->ev);
2562         }
2563         if (data->fail_count != 0) {
2564                 if (!data->dont_log_errors) {
2565                         DEBUG(0,("Async wait failed - fail_count=%u\n", 
2566                                  data->fail_count));
2567                 }
2568                 return -1;
2569         }
2570         return 0;
2571 }
2572
2573
2574 /* 
2575    perform a simple control on the listed nodes
2576    The control cannot return data
2577  */
2578 int ctdb_client_async_control(struct ctdb_context *ctdb,
2579                                 enum ctdb_controls opcode,
2580                                 uint32_t *nodes,
2581                                 struct timeval timeout,
2582                                 bool dont_log_errors,
2583                                 TDB_DATA data)
2584 {
2585         struct client_async_data *async_data;
2586         struct ctdb_client_control_state *state;
2587         int j, num_nodes;
2588         
2589         async_data = talloc_zero(ctdb, struct client_async_data);
2590         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2591         async_data->dont_log_errors = dont_log_errors;
2592
2593         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2594
2595         /* loop over all nodes and send an async control to each of them */
2596         for (j=0; j<num_nodes; j++) {
2597                 uint32_t pnn = nodes[j];
2598
2599                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2600                                           0, data, async_data, &timeout, NULL);
2601                 if (state == NULL) {
2602                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2603                         talloc_free(async_data);
2604                         return -1;
2605                 }
2606                 
2607                 ctdb_client_async_add(async_data, state);
2608         }
2609
2610         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2611                 talloc_free(async_data);
2612                 return -1;
2613         }
2614
2615         talloc_free(async_data);
2616         return 0;
2617 }
2618
2619 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2620                                 struct ctdb_vnn_map *vnn_map,
2621                                 TALLOC_CTX *mem_ctx,
2622                                 bool include_self)
2623 {
2624         int i, j, num_nodes;
2625         uint32_t *nodes;
2626
2627         for (i=num_nodes=0;i<vnn_map->size;i++) {
2628                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2629                         continue;
2630                 }
2631                 num_nodes++;
2632         } 
2633
2634         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2635         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2636
2637         for (i=j=0;i<vnn_map->size;i++) {
2638                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2639                         continue;
2640                 }
2641                 nodes[j++] = vnn_map->map[i];
2642         } 
2643
2644         return nodes;
2645 }
2646
2647 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2648                                 struct ctdb_node_map *node_map,
2649                                 TALLOC_CTX *mem_ctx,
2650                                 bool include_self)
2651 {
2652         int i, j, num_nodes;
2653         uint32_t *nodes;
2654
2655         for (i=num_nodes=0;i<node_map->num;i++) {
2656                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2657                         continue;
2658                 }
2659                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2660                         continue;
2661                 }
2662                 num_nodes++;
2663         } 
2664
2665         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2666         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2667
2668         for (i=j=0;i<node_map->num;i++) {
2669                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2670                         continue;
2671                 }
2672                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2673                         continue;
2674                 }
2675                 nodes[j++] = node_map->nodes[i].pnn;
2676         } 
2677
2678         return nodes;
2679 }