6e1a209a7c27a50195896bddff132b751fb671ea
[metze/ctdb/wip.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         memset(hdr, 0, slength);
55         hdr->length       = length;
56         hdr->operation    = operation;
57         hdr->ctdb_magic   = CTDB_MAGIC;
58         hdr->ctdb_version = CTDB_VERSION;
59         hdr->srcnode      = ctdb->pnn;
60         if (ctdb->vnn_map) {
61                 hdr->generation = ctdb->vnn_map->generation;
62         }
63
64         return hdr;
65 }
66
67 /*
68   local version of ctdb_call
69 */
70 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
71                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
72                     TDB_DATA *data, uint32_t caller)
73 {
74         struct ctdb_call_info *c;
75         struct ctdb_registered_call *fn;
76         struct ctdb_context *ctdb = ctdb_db->ctdb;
77         
78         c = talloc(ctdb, struct ctdb_call_info);
79         CTDB_NO_MEMORY(ctdb, c);
80
81         c->key = call->key;
82         c->call_data = &call->call_data;
83         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
84         c->record_data.dsize = data->dsize;
85         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
86         c->new_data = NULL;
87         c->reply_data = NULL;
88         c->status = 0;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         if (header->laccessor != caller) {
106                 header->lacount = 0;
107         }
108         header->laccessor = caller;
109         header->lacount++;
110
111         /* we need to force the record to be written out if this was a remote access,
112            so that the lacount is updated */
113         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
114                 c->new_data = &c->record_data;
115         }
116
117         if (c->new_data) {
118                 /* XXX check that we always have the lock here? */
119                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
120                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
121                         talloc_free(c);
122                         return -1;
123                 }
124         }
125
126         if (c->reply_data) {
127                 call->reply_data = *c->reply_data;
128
129                 talloc_steal(call, call->reply_data.dptr);
130                 talloc_set_name_const(call->reply_data.dptr, __location__);
131         } else {
132                 call->reply_data.dptr = NULL;
133                 call->reply_data.dsize = 0;
134         }
135         call->status = c->status;
136
137         talloc_free(c);
138
139         return 0;
140 }
141
142
143 /*
144   queue a packet for sending from client to daemon
145 */
146 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
147 {
148         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
149 }
150
151
152 /*
153   called when a CTDB_REPLY_CALL packet comes in in the client
154
155   This packet comes in response to a CTDB_REQ_CALL request packet. It
156   contains any reply data from the call
157 */
158 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
159 {
160         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
161         struct ctdb_client_call_state *state;
162
163         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
164         if (state == NULL) {
165                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
166                 return;
167         }
168
169         if (hdr->reqid != state->reqid) {
170                 /* we found a record  but it was the wrong one */
171                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
172                 return;
173         }
174
175         state->call->reply_data.dptr = c->data;
176         state->call->reply_data.dsize = c->datalen;
177         state->call->status = c->status;
178
179         talloc_steal(state, c);
180
181         state->state = CTDB_CALL_DONE;
182
183         if (state->async.fn) {
184                 state->async.fn(state);
185         }
186 }
187
188 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
189
190 /*
191   this is called in the client, when data comes in from the daemon
192  */
193 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
194 {
195         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
196         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
197         TALLOC_CTX *tmp_ctx;
198
199         /* place the packet as a child of a tmp_ctx. We then use
200            talloc_free() below to free it. If any of the calls want
201            to keep it, then they will steal it somewhere else, and the
202            talloc_free() will be a no-op */
203         tmp_ctx = talloc_new(ctdb);
204         talloc_steal(tmp_ctx, hdr);
205
206         if (cnt == 0) {
207                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
208                 exit(0);
209         }
210
211         if (cnt < sizeof(*hdr)) {
212                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
213                 goto done;
214         }
215         if (cnt != hdr->length) {
216                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
217                                (unsigned)hdr->length, (unsigned)cnt);
218                 goto done;
219         }
220
221         if (hdr->ctdb_magic != CTDB_MAGIC) {
222                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
223                 goto done;
224         }
225
226         if (hdr->ctdb_version != CTDB_VERSION) {
227                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
228                 goto done;
229         }
230
231         switch (hdr->operation) {
232         case CTDB_REPLY_CALL:
233                 ctdb_client_reply_call(ctdb, hdr);
234                 break;
235
236         case CTDB_REQ_MESSAGE:
237                 ctdb_request_message(ctdb, hdr);
238                 break;
239
240         case CTDB_REPLY_CONTROL:
241                 ctdb_client_reply_control(ctdb, hdr);
242                 break;
243
244         default:
245                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
246         }
247
248 done:
249         talloc_free(tmp_ctx);
250 }
251
252 /*
253   connect to a unix domain socket
254 */
255 int ctdb_socket_connect(struct ctdb_context *ctdb)
256 {
257         struct sockaddr_un addr;
258
259         memset(&addr, 0, sizeof(addr));
260         addr.sun_family = AF_UNIX;
261         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
262
263         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
264         if (ctdb->daemon.sd == -1) {
265                 return -1;
266         }
267
268         set_nonblocking(ctdb->daemon.sd);
269         set_close_on_exec(ctdb->daemon.sd);
270         
271         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
272                 close(ctdb->daemon.sd);
273                 ctdb->daemon.sd = -1;
274                 return -1;
275         }
276
277         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
278                                               CTDB_DS_ALIGNMENT, 
279                                               ctdb_client_read_cb, ctdb);
280         return 0;
281 }
282
283
284 struct ctdb_record_handle {
285         struct ctdb_db_context *ctdb_db;
286         TDB_DATA key;
287         TDB_DATA *data;
288         struct ctdb_ltdb_header header;
289 };
290
291
292 /*
293   make a recv call to the local ctdb daemon - called from client context
294
295   This is called when the program wants to wait for a ctdb_call to complete and get the 
296   results. This call will block unless the call has already completed.
297 */
298 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
299 {
300         if (state == NULL) {
301                 return -1;
302         }
303
304         while (state->state < CTDB_CALL_DONE) {
305                 event_loop_once(state->ctdb_db->ctdb->ev);
306         }
307         if (state->state != CTDB_CALL_DONE) {
308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
309                 talloc_free(state);
310                 return -1;
311         }
312
313         if (state->call->reply_data.dsize) {
314                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
315                                                       state->call->reply_data.dptr,
316                                                       state->call->reply_data.dsize);
317                 call->reply_data.dsize = state->call->reply_data.dsize;
318         } else {
319                 call->reply_data.dptr = NULL;
320                 call->reply_data.dsize = 0;
321         }
322         call->status = state->call->status;
323         talloc_free(state);
324
325         return 0;
326 }
327
328
329
330
331 /*
332   destroy a ctdb_call in client
333 */
334 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
335 {
336         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
337         return 0;
338 }
339
340 /*
341   construct an event driven local ctdb_call
342
343   this is used so that locally processed ctdb_call requests are processed
344   in an event driven manner
345 */
346 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
347                                                                   struct ctdb_call *call,
348                                                                   struct ctdb_ltdb_header *header,
349                                                                   TDB_DATA *data)
350 {
351         struct ctdb_client_call_state *state;
352         struct ctdb_context *ctdb = ctdb_db->ctdb;
353         int ret;
354
355         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
356         CTDB_NO_MEMORY_NULL(ctdb, state);
357         state->call = talloc_zero(state, struct ctdb_call);
358         CTDB_NO_MEMORY_NULL(ctdb, state->call);
359
360         talloc_steal(state, data->dptr);
361
362         state->state   = CTDB_CALL_DONE;
363         *(state->call) = *call;
364         state->ctdb_db = ctdb_db;
365
366         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if (ret == 0 && header.dmaster == ctdb->pnn) {
402                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
403                 talloc_free(data.dptr);
404                 ctdb_ltdb_unlock(ctdb_db, call->key);
405                 return state;
406         }
407
408         ctdb_ltdb_unlock(ctdb_db, call->key);
409         talloc_free(data.dptr);
410
411         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
412         if (state == NULL) {
413                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
414                 return NULL;
415         }
416         state->call = talloc_zero(state, struct ctdb_call);
417         if (state->call == NULL) {
418                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
419                 return NULL;
420         }
421
422         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
423         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
424         if (c == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
426                 return NULL;
427         }
428
429         state->reqid     = ctdb_reqid_new(ctdb, state);
430         state->ctdb_db = ctdb_db;
431         talloc_set_destructor(state, ctdb_client_call_destructor);
432
433         c->hdr.reqid     = state->reqid;
434         c->flags         = call->flags;
435         c->db_id         = ctdb_db->db_id;
436         c->callid        = call->call_id;
437         c->hopcount      = 0;
438         c->keylen        = call->key.dsize;
439         c->calldatalen   = call->call_data.dsize;
440         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
441         memcpy(&c->data[call->key.dsize], 
442                call->call_data.dptr, call->call_data.dsize);
443         *(state->call)              = *call;
444         state->call->call_data.dptr = &c->data[call->key.dsize];
445         state->call->key.dptr       = &c->data[0];
446
447         state->state  = CTDB_CALL_WAIT;
448
449
450         ctdb_client_queue_pkt(ctdb, &c->hdr);
451
452         return state;
453 }
454
455
456 /*
457   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
458 */
459 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
460 {
461         struct ctdb_client_call_state *state;
462
463         state = ctdb_call_send(ctdb_db, call);
464         return ctdb_call_recv(state, call);
465 }
466
467
468 /*
469   tell the daemon what messaging srvid we will use, and register the message
470   handler function in the client
471 */
472 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
473                              ctdb_message_fn_t handler,
474                              void *private_data)
475                                     
476 {
477         int res;
478         int32_t status;
479         
480         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
481                            tdb_null, NULL, NULL, &status, NULL, NULL);
482         if (res != 0 || status != 0) {
483                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
484                 return -1;
485         }
486
487         /* also need to register the handler with our own ctdb structure */
488         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
489 }
490
491 /*
492   tell the daemon we no longer want a srvid
493 */
494 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
495 {
496         int res;
497         int32_t status;
498         
499         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
500                            tdb_null, NULL, NULL, &status, NULL, NULL);
501         if (res != 0 || status != 0) {
502                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
503                 return -1;
504         }
505
506         /* also need to register the handler with our own ctdb structure */
507         ctdb_deregister_message_handler(ctdb, srvid, private_data);
508         return 0;
509 }
510
511
512 /*
513   send a message - from client context
514  */
515 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
516                       uint64_t srvid, TDB_DATA data)
517 {
518         struct ctdb_req_message *r;
519         int len, res;
520
521         len = offsetof(struct ctdb_req_message, data) + data.dsize;
522         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
523                                len, struct ctdb_req_message);
524         CTDB_NO_MEMORY(ctdb, r);
525
526         r->hdr.destnode  = pnn;
527         r->srvid         = srvid;
528         r->datalen       = data.dsize;
529         memcpy(&r->data[0], data.dptr, data.dsize);
530         
531         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
532         if (res != 0) {
533                 return res;
534         }
535
536         talloc_free(r);
537         return 0;
538 }
539
540
541 /*
542   cancel a ctdb_fetch_lock operation, releasing the lock
543  */
544 static int fetch_lock_destructor(struct ctdb_record_handle *h)
545 {
546         ctdb_ltdb_unlock(h->ctdb_db, h->key);
547         return 0;
548 }
549
550 /*
551   force the migration of a record to this node
552  */
553 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
554 {
555         struct ctdb_call call;
556         ZERO_STRUCT(call);
557         call.call_id = CTDB_NULL_FUNC;
558         call.key = key;
559         call.flags = CTDB_IMMEDIATE_MIGRATION;
560         return ctdb_call(ctdb_db, &call);
561 }
562
563 /*
564   get a lock on a record, and return the records data. Blocks until it gets the lock
565  */
566 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
567                                            TDB_DATA key, TDB_DATA *data)
568 {
569         int ret;
570         struct ctdb_record_handle *h;
571
572         /*
573           procedure is as follows:
574
575           1) get the chain lock. 
576           2) check if we are dmaster
577           3) if we are the dmaster then return handle 
578           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
579              reply from ctdbd
580           5) when we get the reply, goto (1)
581          */
582
583         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
584         if (h == NULL) {
585                 return NULL;
586         }
587
588         h->ctdb_db = ctdb_db;
589         h->key     = key;
590         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
591         if (h->key.dptr == NULL) {
592                 talloc_free(h);
593                 return NULL;
594         }
595         h->data    = data;
596
597         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
598                  (const char *)key.dptr));
599
600 again:
601         /* step 1 - get the chain lock */
602         ret = ctdb_ltdb_lock(ctdb_db, key);
603         if (ret != 0) {
604                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
605                 talloc_free(h);
606                 return NULL;
607         }
608
609         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
610
611         talloc_set_destructor(h, fetch_lock_destructor);
612
613         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
614
615         /* when torturing, ensure we test the remote path */
616         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
617             random() % 5 == 0) {
618                 h->header.dmaster = (uint32_t)-1;
619         }
620
621
622         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
623
624         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
625                 ctdb_ltdb_unlock(ctdb_db, key);
626                 ret = ctdb_client_force_migration(ctdb_db, key);
627                 if (ret != 0) {
628                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
629                         talloc_free(h);
630                         return NULL;
631                 }
632                 goto again;
633         }
634
635         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
636         return h;
637 }
638
639 /*
640   store some data to the record that was locked with ctdb_fetch_lock()
641 */
642 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
643 {
644         int ret;
645         int32_t status;
646         struct ctdb_rec_data *rec;
647         TDB_DATA recdata;
648
649         if (h->ctdb_db->persistent) {
650                 h->header.rsn++;
651         }
652
653         ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
654         if (ret != 0) {
655                 return ret;
656         }
657
658         /* don't need the persistent_store control for non-persistent databases */
659         if (!h->ctdb_db->persistent) {
660                 return 0;
661         }
662
663         rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
664         if (rec == NULL) {
665                 DEBUG(DEBUG_ERR,("Unable to marshall record in ctdb_record_store\n"));
666                 return -1;
667         }
668
669         recdata.dptr = (uint8_t *)rec;
670         recdata.dsize = rec->length;
671
672         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
673                            CTDB_CONTROL_PERSISTENT_STORE, 0,
674                            recdata, NULL, NULL, &status, NULL, NULL);
675
676         talloc_free(rec);
677
678         if (ret != 0 || status != 0) {
679                 DEBUG(DEBUG_ERR,("Failed persistent store in ctdb_record_store\n"));
680                 return -1;
681         }
682
683         return 0;
684 }
685
686 /*
687   non-locking fetch of a record
688  */
689 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
690                TDB_DATA key, TDB_DATA *data)
691 {
692         struct ctdb_call call;
693         int ret;
694
695         call.call_id = CTDB_FETCH_FUNC;
696         call.call_data.dptr = NULL;
697         call.call_data.dsize = 0;
698
699         ret = ctdb_call(ctdb_db, &call);
700
701         if (ret == 0) {
702                 *data = call.reply_data;
703                 talloc_steal(mem_ctx, data->dptr);
704         }
705
706         return ret;
707 }
708
709
710
711 /*
712    called when a control completes or timesout to invoke the callback
713    function the user provided
714 */
715 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
716         struct timeval t, void *private_data)
717 {
718         struct ctdb_client_control_state *state;
719         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
720         int ret;
721
722         state = talloc_get_type(private_data, struct ctdb_client_control_state);
723         talloc_steal(tmp_ctx, state);
724
725         ret = ctdb_control_recv(state->ctdb, state, state,
726                         NULL, 
727                         NULL, 
728                         NULL);
729
730         talloc_free(tmp_ctx);
731 }
732
733 /*
734   called when a CTDB_REPLY_CONTROL packet comes in in the client
735
736   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
737   contains any reply data from the control
738 */
739 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
740                                       struct ctdb_req_header *hdr)
741 {
742         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
743         struct ctdb_client_control_state *state;
744
745         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
746         if (state == NULL) {
747                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
748                 return;
749         }
750
751         if (hdr->reqid != state->reqid) {
752                 /* we found a record  but it was the wrong one */
753                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
754                 return;
755         }
756
757         state->outdata.dptr = c->data;
758         state->outdata.dsize = c->datalen;
759         state->status = c->status;
760         if (c->errorlen) {
761                 state->errormsg = talloc_strndup(state, 
762                                                  (char *)&c->data[c->datalen], 
763                                                  c->errorlen);
764         }
765
766         /* state->outdata now uses resources from c so we dont want c
767            to just dissappear from under us while state is still alive
768         */
769         talloc_steal(state, c);
770
771         state->state = CTDB_CONTROL_DONE;
772
773         /* if we had a callback registered for this control, pull the response
774            and call the callback.
775         */
776         if (state->async.fn) {
777                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
778         }
779 }
780
781
782 /*
783   destroy a ctdb_control in client
784 */
785 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
786 {
787         ctdb_reqid_remove(state->ctdb, state->reqid);
788         return 0;
789 }
790
791
792 /* time out handler for ctdb_control */
793 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
794         struct timeval t, void *private_data)
795 {
796         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
797
798         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
799
800         state->state = CTDB_CONTROL_TIMEOUT;
801
802         /* if we had a callback registered for this control, pull the response
803            and call the callback.
804         */
805         if (state->async.fn) {
806                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
807         }
808 }
809
810 /* async version of send control request */
811 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
812                 uint32_t destnode, uint64_t srvid, 
813                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
814                 TALLOC_CTX *mem_ctx,
815                 struct timeval *timeout,
816                 char **errormsg)
817 {
818         struct ctdb_client_control_state *state;
819         size_t len;
820         struct ctdb_req_control *c;
821         int ret;
822
823         if (errormsg) {
824                 *errormsg = NULL;
825         }
826
827         /* if the domain socket is not yet open, open it */
828         if (ctdb->daemon.sd==-1) {
829                 ctdb_socket_connect(ctdb);
830         }
831
832         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
833         CTDB_NO_MEMORY_NULL(ctdb, state);
834
835         state->ctdb       = ctdb;
836         state->reqid      = ctdb_reqid_new(ctdb, state);
837         state->state      = CTDB_CONTROL_WAIT;
838         state->errormsg   = NULL;
839
840         talloc_set_destructor(state, ctdb_control_destructor);
841
842         len = offsetof(struct ctdb_req_control, data) + data.dsize;
843         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
844                                len, struct ctdb_req_control);
845         state->c            = c;        
846         CTDB_NO_MEMORY_NULL(ctdb, c);
847         c->hdr.reqid        = state->reqid;
848         c->hdr.destnode     = destnode;
849         c->hdr.reqid        = state->reqid;
850         c->opcode           = opcode;
851         c->client_id        = 0;
852         c->flags            = flags;
853         c->srvid            = srvid;
854         c->datalen          = data.dsize;
855         if (data.dsize) {
856                 memcpy(&c->data[0], data.dptr, data.dsize);
857         }
858
859         /* timeout */
860         if (timeout && !timeval_is_zero(timeout)) {
861                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
862         }
863
864         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
865         if (ret != 0) {
866                 talloc_free(state);
867                 return NULL;
868         }
869
870         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
871                 talloc_free(state);
872                 return NULL;
873         }
874
875         return state;
876 }
877
878
879 /* async version of receive control reply */
880 int ctdb_control_recv(struct ctdb_context *ctdb, 
881                 struct ctdb_client_control_state *state, 
882                 TALLOC_CTX *mem_ctx,
883                 TDB_DATA *outdata, int32_t *status, char **errormsg)
884 {
885         TALLOC_CTX *tmp_ctx;
886
887         if (status != NULL) {
888                 *status = -1;
889         }
890         if (errormsg != NULL) {
891                 *errormsg = NULL;
892         }
893
894         if (state == NULL) {
895                 return -1;
896         }
897
898         /* prevent double free of state */
899         tmp_ctx = talloc_new(ctdb);
900         talloc_steal(tmp_ctx, state);
901
902         /* loop one event at a time until we either timeout or the control
903            completes.
904         */
905         while (state->state == CTDB_CONTROL_WAIT) {
906                 event_loop_once(ctdb->ev);
907         }
908
909         if (state->state != CTDB_CONTROL_DONE) {
910                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
911                 if (state->async.fn) {
912                         state->async.fn(state);
913                 }
914                 talloc_free(tmp_ctx);
915                 return -1;
916         }
917
918         if (state->errormsg) {
919                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
920                 if (errormsg) {
921                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
922                 }
923                 if (state->async.fn) {
924                         state->async.fn(state);
925                 }
926                 talloc_free(tmp_ctx);
927                 return -1;
928         }
929
930         if (outdata) {
931                 *outdata = state->outdata;
932                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
933         }
934
935         if (status) {
936                 *status = state->status;
937         }
938
939         if (state->async.fn) {
940                 state->async.fn(state);
941         }
942
943         talloc_free(tmp_ctx);
944         return 0;
945 }
946
947
948
949 /*
950   send a ctdb control message
951   timeout specifies how long we should wait for a reply.
952   if timeout is NULL we wait indefinitely
953  */
954 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
955                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
956                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
957                  struct timeval *timeout,
958                  char **errormsg)
959 {
960         struct ctdb_client_control_state *state;
961
962         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
963                         flags, data, mem_ctx,
964                         timeout, errormsg);
965         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
966                         errormsg);
967 }
968
969
970
971
972 /*
973   a process exists call. Returns 0 if process exists, -1 otherwise
974  */
975 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
976 {
977         int ret;
978         TDB_DATA data;
979         int32_t status;
980
981         data.dptr = (uint8_t*)&pid;
982         data.dsize = sizeof(pid);
983
984         ret = ctdb_control(ctdb, destnode, 0, 
985                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
986                            NULL, NULL, &status, NULL, NULL);
987         if (ret != 0) {
988                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
989                 return -1;
990         }
991
992         return status;
993 }
994
995 /*
996   get remote statistics
997  */
998 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
999 {
1000         int ret;
1001         TDB_DATA data;
1002         int32_t res;
1003
1004         ret = ctdb_control(ctdb, destnode, 0, 
1005                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1006                            ctdb, &data, &res, NULL, NULL);
1007         if (ret != 0 || res != 0) {
1008                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1009                 return -1;
1010         }
1011
1012         if (data.dsize != sizeof(struct ctdb_statistics)) {
1013                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1014                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1015                       return -1;
1016         }
1017
1018         *status = *(struct ctdb_statistics *)data.dptr;
1019         talloc_free(data.dptr);
1020                         
1021         return 0;
1022 }
1023
1024 /*
1025   shutdown a remote ctdb node
1026  */
1027 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1028 {
1029         struct ctdb_client_control_state *state;
1030
1031         state = ctdb_control_send(ctdb, destnode, 0, 
1032                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1033                            NULL, &timeout, NULL);
1034         if (state == NULL) {
1035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1036                 return -1;
1037         }
1038
1039         return 0;
1040 }
1041
1042 /*
1043   get vnn map from a remote node
1044  */
1045 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1046 {
1047         int ret;
1048         TDB_DATA outdata;
1049         int32_t res;
1050         struct ctdb_vnn_map_wire *map;
1051
1052         ret = ctdb_control(ctdb, destnode, 0, 
1053                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1054                            mem_ctx, &outdata, &res, &timeout, NULL);
1055         if (ret != 0 || res != 0) {
1056                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1057                 return -1;
1058         }
1059         
1060         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1061         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1062             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1063                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1064                 return -1;
1065         }
1066
1067         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1068         CTDB_NO_MEMORY(ctdb, *vnnmap);
1069         (*vnnmap)->generation = map->generation;
1070         (*vnnmap)->size       = map->size;
1071         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1072
1073         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1074         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1075         talloc_free(outdata.dptr);
1076                     
1077         return 0;
1078 }
1079
1080
1081 /*
1082   get the recovery mode of a remote node
1083  */
1084 struct ctdb_client_control_state *
1085 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1086 {
1087         return ctdb_control_send(ctdb, destnode, 0, 
1088                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1089                            mem_ctx, &timeout, NULL);
1090 }
1091
1092 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1093 {
1094         int ret;
1095         int32_t res;
1096
1097         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1098         if (ret != 0) {
1099                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1100                 return -1;
1101         }
1102
1103         if (recmode) {
1104                 *recmode = (uint32_t)res;
1105         }
1106
1107         return 0;
1108 }
1109
1110 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1111 {
1112         struct ctdb_client_control_state *state;
1113
1114         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1115         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1116 }
1117
1118
1119
1120
1121 /*
1122   set the recovery mode of a remote node
1123  */
1124 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1125 {
1126         int ret;
1127         TDB_DATA data;
1128         int32_t res;
1129
1130         data.dsize = sizeof(uint32_t);
1131         data.dptr = (unsigned char *)&recmode;
1132
1133         ret = ctdb_control(ctdb, destnode, 0, 
1134                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1135                            NULL, NULL, &res, &timeout, NULL);
1136         if (ret != 0 || res != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1138                 return -1;
1139         }
1140
1141         return 0;
1142 }
1143
1144
1145
1146 /*
1147   get the recovery master of a remote node
1148  */
1149 struct ctdb_client_control_state *
1150 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1151                         struct timeval timeout, uint32_t destnode)
1152 {
1153         return ctdb_control_send(ctdb, destnode, 0, 
1154                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1155                            mem_ctx, &timeout, NULL);
1156 }
1157
1158 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1159 {
1160         int ret;
1161         int32_t res;
1162
1163         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1164         if (ret != 0) {
1165                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1166                 return -1;
1167         }
1168
1169         if (recmaster) {
1170                 *recmaster = (uint32_t)res;
1171         }
1172
1173         return 0;
1174 }
1175
1176 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1177 {
1178         struct ctdb_client_control_state *state;
1179
1180         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1181         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1182 }
1183
1184
1185 /*
1186   set the recovery master of a remote node
1187  */
1188 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1189 {
1190         int ret;
1191         TDB_DATA data;
1192         int32_t res;
1193
1194         ZERO_STRUCT(data);
1195         data.dsize = sizeof(uint32_t);
1196         data.dptr = (unsigned char *)&recmaster;
1197
1198         ret = ctdb_control(ctdb, destnode, 0, 
1199                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1200                            NULL, NULL, &res, &timeout, NULL);
1201         if (ret != 0 || res != 0) {
1202                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1203                 return -1;
1204         }
1205
1206         return 0;
1207 }
1208
1209
1210 /*
1211   get a list of databases off a remote node
1212  */
1213 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1214                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1215 {
1216         int ret;
1217         TDB_DATA outdata;
1218         int32_t res;
1219
1220         ret = ctdb_control(ctdb, destnode, 0, 
1221                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1222                            mem_ctx, &outdata, &res, &timeout, NULL);
1223         if (ret != 0 || res != 0) {
1224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed\n"));
1225                 return -1;
1226         }
1227
1228         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1229         talloc_free(outdata.dptr);
1230                     
1231         return 0;
1232 }
1233
1234 /*
1235   get a list of nodes (vnn and flags ) from a remote node
1236  */
1237 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1238                 struct timeval timeout, uint32_t destnode, 
1239                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1240 {
1241         int ret;
1242         TDB_DATA outdata;
1243         int32_t res;
1244
1245         ret = ctdb_control(ctdb, destnode, 0, 
1246                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1247                            mem_ctx, &outdata, &res, &timeout, NULL);
1248         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed\n"));
1250                 return -1;
1251         }
1252
1253         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1254         talloc_free(outdata.dptr);
1255                     
1256         return 0;
1257 }
1258
1259 /*
1260   drop the transport, reload the nodes file and restart the transport
1261  */
1262 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1263                     struct timeval timeout, uint32_t destnode)
1264 {
1265         int ret;
1266         int32_t res;
1267
1268         ret = ctdb_control(ctdb, destnode, 0, 
1269                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1270                            NULL, NULL, &res, &timeout, NULL);
1271         if (ret != 0 || res != 0) {
1272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1273                 return -1;
1274         }
1275
1276         return 0;
1277 }
1278
1279
1280 /*
1281   set vnn map on a node
1282  */
1283 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1284                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1285 {
1286         int ret;
1287         TDB_DATA data;
1288         int32_t res;
1289         struct ctdb_vnn_map_wire *map;
1290         size_t len;
1291
1292         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1293         map = talloc_size(mem_ctx, len);
1294         CTDB_NO_MEMORY(ctdb, map);
1295
1296         map->generation = vnnmap->generation;
1297         map->size = vnnmap->size;
1298         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1299         
1300         data.dsize = len;
1301         data.dptr  = (uint8_t *)map;
1302
1303         ret = ctdb_control(ctdb, destnode, 0, 
1304                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1305                            NULL, NULL, &res, &timeout, NULL);
1306         if (ret != 0 || res != 0) {
1307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1308                 return -1;
1309         }
1310
1311         talloc_free(map);
1312
1313         return 0;
1314 }
1315
1316
1317 /*
1318   async send for pull database
1319  */
1320 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1321         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1322         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1323 {
1324         TDB_DATA indata;
1325         struct ctdb_control_pulldb *pull;
1326         struct ctdb_client_control_state *state;
1327
1328         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1329         CTDB_NO_MEMORY_NULL(ctdb, pull);
1330
1331         pull->db_id   = dbid;
1332         pull->lmaster = lmaster;
1333
1334         indata.dsize = sizeof(struct ctdb_control_pulldb);
1335         indata.dptr  = (unsigned char *)pull;
1336
1337         state = ctdb_control_send(ctdb, destnode, 0, 
1338                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1339                                   mem_ctx, &timeout, NULL);
1340         talloc_free(pull);
1341
1342         return state;
1343 }
1344
1345 /*
1346   async recv for pull database
1347  */
1348 int ctdb_ctrl_pulldb_recv(
1349         struct ctdb_context *ctdb, 
1350         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1351         TDB_DATA *outdata)
1352 {
1353         int ret;
1354         int32_t res;
1355
1356         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1357         if ( (ret != 0) || (res != 0) ){
1358                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1359                 return -1;
1360         }
1361
1362         return 0;
1363 }
1364
1365 /*
1366   pull all keys and records for a specific database on a node
1367  */
1368 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1369                 uint32_t dbid, uint32_t lmaster, 
1370                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1371                 TDB_DATA *outdata)
1372 {
1373         struct ctdb_client_control_state *state;
1374
1375         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1376                                       timeout);
1377         
1378         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1379 }
1380
1381
1382 /*
1383   change dmaster for all keys in the database to the new value
1384  */
1385 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1386                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1387 {
1388         int ret;
1389         TDB_DATA indata;
1390         int32_t res;
1391
1392         indata.dsize = 2*sizeof(uint32_t);
1393         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1394
1395         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1396         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1397
1398         ret = ctdb_control(ctdb, destnode, 0, 
1399                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1400                            NULL, NULL, &res, &timeout, NULL);
1401         if (ret != 0 || res != 0) {
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1403                 return -1;
1404         }
1405
1406         return 0;
1407 }
1408
1409 /*
1410   ping a node, return number of clients connected
1411  */
1412 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1413 {
1414         int ret;
1415         int32_t res;
1416
1417         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1418                            tdb_null, NULL, NULL, &res, NULL, NULL);
1419         if (ret != 0) {
1420                 return -1;
1421         }
1422         return res;
1423 }
1424
1425 /*
1426   find the real path to a ltdb 
1427  */
1428 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1429                    const char **path)
1430 {
1431         int ret;
1432         int32_t res;
1433         TDB_DATA data;
1434
1435         data.dptr = (uint8_t *)&dbid;
1436         data.dsize = sizeof(dbid);
1437
1438         ret = ctdb_control(ctdb, destnode, 0, 
1439                            CTDB_CONTROL_GETDBPATH, 0, data, 
1440                            mem_ctx, &data, &res, &timeout, NULL);
1441         if (ret != 0 || res != 0) {
1442                 return -1;
1443         }
1444
1445         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1446         if ((*path) == NULL) {
1447                 return -1;
1448         }
1449
1450         talloc_free(data.dptr);
1451
1452         return 0;
1453 }
1454
1455 /*
1456   find the name of a db 
1457  */
1458 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1459                    const char **name)
1460 {
1461         int ret;
1462         int32_t res;
1463         TDB_DATA data;
1464
1465         data.dptr = (uint8_t *)&dbid;
1466         data.dsize = sizeof(dbid);
1467
1468         ret = ctdb_control(ctdb, destnode, 0, 
1469                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1470                            mem_ctx, &data, &res, &timeout, NULL);
1471         if (ret != 0 || res != 0) {
1472                 return -1;
1473         }
1474
1475         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1476         if ((*name) == NULL) {
1477                 return -1;
1478         }
1479
1480         talloc_free(data.dptr);
1481
1482         return 0;
1483 }
1484
1485 /*
1486   create a database
1487  */
1488 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1489                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1490 {
1491         int ret;
1492         int32_t res;
1493         TDB_DATA data;
1494
1495         data.dptr = discard_const(name);
1496         data.dsize = strlen(name)+1;
1497
1498         ret = ctdb_control(ctdb, destnode, 0, 
1499                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1500                            0, data, 
1501                            mem_ctx, &data, &res, &timeout, NULL);
1502
1503         if (ret != 0 || res != 0) {
1504                 return -1;
1505         }
1506
1507         return 0;
1508 }
1509
1510 /*
1511   get debug level on a node
1512  */
1513 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1514 {
1515         int ret;
1516         int32_t res;
1517         TDB_DATA data;
1518
1519         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1520                            ctdb, &data, &res, NULL, NULL);
1521         if (ret != 0 || res != 0) {
1522                 return -1;
1523         }
1524         if (data.dsize != sizeof(int32_t)) {
1525                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1526                          (unsigned)data.dsize));
1527                 return -1;
1528         }
1529         *level = *(int32_t *)data.dptr;
1530         talloc_free(data.dptr);
1531         return 0;
1532 }
1533
1534 /*
1535   set debug level on a node
1536  */
1537 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1538 {
1539         int ret;
1540         int32_t res;
1541         TDB_DATA data;
1542
1543         data.dptr = (uint8_t *)&level;
1544         data.dsize = sizeof(level);
1545
1546         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1547                            NULL, NULL, &res, NULL, NULL);
1548         if (ret != 0 || res != 0) {
1549                 return -1;
1550         }
1551         return 0;
1552 }
1553
1554
1555 /*
1556   get a list of connected nodes
1557  */
1558 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1559                                 struct timeval timeout,
1560                                 TALLOC_CTX *mem_ctx,
1561                                 uint32_t *num_nodes)
1562 {
1563         struct ctdb_node_map *map=NULL;
1564         int ret, i;
1565         uint32_t *nodes;
1566
1567         *num_nodes = 0;
1568
1569         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1570         if (ret != 0) {
1571                 return NULL;
1572         }
1573
1574         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1575         if (nodes == NULL) {
1576                 return NULL;
1577         }
1578
1579         for (i=0;i<map->num;i++) {
1580                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1581                         nodes[*num_nodes] = map->nodes[i].pnn;
1582                         (*num_nodes)++;
1583                 }
1584         }
1585
1586         return nodes;
1587 }
1588
1589
1590 /*
1591   reset remote status
1592  */
1593 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1594 {
1595         int ret;
1596         int32_t res;
1597
1598         ret = ctdb_control(ctdb, destnode, 0, 
1599                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1600                            NULL, NULL, &res, NULL, NULL);
1601         if (ret != 0 || res != 0) {
1602                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1603                 return -1;
1604         }
1605         return 0;
1606 }
1607
1608 /*
1609   this is the dummy null procedure that all databases support
1610 */
1611 static int ctdb_null_func(struct ctdb_call_info *call)
1612 {
1613         return 0;
1614 }
1615
1616 /*
1617   this is a plain fetch procedure that all databases support
1618 */
1619 static int ctdb_fetch_func(struct ctdb_call_info *call)
1620 {
1621         call->reply_data = &call->record_data;
1622         return 0;
1623 }
1624
1625 /*
1626   attach to a specific database - client call
1627 */
1628 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1629 {
1630         struct ctdb_db_context *ctdb_db;
1631         TDB_DATA data;
1632         int ret;
1633         int32_t res;
1634
1635         ctdb_db = ctdb_db_handle(ctdb, name);
1636         if (ctdb_db) {
1637                 return ctdb_db;
1638         }
1639
1640         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1641         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1642
1643         ctdb_db->ctdb = ctdb;
1644         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1645         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1646
1647         data.dptr = discard_const(name);
1648         data.dsize = strlen(name)+1;
1649
1650         /* tell ctdb daemon to attach */
1651         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1652                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1653                            0, data, ctdb_db, &data, &res, NULL, NULL);
1654         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1655                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1656                 talloc_free(ctdb_db);
1657                 return NULL;
1658         }
1659         
1660         ctdb_db->db_id = *(uint32_t *)data.dptr;
1661         talloc_free(data.dptr);
1662
1663         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1664         if (ret != 0) {
1665                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1666                 talloc_free(ctdb_db);
1667                 return NULL;
1668         }
1669
1670         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1671         if (!ctdb->do_setsched) {
1672                 tdb_flags |= TDB_NOMMAP;
1673         }
1674
1675         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1676         if (ctdb_db->ltdb == NULL) {
1677                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1678                 talloc_free(ctdb_db);
1679                 return NULL;
1680         }
1681
1682         ctdb_db->persistent = persistent;
1683
1684         DLIST_ADD(ctdb->db_list, ctdb_db);
1685
1686         /* add well known functions */
1687         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1688         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1689
1690         return ctdb_db;
1691 }
1692
1693
1694 /*
1695   setup a call for a database
1696  */
1697 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1698 {
1699         struct ctdb_registered_call *call;
1700
1701 #if 0
1702         TDB_DATA data;
1703         int32_t status;
1704         struct ctdb_control_set_call c;
1705         int ret;
1706
1707         /* this is no longer valid with the separate daemon architecture */
1708         c.db_id = ctdb_db->db_id;
1709         c.fn    = fn;
1710         c.id    = id;
1711
1712         data.dptr = (uint8_t *)&c;
1713         data.dsize = sizeof(c);
1714
1715         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1716                            data, NULL, NULL, &status, NULL, NULL);
1717         if (ret != 0 || status != 0) {
1718                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1719                 return -1;
1720         }
1721 #endif
1722
1723         /* also register locally */
1724         call = talloc(ctdb_db, struct ctdb_registered_call);
1725         call->fn = fn;
1726         call->id = id;
1727
1728         DLIST_ADD(ctdb_db->calls, call);        
1729         return 0;
1730 }
1731
1732
1733 struct traverse_state {
1734         bool done;
1735         uint32_t count;
1736         ctdb_traverse_func fn;
1737         void *private_data;
1738 };
1739
1740 /*
1741   called on each key during a ctdb_traverse
1742  */
1743 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1744 {
1745         struct traverse_state *state = (struct traverse_state *)p;
1746         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1747         TDB_DATA key;
1748
1749         if (data.dsize < sizeof(uint32_t) ||
1750             d->length != data.dsize) {
1751                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1752                 state->done = True;
1753                 return;
1754         }
1755
1756         key.dsize = d->keylen;
1757         key.dptr  = &d->data[0];
1758         data.dsize = d->datalen;
1759         data.dptr = &d->data[d->keylen];
1760
1761         if (key.dsize == 0 && data.dsize == 0) {
1762                 /* end of traverse */
1763                 state->done = True;
1764                 return;
1765         }
1766
1767         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1768                 /* empty records are deleted records in ctdb */
1769                 return;
1770         }
1771
1772         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1773                 state->done = True;
1774         }
1775
1776         state->count++;
1777 }
1778
1779
1780 /*
1781   start a cluster wide traverse, calling the supplied fn on each record
1782   return the number of records traversed, or -1 on error
1783  */
1784 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1785 {
1786         TDB_DATA data;
1787         struct ctdb_traverse_start t;
1788         int32_t status;
1789         int ret;
1790         uint64_t srvid = (getpid() | 0xFLL<<60);
1791         struct traverse_state state;
1792
1793         state.done = False;
1794         state.count = 0;
1795         state.private_data = private_data;
1796         state.fn = fn;
1797
1798         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1799         if (ret != 0) {
1800                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1801                 return -1;
1802         }
1803
1804         t.db_id = ctdb_db->db_id;
1805         t.srvid = srvid;
1806         t.reqid = 0;
1807
1808         data.dptr = (uint8_t *)&t;
1809         data.dsize = sizeof(t);
1810
1811         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1812                            data, NULL, NULL, &status, NULL, NULL);
1813         if (ret != 0 || status != 0) {
1814                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1815                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1816                 return -1;
1817         }
1818
1819         while (!state.done) {
1820                 event_loop_once(ctdb_db->ctdb->ev);
1821         }
1822
1823         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1824         if (ret != 0) {
1825                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1826                 return -1;
1827         }
1828
1829         return state.count;
1830 }
1831
1832 #define ISASCII(x) ((x>31)&&(x<128))
1833 /*
1834   called on each key during a catdb
1835  */
1836 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1837 {
1838         int i;
1839         FILE *f = (FILE *)p;
1840         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1841
1842         fprintf(f, "dmaster: %u\n", h->dmaster);
1843         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1844
1845         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1846         for (i=0;i<key.dsize;i++) {
1847                 if (ISASCII(key.dptr[i])) {
1848                         fprintf(f, "%c", key.dptr[i]);
1849                 } else {
1850                         fprintf(f, "\\%02X", key.dptr[i]);
1851                 }
1852         }
1853         fprintf(f, "\"\n");
1854
1855         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1856         for (i=sizeof(*h);i<data.dsize;i++) {
1857                 if (ISASCII(data.dptr[i])) {
1858                         fprintf(f, "%c", data.dptr[i]);
1859                 } else {
1860                         fprintf(f, "\\%02X", data.dptr[i]);
1861                 }
1862         }
1863         fprintf(f, "\"\n");
1864
1865         return 0;
1866 }
1867
1868 /*
1869   convenience function to list all keys to stdout
1870  */
1871 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1872 {
1873         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1874 }
1875
1876 /*
1877   get the pid of a ctdb daemon
1878  */
1879 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1880 {
1881         int ret;
1882         int32_t res;
1883
1884         ret = ctdb_control(ctdb, destnode, 0, 
1885                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1886                            NULL, NULL, &res, &timeout, NULL);
1887         if (ret != 0) {
1888                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1889                 return -1;
1890         }
1891
1892         *pid = res;
1893
1894         return 0;
1895 }
1896
1897
1898 /*
1899   async freeze send control
1900  */
1901 struct ctdb_client_control_state *
1902 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1903 {
1904         return ctdb_control_send(ctdb, destnode, 0, 
1905                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1906                            mem_ctx, &timeout, NULL);
1907 }
1908
1909 /* 
1910    async freeze recv control
1911 */
1912 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1913 {
1914         int ret;
1915         int32_t res;
1916
1917         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1918         if ( (ret != 0) || (res != 0) ){
1919                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1920                 return -1;
1921         }
1922
1923         return 0;
1924 }
1925
1926 /*
1927   freeze a node
1928  */
1929 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1930 {
1931         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1932         struct ctdb_client_control_state *state;
1933         int ret;
1934
1935         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1936         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1937         talloc_free(tmp_ctx);
1938
1939         return ret;
1940 }
1941
1942 /*
1943   thaw a node
1944  */
1945 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1946 {
1947         int ret;
1948         int32_t res;
1949
1950         ret = ctdb_control(ctdb, destnode, 0, 
1951                            CTDB_CONTROL_THAW, 0, tdb_null, 
1952                            NULL, NULL, &res, &timeout, NULL);
1953         if (ret != 0 || res != 0) {
1954                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1955                 return -1;
1956         }
1957
1958         return 0;
1959 }
1960
1961 /*
1962   get pnn of a node, or -1
1963  */
1964 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1965 {
1966         int ret;
1967         int32_t res;
1968
1969         ret = ctdb_control(ctdb, destnode, 0, 
1970                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1971                            NULL, NULL, &res, &timeout, NULL);
1972         if (ret != 0) {
1973                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1974                 return -1;
1975         }
1976
1977         return res;
1978 }
1979
1980 /*
1981   get the monitoring mode of a remote node
1982  */
1983 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1984 {
1985         int ret;
1986         int32_t res;
1987
1988         ret = ctdb_control(ctdb, destnode, 0, 
1989                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1990                            NULL, NULL, &res, &timeout, NULL);
1991         if (ret != 0) {
1992                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1993                 return -1;
1994         }
1995
1996         *monmode = res;
1997
1998         return 0;
1999 }
2000
2001
2002 /*
2003  set the monitoring mode of a remote node to active
2004  */
2005 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2006 {
2007         int ret;
2008         
2009
2010         ret = ctdb_control(ctdb, destnode, 0, 
2011                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2012                            NULL, NULL,NULL, &timeout, NULL);
2013         if (ret != 0) {
2014                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2015                 return -1;
2016         }
2017
2018         
2019
2020         return 0;
2021 }
2022
2023 /*
2024   set the monitoring mode of a remote node to disable
2025  */
2026 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2027 {
2028         int ret;
2029         
2030
2031         ret = ctdb_control(ctdb, destnode, 0, 
2032                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2033                            NULL, NULL, NULL, &timeout, NULL);
2034         if (ret != 0) {
2035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2036                 return -1;
2037         }
2038
2039         
2040
2041         return 0;
2042 }
2043
2044
2045
2046 /* 
2047   sent to a node to make it take over an ip address
2048 */
2049 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2050                           uint32_t destnode, struct ctdb_public_ip *ip)
2051 {
2052         TDB_DATA data;
2053         int ret;
2054         int32_t res;
2055
2056         data.dsize = sizeof(*ip);
2057         data.dptr  = (uint8_t *)ip;
2058
2059         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2060                            NULL, &res, &timeout, NULL);
2061
2062         if (ret != 0 || res != 0) {
2063                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2064                 return -1;
2065         }
2066
2067         return 0;       
2068 }
2069
2070
2071 /* 
2072   sent to a node to make it release an ip address
2073 */
2074 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2075                          uint32_t destnode, struct ctdb_public_ip *ip)
2076 {
2077         TDB_DATA data;
2078         int ret;
2079         int32_t res;
2080
2081         data.dsize = sizeof(*ip);
2082         data.dptr  = (uint8_t *)ip;
2083
2084         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2085                            NULL, &res, &timeout, NULL);
2086
2087         if (ret != 0 || res != 0) {
2088                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2089                 return -1;
2090         }
2091
2092         return 0;       
2093 }
2094
2095
2096 /*
2097   get a tunable
2098  */
2099 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2100                           struct timeval timeout, 
2101                           uint32_t destnode,
2102                           const char *name, uint32_t *value)
2103 {
2104         struct ctdb_control_get_tunable *t;
2105         TDB_DATA data, outdata;
2106         int32_t res;
2107         int ret;
2108
2109         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2110         data.dptr  = talloc_size(ctdb, data.dsize);
2111         CTDB_NO_MEMORY(ctdb, data.dptr);
2112
2113         t = (struct ctdb_control_get_tunable *)data.dptr;
2114         t->length = strlen(name)+1;
2115         memcpy(t->name, name, t->length);
2116
2117         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2118                            &outdata, &res, &timeout, NULL);
2119         talloc_free(data.dptr);
2120         if (ret != 0 || res != 0) {
2121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2122                 return -1;
2123         }
2124
2125         if (outdata.dsize != sizeof(uint32_t)) {
2126                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2127                 talloc_free(outdata.dptr);
2128                 return -1;
2129         }
2130         
2131         *value = *(uint32_t *)outdata.dptr;
2132         talloc_free(outdata.dptr);
2133
2134         return 0;
2135 }
2136
2137 /*
2138   set a tunable
2139  */
2140 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2141                           struct timeval timeout, 
2142                           uint32_t destnode,
2143                           const char *name, uint32_t value)
2144 {
2145         struct ctdb_control_set_tunable *t;
2146         TDB_DATA data;
2147         int32_t res;
2148         int ret;
2149
2150         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2151         data.dptr  = talloc_size(ctdb, data.dsize);
2152         CTDB_NO_MEMORY(ctdb, data.dptr);
2153
2154         t = (struct ctdb_control_set_tunable *)data.dptr;
2155         t->length = strlen(name)+1;
2156         memcpy(t->name, name, t->length);
2157         t->value = value;
2158
2159         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2160                            NULL, &res, &timeout, NULL);
2161         talloc_free(data.dptr);
2162         if (ret != 0 || res != 0) {
2163                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2164                 return -1;
2165         }
2166
2167         return 0;
2168 }
2169
2170 /*
2171   list tunables
2172  */
2173 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2174                             struct timeval timeout, 
2175                             uint32_t destnode,
2176                             TALLOC_CTX *mem_ctx,
2177                             const char ***list, uint32_t *count)
2178 {
2179         TDB_DATA outdata;
2180         int32_t res;
2181         int ret;
2182         struct ctdb_control_list_tunable *t;
2183         char *p, *s, *ptr;
2184
2185         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2186                            mem_ctx, &outdata, &res, &timeout, NULL);
2187         if (ret != 0 || res != 0) {
2188                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2189                 return -1;
2190         }
2191
2192         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2193         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2194             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2195                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2196                 talloc_free(outdata.dptr);
2197                 return -1;              
2198         }
2199         
2200         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2201         CTDB_NO_MEMORY(ctdb, p);
2202
2203         talloc_free(outdata.dptr);
2204         
2205         (*list) = NULL;
2206         (*count) = 0;
2207
2208         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2209                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2210                 CTDB_NO_MEMORY(ctdb, *list);
2211                 (*list)[*count] = talloc_strdup(*list, s);
2212                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2213                 (*count)++;
2214         }
2215
2216         talloc_free(p);
2217
2218         return 0;
2219 }
2220
2221
2222 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2223                         struct timeval timeout, uint32_t destnode, 
2224                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2225 {
2226         int ret;
2227         TDB_DATA outdata;
2228         int32_t res;
2229
2230         ret = ctdb_control(ctdb, destnode, 0, 
2231                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2232                            mem_ctx, &outdata, &res, &timeout, NULL);
2233         if (ret != 0 || res != 0) {
2234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2235                 return -1;
2236         }
2237
2238         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2239         talloc_free(outdata.dptr);
2240                     
2241         return 0;
2242 }
2243
2244 /*
2245   set/clear the permanent disabled bit on a remote node
2246  */
2247 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2248                        uint32_t set, uint32_t clear)
2249 {
2250         int ret;
2251         TDB_DATA data;
2252         struct ctdb_node_modflags m;
2253         int32_t res;
2254
2255         m.set = set;
2256         m.clear = clear;
2257
2258         data.dsize = sizeof(m);
2259         data.dptr = (unsigned char *)&m;
2260
2261         ret = ctdb_control(ctdb, destnode, 0, 
2262                            CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
2263                            NULL, NULL, &res, &timeout, NULL);
2264         if (ret != 0 || res != 0) {
2265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for modflags failed\n"));
2266                 return -1;
2267         }
2268
2269         return 0;
2270 }
2271
2272
2273 /*
2274   get all tunables
2275  */
2276 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2277                                struct timeval timeout, 
2278                                uint32_t destnode,
2279                                struct ctdb_tunable *tunables)
2280 {
2281         TDB_DATA outdata;
2282         int ret;
2283         int32_t res;
2284
2285         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2286                            &outdata, &res, &timeout, NULL);
2287         if (ret != 0 || res != 0) {
2288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2289                 return -1;
2290         }
2291
2292         if (outdata.dsize != sizeof(*tunables)) {
2293                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2294                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2295                 return -1;              
2296         }
2297
2298         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2299         talloc_free(outdata.dptr);
2300         return 0;
2301 }
2302
2303 /*
2304   add a public address to a node
2305  */
2306 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2307                       struct timeval timeout, 
2308                       uint32_t destnode,
2309                       struct ctdb_control_ip_iface *pub)
2310 {
2311         TDB_DATA data;
2312         int32_t res;
2313         int ret;
2314
2315         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2316         data.dptr  = (unsigned char *)pub;
2317
2318         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2319                            NULL, &res, &timeout, NULL);
2320         if (ret != 0 || res != 0) {
2321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2322                 return -1;
2323         }
2324
2325         return 0;
2326 }
2327
2328 /*
2329   delete a public address from a node
2330  */
2331 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2332                       struct timeval timeout, 
2333                       uint32_t destnode,
2334                       struct ctdb_control_ip_iface *pub)
2335 {
2336         TDB_DATA data;
2337         int32_t res;
2338         int ret;
2339
2340         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2341         data.dptr  = (unsigned char *)pub;
2342
2343         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2344                            NULL, &res, &timeout, NULL);
2345         if (ret != 0 || res != 0) {
2346                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2347                 return -1;
2348         }
2349
2350         return 0;
2351 }
2352
2353 /*
2354   kill a tcp connection
2355  */
2356 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2357                       struct timeval timeout, 
2358                       uint32_t destnode,
2359                       struct ctdb_control_killtcp *killtcp)
2360 {
2361         TDB_DATA data;
2362         int32_t res;
2363         int ret;
2364
2365         data.dsize = sizeof(struct ctdb_control_killtcp);
2366         data.dptr  = (unsigned char *)killtcp;
2367
2368         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2369                            NULL, &res, &timeout, NULL);
2370         if (ret != 0 || res != 0) {
2371                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2372                 return -1;
2373         }
2374
2375         return 0;
2376 }
2377
2378 /*
2379   send a gratious arp
2380  */
2381 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2382                       struct timeval timeout, 
2383                       uint32_t destnode,
2384                       ctdb_sock_addr *addr,
2385                       const char *ifname)
2386 {
2387         TDB_DATA data;
2388         int32_t res;
2389         int ret, len;
2390         struct ctdb_control_gratious_arp *gratious_arp;
2391         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2392
2393
2394         len = strlen(ifname)+1;
2395         gratious_arp = talloc_size(tmp_ctx, 
2396                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2397         CTDB_NO_MEMORY(ctdb, gratious_arp);
2398
2399         gratious_arp->addr = *addr;
2400         gratious_arp->len = len;
2401         memcpy(&gratious_arp->iface[0], ifname, len);
2402
2403
2404         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2405         data.dptr  = (unsigned char *)gratious_arp;
2406
2407         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2408                            NULL, &res, &timeout, NULL);
2409         if (ret != 0 || res != 0) {
2410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2411                 talloc_free(tmp_ctx);
2412                 return -1;
2413         }
2414
2415         talloc_free(tmp_ctx);
2416         return 0;
2417 }
2418
2419 /*
2420   get a list of all tcp tickles that a node knows about for a particular vnn
2421  */
2422 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2423                               struct timeval timeout, uint32_t destnode, 
2424                               TALLOC_CTX *mem_ctx, 
2425                               struct sockaddr_in *ip,
2426                               struct ctdb_control_tcp_tickle_list **list)
2427 {
2428         int ret;
2429         TDB_DATA data, outdata;
2430         int32_t status;
2431
2432         data.dptr = (uint8_t*)ip;
2433         data.dsize = sizeof(struct sockaddr_in);
2434
2435         ret = ctdb_control(ctdb, destnode, 0, 
2436                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2437                            mem_ctx, &outdata, &status, NULL, NULL);
2438         if (ret != 0) {
2439                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2440                 return -1;
2441         }
2442
2443         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2444
2445         return status;
2446 }
2447
2448 /*
2449   register a server id
2450  */
2451 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2452                       struct timeval timeout, 
2453                       struct ctdb_server_id *id)
2454 {
2455         TDB_DATA data;
2456         int32_t res;
2457         int ret;
2458
2459         data.dsize = sizeof(struct ctdb_server_id);
2460         data.dptr  = (unsigned char *)id;
2461
2462         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2463                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2464                         0, data, NULL,
2465                         NULL, &res, &timeout, NULL);
2466         if (ret != 0 || res != 0) {
2467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2468                 return -1;
2469         }
2470
2471         return 0;
2472 }
2473
2474 /*
2475   unregister a server id
2476  */
2477 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2478                       struct timeval timeout, 
2479                       struct ctdb_server_id *id)
2480 {
2481         TDB_DATA data;
2482         int32_t res;
2483         int ret;
2484
2485         data.dsize = sizeof(struct ctdb_server_id);
2486         data.dptr  = (unsigned char *)id;
2487
2488         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2489                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2490                         0, data, NULL,
2491                         NULL, &res, &timeout, NULL);
2492         if (ret != 0 || res != 0) {
2493                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2494                 return -1;
2495         }
2496
2497         return 0;
2498 }
2499
2500
2501 /*
2502   check if a server id exists
2503
2504   if a server id does exist, return *status == 1, otherwise *status == 0
2505  */
2506 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2507                       struct timeval timeout, 
2508                       uint32_t destnode,
2509                       struct ctdb_server_id *id,
2510                       uint32_t *status)
2511 {
2512         TDB_DATA data;
2513         int32_t res;
2514         int ret;
2515
2516         data.dsize = sizeof(struct ctdb_server_id);
2517         data.dptr  = (unsigned char *)id;
2518
2519         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2520                         0, data, NULL,
2521                         NULL, &res, &timeout, NULL);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2524                 return -1;
2525         }
2526
2527         if (res) {
2528                 *status = 1;
2529         } else {
2530                 *status = 0;
2531         }
2532
2533         return 0;
2534 }
2535
2536 /*
2537    get the list of server ids that are registered on a node
2538 */
2539 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2540                 TALLOC_CTX *mem_ctx,
2541                 struct timeval timeout, uint32_t destnode, 
2542                 struct ctdb_server_id_list **svid_list)
2543 {
2544         int ret;
2545         TDB_DATA outdata;
2546         int32_t res;
2547
2548         ret = ctdb_control(ctdb, destnode, 0, 
2549                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2550                            mem_ctx, &outdata, &res, &timeout, NULL);
2551         if (ret != 0 || res != 0) {
2552                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2553                 return -1;
2554         }
2555
2556         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2557                     
2558         return 0;
2559 }
2560
2561 /*
2562   initialise the ctdb daemon for client applications
2563
2564   NOTE: In current code the daemon does not fork. This is for testing purposes only
2565   and to simplify the code.
2566 */
2567 struct ctdb_context *ctdb_init(struct event_context *ev)
2568 {
2569         struct ctdb_context *ctdb;
2570
2571         ctdb = talloc_zero(ev, struct ctdb_context);
2572         ctdb->ev  = ev;
2573         ctdb->idr = idr_init(ctdb);
2574         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2575
2576         ctdb_set_socketname(ctdb, CTDB_PATH);
2577
2578         return ctdb;
2579 }
2580
2581
2582 /*
2583   set some ctdb flags
2584 */
2585 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2586 {
2587         ctdb->flags |= flags;
2588 }
2589
2590 /*
2591   setup the local socket name
2592 */
2593 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2594 {
2595         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2596         return 0;
2597 }
2598
2599 /*
2600   return the pnn of this node
2601 */
2602 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2603 {
2604         return ctdb->pnn;
2605 }
2606
2607
2608 /*
2609   get the uptime of a remote node
2610  */
2611 struct ctdb_client_control_state *
2612 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2613 {
2614         return ctdb_control_send(ctdb, destnode, 0, 
2615                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2616                            mem_ctx, &timeout, NULL);
2617 }
2618
2619 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2620 {
2621         int ret;
2622         int32_t res;
2623         TDB_DATA outdata;
2624
2625         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2626         if (ret != 0 || res != 0) {
2627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2628                 return -1;
2629         }
2630
2631         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2632
2633         return 0;
2634 }
2635
2636 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2637 {
2638         struct ctdb_client_control_state *state;
2639
2640         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2641         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2642 }
2643
2644 /*
2645   send a control to execute the "recovered" event script on a node
2646  */
2647 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2648 {
2649         int ret;
2650         int32_t status;
2651
2652         ret = ctdb_control(ctdb, destnode, 0, 
2653                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2654                            NULL, NULL, &status, &timeout, NULL);
2655         if (ret != 0 || status != 0) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2657                 return -1;
2658         }
2659
2660         return 0;
2661 }
2662
2663 /* 
2664   callback for the async helpers used when sending the same control
2665   to multiple nodes in parallell.
2666 */
2667 static void async_callback(struct ctdb_client_control_state *state)
2668 {
2669         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2670         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2671         int ret;
2672         TDB_DATA outdata;
2673         int32_t res;
2674         uint32_t destnode = state->c->hdr.destnode;
2675
2676         /* one more node has responded with recmode data */
2677         data->count--;
2678
2679         /* if we failed to push the db, then return an error and let
2680            the main loop try again.
2681         */
2682         if (state->state != CTDB_CONTROL_DONE) {
2683                 if ( !data->dont_log_errors) {
2684                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2685                 }
2686                 data->fail_count++;
2687                 if (data->fail_callback) {
2688                         data->fail_callback(ctdb, destnode, res, outdata,
2689                                         data->callback_data);
2690                 }
2691                 return;
2692         }
2693         
2694         state->async.fn = NULL;
2695
2696         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2697         if ((ret != 0) || (res != 0)) {
2698                 if ( !data->dont_log_errors) {
2699                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2700                 }
2701                 data->fail_count++;
2702                 if (data->fail_callback) {
2703                         data->fail_callback(ctdb, destnode, res, outdata,
2704                                         data->callback_data);
2705                 }
2706         }
2707         if ((ret == 0) && (data->callback != NULL)) {
2708                 data->callback(ctdb, destnode, res, outdata,
2709                                         data->callback_data);
2710         }
2711 }
2712
2713
2714 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2715 {
2716         /* set up the callback functions */
2717         state->async.fn = async_callback;
2718         state->async.private_data = data;
2719         
2720         /* one more control to wait for to complete */
2721         data->count++;
2722 }
2723
2724
2725 /* wait for up to the maximum number of seconds allowed
2726    or until all nodes we expect a response from has replied
2727 */
2728 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2729 {
2730         while (data->count > 0) {
2731                 event_loop_once(ctdb->ev);
2732         }
2733         if (data->fail_count != 0) {
2734                 if (!data->dont_log_errors) {
2735                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2736                                  data->fail_count));
2737                 }
2738                 return -1;
2739         }
2740         return 0;
2741 }
2742
2743
2744 /* 
2745    perform a simple control on the listed nodes
2746    The control cannot return data
2747  */
2748 int ctdb_client_async_control(struct ctdb_context *ctdb,
2749                                 enum ctdb_controls opcode,
2750                                 uint32_t *nodes,
2751                                 struct timeval timeout,
2752                                 bool dont_log_errors,
2753                                 TDB_DATA data,
2754                                 client_async_callback client_callback,
2755                                 client_async_callback fail_callback,
2756                                 void *callback_data)
2757 {
2758         struct client_async_data *async_data;
2759         struct ctdb_client_control_state *state;
2760         int j, num_nodes;
2761
2762         async_data = talloc_zero(ctdb, struct client_async_data);
2763         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2764         async_data->dont_log_errors = dont_log_errors;
2765         async_data->callback = client_callback;
2766         async_data->fail_callback = fail_callback;
2767         async_data->callback_data = callback_data;
2768         async_data->opcode        = opcode;
2769
2770         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2771
2772         /* loop over all nodes and send an async control to each of them */
2773         for (j=0; j<num_nodes; j++) {
2774                 uint32_t pnn = nodes[j];
2775
2776                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2777                                           0, data, async_data, &timeout, NULL);
2778                 if (state == NULL) {
2779                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2780                         talloc_free(async_data);
2781                         return -1;
2782                 }
2783                 
2784                 ctdb_client_async_add(async_data, state);
2785         }
2786
2787         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2788                 talloc_free(async_data);
2789                 return -1;
2790         }
2791
2792         talloc_free(async_data);
2793         return 0;
2794 }
2795
2796 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2797                                 struct ctdb_vnn_map *vnn_map,
2798                                 TALLOC_CTX *mem_ctx,
2799                                 bool include_self)
2800 {
2801         int i, j, num_nodes;
2802         uint32_t *nodes;
2803
2804         for (i=num_nodes=0;i<vnn_map->size;i++) {
2805                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2806                         continue;
2807                 }
2808                 num_nodes++;
2809         } 
2810
2811         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2812         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2813
2814         for (i=j=0;i<vnn_map->size;i++) {
2815                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2816                         continue;
2817                 }
2818                 nodes[j++] = vnn_map->map[i];
2819         } 
2820
2821         return nodes;
2822 }
2823
2824 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2825                                 struct ctdb_node_map *node_map,
2826                                 TALLOC_CTX *mem_ctx,
2827                                 bool include_self)
2828 {
2829         int i, j, num_nodes;
2830         uint32_t *nodes;
2831
2832         for (i=num_nodes=0;i<node_map->num;i++) {
2833                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2834                         continue;
2835                 }
2836                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2837                         continue;
2838                 }
2839                 num_nodes++;
2840         } 
2841
2842         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2843         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2844
2845         for (i=j=0;i<node_map->num;i++) {
2846                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2847                         continue;
2848                 }
2849                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2850                         continue;
2851                 }
2852                 nodes[j++] = node_map->nodes[i].pnn;
2853         } 
2854
2855         return nodes;
2856 }
2857
2858 /* 
2859   this is used to test if a pnn lock exists and if it exists will return
2860   the number of connections that pnn has reported or -1 if that recovery
2861   daemon is not running.
2862 */
2863 int
2864 ctdb_read_pnn_lock(int fd, int32_t pnn)
2865 {
2866         struct flock lock;
2867         char c;
2868
2869         lock.l_type = F_WRLCK;
2870         lock.l_whence = SEEK_SET;
2871         lock.l_start = pnn;
2872         lock.l_len = 1;
2873         lock.l_pid = 0;
2874
2875         if (fcntl(fd, F_GETLK, &lock) != 0) {
2876                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2877                 return -1;
2878         }
2879
2880         if (lock.l_type == F_UNLCK) {
2881                 return -1;
2882         }
2883
2884         if (pread(fd, &c, 1, pnn) == -1) {
2885                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2886                 return -1;
2887         }
2888
2889         return c;
2890 }
2891
2892 /*
2893   get capabilities of a remote node
2894  */
2895 struct ctdb_client_control_state *
2896 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2897 {
2898         return ctdb_control_send(ctdb, destnode, 0, 
2899                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2900                            mem_ctx, &timeout, NULL);
2901 }
2902
2903 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2904 {
2905         int ret;
2906         int32_t res;
2907         TDB_DATA outdata;
2908
2909         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2910         if ( (ret != 0) || (res != 0) ) {
2911                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2912                 return -1;
2913         }
2914
2915         if (capabilities) {
2916                 *capabilities = *((uint32_t *)outdata.dptr);
2917         }
2918
2919         return 0;
2920 }
2921
2922 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2923 {
2924         struct ctdb_client_control_state *state;
2925         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2926         int ret;
2927
2928         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2929         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2930         talloc_free(tmp_ctx);
2931         return ret;
2932 }
2933
2934 struct ctdb_transaction_handle {
2935         struct ctdb_db_context *ctdb_db;
2936         bool in_replay;
2937         /* we store the reads and writes done under a transaction one
2938            list stores both reads and writes, the other just writes
2939         */
2940         struct ctdb_marshall_buffer *m_all;
2941         struct ctdb_marshall_buffer *m_write;
2942 };
2943
2944 /* start a transaction on a database */
2945 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2946 {
2947         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2948         return 0;
2949 }
2950
2951 /* start a transaction on a database */
2952 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2953 {
2954         struct ctdb_record_handle *rh;
2955         TDB_DATA key;
2956         struct ctdb_ltdb_header header;
2957         TALLOC_CTX *tmp_ctx;
2958         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2959         int ret;
2960         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2961
2962         key.dptr = discard_const(keyname);
2963         key.dsize = strlen(keyname);
2964
2965         if (!ctdb_db->persistent) {
2966                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2967                 return -1;
2968         }
2969
2970 again:
2971         tmp_ctx = talloc_new(h);
2972
2973         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2974         if (rh == NULL) {
2975                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
2976                 talloc_free(tmp_ctx);
2977                 return -1;
2978         }
2979         talloc_free(rh);
2980
2981         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2984                 talloc_free(tmp_ctx);
2985                 return -1;
2986         }
2987
2988         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
2989         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
2990                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2991                 talloc_free(tmp_ctx);
2992                 goto again;
2993         }
2994
2995         talloc_free(tmp_ctx);
2996
2997         return 0;
2998 }
2999
3000
3001 /* start a transaction on a database */
3002 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3003                                                        TALLOC_CTX *mem_ctx)
3004 {
3005         struct ctdb_transaction_handle *h;
3006         int ret;
3007
3008         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3009         if (h == NULL) {
3010                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3011                 return NULL;
3012         }
3013
3014         h->ctdb_db = ctdb_db;
3015
3016         ret = ctdb_transaction_fetch_start(h);
3017         if (ret != 0) {
3018                 talloc_free(h);
3019                 return NULL;
3020         }
3021
3022         talloc_set_destructor(h, ctdb_transaction_destructor);
3023
3024         return h;
3025 }
3026
3027
3028
3029 /*
3030   fetch a record inside a transaction
3031  */
3032 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3033                            TALLOC_CTX *mem_ctx, 
3034                            TDB_DATA key, TDB_DATA *data)
3035 {
3036         struct ctdb_ltdb_header header;
3037         int ret;
3038
3039         ZERO_STRUCT(header);
3040
3041         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3042         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3043                 /* record doesn't exist yet */
3044                 *data = tdb_null;
3045                 ret = 0;
3046         }
3047         
3048         if (ret != 0) {
3049                 return ret;
3050         }
3051
3052         if (!h->in_replay) {
3053                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3054                 if (h->m_all == NULL) {
3055                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3056                         return -1;
3057                 }
3058         }
3059
3060         return 0;
3061 }
3062
3063 /*
3064   stores a record inside a transaction
3065  */
3066 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3067                            TDB_DATA key, TDB_DATA data)
3068 {
3069         TALLOC_CTX *tmp_ctx = talloc_new(h);
3070         struct ctdb_ltdb_header header;
3071         TDB_DATA olddata;
3072         int ret;
3073
3074         ZERO_STRUCT(header);
3075
3076         /* we need the header so we can update the RSN */
3077         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3078         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3079                 /* the record doesn't exist - create one with us as dmaster.
3080                    This is only safe because we are in a transaction and this
3081                    is a persistent database */
3082                 header.dmaster = h->ctdb_db->ctdb->pnn;
3083                 header.rsn = 0;
3084         } else if (ret != 0) {
3085                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3086                 talloc_free(tmp_ctx);
3087                 return ret;
3088         }
3089
3090         if (data.dsize == olddata.dsize &&
3091             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3092                 /* save writing the same data */
3093                 talloc_free(tmp_ctx);
3094                 return 0;
3095         }
3096
3097         header.rsn++;
3098
3099         if (!h->in_replay) {
3100                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3101                 if (h->m_all == NULL) {
3102                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3103                         talloc_free(tmp_ctx);
3104                         return -1;
3105                 }
3106         }               
3107
3108         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3109         if (h->m_write == NULL) {
3110                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3111                 talloc_free(tmp_ctx);
3112                 return -1;
3113         }
3114         
3115         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3116
3117         talloc_free(tmp_ctx);
3118         
3119         return ret;
3120 }
3121
3122 /*
3123   replay a transaction
3124  */
3125 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3126 {
3127         int ret, i;
3128         struct ctdb_rec_data *rec = NULL;
3129
3130         h->in_replay = true;
3131         talloc_free(h->m_write);
3132         h->m_write = NULL;
3133
3134         ret = ctdb_transaction_fetch_start(h);
3135         if (ret != 0) {
3136                 return ret;
3137         }
3138
3139         for (i=0;i<h->m_all->count;i++) {
3140                 TDB_DATA key, data;
3141
3142                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3143                 if (rec == NULL) {
3144                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3145                         goto failed;
3146                 }
3147
3148                 if (rec->reqid == 0) {
3149                         /* its a store */
3150                         if (ctdb_transaction_store(h, key, data) != 0) {
3151                                 goto failed;
3152                         }
3153                 } else {
3154                         TDB_DATA data2;
3155                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3156
3157                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3158                                 talloc_free(tmp_ctx);
3159                                 goto failed;
3160                         }
3161                         if (data2.dsize != data.dsize ||
3162                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3163                                 /* the record has changed on us - we have to give up */
3164                                 talloc_free(tmp_ctx);
3165                                 goto failed;
3166                         }
3167                         talloc_free(tmp_ctx);
3168                 }
3169         }
3170         
3171         return 0;
3172
3173 failed:
3174         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3175         return -1;
3176 }
3177
3178
3179 /*
3180   commit a transaction
3181  */
3182 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3183 {
3184         int ret;
3185         int32_t status;
3186         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3187         struct timeval timeout;
3188
3189         talloc_set_destructor(h, NULL);
3190
3191         /* our commit strategy is quite complex.
3192
3193            - we first try to commit the changes to all other nodes
3194
3195            - if that works, then we commit locally and we are done
3196
3197            - if a commit on another node fails, then we need to cancel
3198              the transaction, then restart the transaction (thus
3199              opening a window of time for a pending recovery to
3200              complete), then replay the transaction, checking all the
3201              reads and writes (checking that reads give the same data,
3202              and writes succeed). Then we retry the transaction to the
3203              other nodes
3204         */
3205
3206 again:
3207         if (h->m_write == NULL) {
3208                 /* no changes were made */
3209                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3210                 talloc_free(h);
3211                 return 0;
3212         }
3213
3214         /* tell ctdbd to commit to the other nodes */
3215         timeout = timeval_current_ofs(1, 0);
3216         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3217                            CTDB_CONTROL_TRANS2_COMMIT, 0, 
3218                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3219                            &timeout, NULL);
3220         if (ret != 0 || status != 0) {
3221                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3222                 sleep(1);
3223                 if (ctdb_replay_transaction(h) != 0) {
3224                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3225                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3226                                      CTDB_CONTROL_TRANS2_ERROR, CTDB_CTRL_FLAG_NOREPLY, 
3227                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3228                         talloc_free(h);
3229                         return -1;
3230                 }
3231                 goto again;
3232         }
3233
3234         /* do the real commit locally */
3235         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3236         if (ret != 0) {
3237                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3238                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3239                              CTDB_CONTROL_TRANS2_ERROR, CTDB_CTRL_FLAG_NOREPLY, 
3240                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3241                 talloc_free(h);
3242                 return ret;
3243         }
3244
3245         /* tell ctdbd that we are finished with our local commit */
3246         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3247                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3248                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3249         talloc_free(h);
3250         return 0;
3251 }