recoverd: eliminate some trailing spaces from ctdb_election_win()
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         hdr->length       = length;
58         hdr->operation    = operation;
59         hdr->ctdb_magic   = CTDB_MAGIC;
60         hdr->ctdb_version = CTDB_VERSION;
61         hdr->srcnode      = ctdb->pnn;
62         if (ctdb->vnn_map) {
63                 hdr->generation = ctdb->vnn_map->generation;
64         }
65
66         return hdr;
67 }
68
69 /*
70   local version of ctdb_call
71 */
72 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
73                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
74                     TDB_DATA *data)
75 {
76         struct ctdb_call_info *c;
77         struct ctdb_registered_call *fn;
78         struct ctdb_context *ctdb = ctdb_db->ctdb;
79         
80         c = talloc(ctdb, struct ctdb_call_info);
81         CTDB_NO_MEMORY(ctdb, c);
82
83         c->key = call->key;
84         c->call_data = &call->call_data;
85         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
86         c->record_data.dsize = data->dsize;
87         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
88         c->new_data = NULL;
89         c->reply_data = NULL;
90         c->status = 0;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
203                 exit(0);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         set_nonblocking(ctdb->daemon.sd);
265         set_close_on_exec(ctdb->daemon.sd);
266         
267         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
268                 close(ctdb->daemon.sd);
269                 ctdb->daemon.sd = -1;
270                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
271                 return -1;
272         }
273
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb);
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return 0;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
364
365         return state;
366 }
367
368 /*
369   make a ctdb call to the local daemon - async send. Called from client context.
370
371   This constructs a ctdb_call request and queues it for processing. 
372   This call never blocks.
373 */
374 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
375                                               struct ctdb_call *call)
376 {
377         struct ctdb_client_call_state *state;
378         struct ctdb_context *ctdb = ctdb_db->ctdb;
379         struct ctdb_ltdb_header header;
380         TDB_DATA data;
381         int ret;
382         size_t len;
383         struct ctdb_req_call *c;
384
385         /* if the domain socket is not yet open, open it */
386         if (ctdb->daemon.sd==-1) {
387                 ctdb_socket_connect(ctdb);
388         }
389
390         ret = ctdb_ltdb_lock(ctdb_db, call->key);
391         if (ret != 0) {
392                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
393                 return NULL;
394         }
395
396         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
397
398         if (ret == 0 && header.dmaster == ctdb->pnn) {
399                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
400                 talloc_free(data.dptr);
401                 ctdb_ltdb_unlock(ctdb_db, call->key);
402                 return state;
403         }
404
405         ctdb_ltdb_unlock(ctdb_db, call->key);
406         talloc_free(data.dptr);
407
408         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
409         if (state == NULL) {
410                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
411                 return NULL;
412         }
413         state->call = talloc_zero(state, struct ctdb_call);
414         if (state->call == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
416                 return NULL;
417         }
418
419         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
420         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
421         if (c == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
423                 return NULL;
424         }
425
426         state->reqid     = ctdb_reqid_new(ctdb, state);
427         state->ctdb_db = ctdb_db;
428         talloc_set_destructor(state, ctdb_client_call_destructor);
429
430         c->hdr.reqid     = state->reqid;
431         c->flags         = call->flags;
432         c->db_id         = ctdb_db->db_id;
433         c->callid        = call->call_id;
434         c->hopcount      = 0;
435         c->keylen        = call->key.dsize;
436         c->calldatalen   = call->call_data.dsize;
437         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
438         memcpy(&c->data[call->key.dsize], 
439                call->call_data.dptr, call->call_data.dsize);
440         *(state->call)              = *call;
441         state->call->call_data.dptr = &c->data[call->key.dsize];
442         state->call->key.dptr       = &c->data[0];
443
444         state->state  = CTDB_CALL_WAIT;
445
446
447         ctdb_client_queue_pkt(ctdb, &c->hdr);
448
449         return state;
450 }
451
452
453 /*
454   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
455 */
456 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
457 {
458         struct ctdb_client_call_state *state;
459
460         state = ctdb_call_send(ctdb_db, call);
461         return ctdb_call_recv(state, call);
462 }
463
464
465 /*
466   tell the daemon what messaging srvid we will use, and register the message
467   handler function in the client
468 */
469 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
470                              ctdb_message_fn_t handler,
471                              void *private_data)
472                                     
473 {
474         int res;
475         int32_t status;
476         
477         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
478                            tdb_null, NULL, NULL, &status, NULL, NULL);
479         if (res != 0 || status != 0) {
480                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
481                 return -1;
482         }
483
484         /* also need to register the handler with our own ctdb structure */
485         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
486 }
487
488 /*
489   tell the daemon we no longer want a srvid
490 */
491 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
492 {
493         int res;
494         int32_t status;
495         
496         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
497                            tdb_null, NULL, NULL, &status, NULL, NULL);
498         if (res != 0 || status != 0) {
499                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
500                 return -1;
501         }
502
503         /* also need to register the handler with our own ctdb structure */
504         ctdb_deregister_message_handler(ctdb, srvid, private_data);
505         return 0;
506 }
507
508
509 /*
510   send a message - from client context
511  */
512 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
513                       uint64_t srvid, TDB_DATA data)
514 {
515         struct ctdb_req_message *r;
516         int len, res;
517
518         len = offsetof(struct ctdb_req_message, data) + data.dsize;
519         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
520                                len, struct ctdb_req_message);
521         CTDB_NO_MEMORY(ctdb, r);
522
523         r->hdr.destnode  = pnn;
524         r->srvid         = srvid;
525         r->datalen       = data.dsize;
526         memcpy(&r->data[0], data.dptr, data.dsize);
527         
528         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
529         talloc_free(r);
530         return res;
531 }
532
533
534 /*
535   cancel a ctdb_fetch_lock operation, releasing the lock
536  */
537 static int fetch_lock_destructor(struct ctdb_record_handle *h)
538 {
539         ctdb_ltdb_unlock(h->ctdb_db, h->key);
540         return 0;
541 }
542
543 /*
544   force the migration of a record to this node
545  */
546 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
547 {
548         struct ctdb_call call;
549         ZERO_STRUCT(call);
550         call.call_id = CTDB_NULL_FUNC;
551         call.key = key;
552         call.flags = CTDB_IMMEDIATE_MIGRATION;
553         return ctdb_call(ctdb_db, &call);
554 }
555
556 /*
557   get a lock on a record, and return the records data. Blocks until it gets the lock
558  */
559 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
560                                            TDB_DATA key, TDB_DATA *data)
561 {
562         int ret;
563         struct ctdb_record_handle *h;
564
565         /*
566           procedure is as follows:
567
568           1) get the chain lock. 
569           2) check if we are dmaster
570           3) if we are the dmaster then return handle 
571           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
572              reply from ctdbd
573           5) when we get the reply, goto (1)
574          */
575
576         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
577         if (h == NULL) {
578                 return NULL;
579         }
580
581         h->ctdb_db = ctdb_db;
582         h->key     = key;
583         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584         if (h->key.dptr == NULL) {
585                 talloc_free(h);
586                 return NULL;
587         }
588         h->data    = data;
589
590         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
591                  (const char *)key.dptr));
592
593 again:
594         /* step 1 - get the chain lock */
595         ret = ctdb_ltdb_lock(ctdb_db, key);
596         if (ret != 0) {
597                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
598                 talloc_free(h);
599                 return NULL;
600         }
601
602         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
603
604         talloc_set_destructor(h, fetch_lock_destructor);
605
606         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
607
608         /* when torturing, ensure we test the remote path */
609         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
610             random() % 5 == 0) {
611                 h->header.dmaster = (uint32_t)-1;
612         }
613
614
615         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
616
617         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
618                 ctdb_ltdb_unlock(ctdb_db, key);
619                 ret = ctdb_client_force_migration(ctdb_db, key);
620                 if (ret != 0) {
621                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
622                         talloc_free(h);
623                         return NULL;
624                 }
625                 goto again;
626         }
627
628         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
629         return h;
630 }
631
632 /*
633   store some data to the record that was locked with ctdb_fetch_lock()
634 */
635 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
636 {
637         if (h->ctdb_db->persistent) {
638                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
639                 return -1;
640         }
641
642         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
643 }
644
645 /*
646   non-locking fetch of a record
647  */
648 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
649                TDB_DATA key, TDB_DATA *data)
650 {
651         struct ctdb_call call;
652         int ret;
653
654         call.call_id = CTDB_FETCH_FUNC;
655         call.call_data.dptr = NULL;
656         call.call_data.dsize = 0;
657
658         ret = ctdb_call(ctdb_db, &call);
659
660         if (ret == 0) {
661                 *data = call.reply_data;
662                 talloc_steal(mem_ctx, data->dptr);
663         }
664
665         return ret;
666 }
667
668
669
670 /*
671    called when a control completes or timesout to invoke the callback
672    function the user provided
673 */
674 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
675         struct timeval t, void *private_data)
676 {
677         struct ctdb_client_control_state *state;
678         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
679         int ret;
680
681         state = talloc_get_type(private_data, struct ctdb_client_control_state);
682         talloc_steal(tmp_ctx, state);
683
684         ret = ctdb_control_recv(state->ctdb, state, state,
685                         NULL, 
686                         NULL, 
687                         NULL);
688
689         talloc_free(tmp_ctx);
690 }
691
692 /*
693   called when a CTDB_REPLY_CONTROL packet comes in in the client
694
695   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
696   contains any reply data from the control
697 */
698 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
699                                       struct ctdb_req_header *hdr)
700 {
701         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
702         struct ctdb_client_control_state *state;
703
704         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
705         if (state == NULL) {
706                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
707                 return;
708         }
709
710         if (hdr->reqid != state->reqid) {
711                 /* we found a record  but it was the wrong one */
712                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
713                 return;
714         }
715
716         state->outdata.dptr = c->data;
717         state->outdata.dsize = c->datalen;
718         state->status = c->status;
719         if (c->errorlen) {
720                 state->errormsg = talloc_strndup(state, 
721                                                  (char *)&c->data[c->datalen], 
722                                                  c->errorlen);
723         }
724
725         /* state->outdata now uses resources from c so we dont want c
726            to just dissappear from under us while state is still alive
727         */
728         talloc_steal(state, c);
729
730         state->state = CTDB_CONTROL_DONE;
731
732         /* if we had a callback registered for this control, pull the response
733            and call the callback.
734         */
735         if (state->async.fn) {
736                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
737         }
738 }
739
740
741 /*
742   destroy a ctdb_control in client
743 */
744 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
745 {
746         ctdb_reqid_remove(state->ctdb, state->reqid);
747         return 0;
748 }
749
750
751 /* time out handler for ctdb_control */
752 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
753         struct timeval t, void *private_data)
754 {
755         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
756
757         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
758                          "dstnode:%u\n", state->reqid, state->c->opcode,
759                          state->c->hdr.destnode));
760
761         state->state = CTDB_CONTROL_TIMEOUT;
762
763         /* if we had a callback registered for this control, pull the response
764            and call the callback.
765         */
766         if (state->async.fn) {
767                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
768         }
769 }
770
771 /* async version of send control request */
772 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
773                 uint32_t destnode, uint64_t srvid, 
774                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
775                 TALLOC_CTX *mem_ctx,
776                 struct timeval *timeout,
777                 char **errormsg)
778 {
779         struct ctdb_client_control_state *state;
780         size_t len;
781         struct ctdb_req_control *c;
782         int ret;
783
784         if (errormsg) {
785                 *errormsg = NULL;
786         }
787
788         /* if the domain socket is not yet open, open it */
789         if (ctdb->daemon.sd==-1) {
790                 ctdb_socket_connect(ctdb);
791         }
792
793         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
794         CTDB_NO_MEMORY_NULL(ctdb, state);
795
796         state->ctdb       = ctdb;
797         state->reqid      = ctdb_reqid_new(ctdb, state);
798         state->state      = CTDB_CONTROL_WAIT;
799         state->errormsg   = NULL;
800
801         talloc_set_destructor(state, ctdb_control_destructor);
802
803         len = offsetof(struct ctdb_req_control, data) + data.dsize;
804         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
805                                len, struct ctdb_req_control);
806         state->c            = c;        
807         CTDB_NO_MEMORY_NULL(ctdb, c);
808         c->hdr.reqid        = state->reqid;
809         c->hdr.destnode     = destnode;
810         c->opcode           = opcode;
811         c->client_id        = 0;
812         c->flags            = flags;
813         c->srvid            = srvid;
814         c->datalen          = data.dsize;
815         if (data.dsize) {
816                 memcpy(&c->data[0], data.dptr, data.dsize);
817         }
818
819         /* timeout */
820         if (timeout && !timeval_is_zero(timeout)) {
821                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
822         }
823
824         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
825         if (ret != 0) {
826                 talloc_free(state);
827                 return NULL;
828         }
829
830         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
831                 talloc_free(state);
832                 return NULL;
833         }
834
835         return state;
836 }
837
838
839 /* async version of receive control reply */
840 int ctdb_control_recv(struct ctdb_context *ctdb, 
841                 struct ctdb_client_control_state *state, 
842                 TALLOC_CTX *mem_ctx,
843                 TDB_DATA *outdata, int32_t *status, char **errormsg)
844 {
845         TALLOC_CTX *tmp_ctx;
846
847         if (status != NULL) {
848                 *status = -1;
849         }
850         if (errormsg != NULL) {
851                 *errormsg = NULL;
852         }
853
854         if (state == NULL) {
855                 return -1;
856         }
857
858         /* prevent double free of state */
859         tmp_ctx = talloc_new(ctdb);
860         talloc_steal(tmp_ctx, state);
861
862         /* loop one event at a time until we either timeout or the control
863            completes.
864         */
865         while (state->state == CTDB_CONTROL_WAIT) {
866                 event_loop_once(ctdb->ev);
867         }
868
869         if (state->state != CTDB_CONTROL_DONE) {
870                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
871                 if (state->async.fn) {
872                         state->async.fn(state);
873                 }
874                 talloc_free(tmp_ctx);
875                 return -1;
876         }
877
878         if (state->errormsg) {
879                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
880                 if (errormsg) {
881                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
882                 }
883                 if (state->async.fn) {
884                         state->async.fn(state);
885                 }
886                 talloc_free(tmp_ctx);
887                 return -1;
888         }
889
890         if (outdata) {
891                 *outdata = state->outdata;
892                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
893         }
894
895         if (status) {
896                 *status = state->status;
897         }
898
899         if (state->async.fn) {
900                 state->async.fn(state);
901         }
902
903         talloc_free(tmp_ctx);
904         return 0;
905 }
906
907
908
909 /*
910   send a ctdb control message
911   timeout specifies how long we should wait for a reply.
912   if timeout is NULL we wait indefinitely
913  */
914 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
915                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
916                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
917                  struct timeval *timeout,
918                  char **errormsg)
919 {
920         struct ctdb_client_control_state *state;
921
922         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
923                         flags, data, mem_ctx,
924                         timeout, errormsg);
925
926         /* FIXME: Error conditions in ctdb_control_send return NULL without
927          * setting errormsg.  So, there is no way to distinguish between sucess
928          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
929         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
930                 if (status != NULL) {
931                         *status = 0;
932                 }
933                 return 0;
934         }
935
936         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
937                         errormsg);
938 }
939
940
941
942
943 /*
944   a process exists call. Returns 0 if process exists, -1 otherwise
945  */
946 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
947 {
948         int ret;
949         TDB_DATA data;
950         int32_t status;
951
952         data.dptr = (uint8_t*)&pid;
953         data.dsize = sizeof(pid);
954
955         ret = ctdb_control(ctdb, destnode, 0, 
956                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
957                            NULL, NULL, &status, NULL, NULL);
958         if (ret != 0) {
959                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
960                 return -1;
961         }
962
963         return status;
964 }
965
966 /*
967   get remote statistics
968  */
969 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
970 {
971         int ret;
972         TDB_DATA data;
973         int32_t res;
974
975         ret = ctdb_control(ctdb, destnode, 0, 
976                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
977                            ctdb, &data, &res, NULL, NULL);
978         if (ret != 0 || res != 0) {
979                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
980                 return -1;
981         }
982
983         if (data.dsize != sizeof(struct ctdb_statistics)) {
984                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
985                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
986                       return -1;
987         }
988
989         *status = *(struct ctdb_statistics *)data.dptr;
990         talloc_free(data.dptr);
991                         
992         return 0;
993 }
994
995 /*
996   shutdown a remote ctdb node
997  */
998 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
999 {
1000         struct ctdb_client_control_state *state;
1001
1002         state = ctdb_control_send(ctdb, destnode, 0, 
1003                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1004                            NULL, &timeout, NULL);
1005         if (state == NULL) {
1006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1007                 return -1;
1008         }
1009
1010         return 0;
1011 }
1012
1013 /*
1014   get vnn map from a remote node
1015  */
1016 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1017 {
1018         int ret;
1019         TDB_DATA outdata;
1020         int32_t res;
1021         struct ctdb_vnn_map_wire *map;
1022
1023         ret = ctdb_control(ctdb, destnode, 0, 
1024                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1025                            mem_ctx, &outdata, &res, &timeout, NULL);
1026         if (ret != 0 || res != 0) {
1027                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1028                 return -1;
1029         }
1030         
1031         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1032         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1033             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1034                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1035                 return -1;
1036         }
1037
1038         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1039         CTDB_NO_MEMORY(ctdb, *vnnmap);
1040         (*vnnmap)->generation = map->generation;
1041         (*vnnmap)->size       = map->size;
1042         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1043
1044         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1045         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1046         talloc_free(outdata.dptr);
1047                     
1048         return 0;
1049 }
1050
1051
1052 /*
1053   get the recovery mode of a remote node
1054  */
1055 struct ctdb_client_control_state *
1056 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1057 {
1058         return ctdb_control_send(ctdb, destnode, 0, 
1059                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1060                            mem_ctx, &timeout, NULL);
1061 }
1062
1063 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1064 {
1065         int ret;
1066         int32_t res;
1067
1068         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1071                 return -1;
1072         }
1073
1074         if (recmode) {
1075                 *recmode = (uint32_t)res;
1076         }
1077
1078         return 0;
1079 }
1080
1081 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1082 {
1083         struct ctdb_client_control_state *state;
1084
1085         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1086         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1087 }
1088
1089
1090
1091
1092 /*
1093   set the recovery mode of a remote node
1094  */
1095 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1096 {
1097         int ret;
1098         TDB_DATA data;
1099         int32_t res;
1100
1101         data.dsize = sizeof(uint32_t);
1102         data.dptr = (unsigned char *)&recmode;
1103
1104         ret = ctdb_control(ctdb, destnode, 0, 
1105                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1106                            NULL, NULL, &res, &timeout, NULL);
1107         if (ret != 0 || res != 0) {
1108                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1109                 return -1;
1110         }
1111
1112         return 0;
1113 }
1114
1115
1116
1117 /*
1118   get the recovery master of a remote node
1119  */
1120 struct ctdb_client_control_state *
1121 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1122                         struct timeval timeout, uint32_t destnode)
1123 {
1124         return ctdb_control_send(ctdb, destnode, 0, 
1125                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1126                            mem_ctx, &timeout, NULL);
1127 }
1128
1129 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1130 {
1131         int ret;
1132         int32_t res;
1133
1134         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1135         if (ret != 0) {
1136                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1137                 return -1;
1138         }
1139
1140         if (recmaster) {
1141                 *recmaster = (uint32_t)res;
1142         }
1143
1144         return 0;
1145 }
1146
1147 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1148 {
1149         struct ctdb_client_control_state *state;
1150
1151         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1152         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1153 }
1154
1155
1156 /*
1157   set the recovery master of a remote node
1158  */
1159 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1160 {
1161         int ret;
1162         TDB_DATA data;
1163         int32_t res;
1164
1165         ZERO_STRUCT(data);
1166         data.dsize = sizeof(uint32_t);
1167         data.dptr = (unsigned char *)&recmaster;
1168
1169         ret = ctdb_control(ctdb, destnode, 0, 
1170                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1171                            NULL, NULL, &res, &timeout, NULL);
1172         if (ret != 0 || res != 0) {
1173                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1174                 return -1;
1175         }
1176
1177         return 0;
1178 }
1179
1180
1181 /*
1182   get a list of databases off a remote node
1183  */
1184 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1185                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1186 {
1187         int ret;
1188         TDB_DATA outdata;
1189         int32_t res;
1190
1191         ret = ctdb_control(ctdb, destnode, 0, 
1192                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1193                            mem_ctx, &outdata, &res, &timeout, NULL);
1194         if (ret != 0 || res != 0) {
1195                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1196                 return -1;
1197         }
1198
1199         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1200         talloc_free(outdata.dptr);
1201                     
1202         return 0;
1203 }
1204
1205 /*
1206   get a list of nodes (vnn and flags ) from a remote node
1207  */
1208 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1209                 struct timeval timeout, uint32_t destnode, 
1210                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1211 {
1212         int ret;
1213         TDB_DATA outdata;
1214         int32_t res;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1218                            mem_ctx, &outdata, &res, &timeout, NULL);
1219         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1221                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1222         }
1223         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1225                 return -1;
1226         }
1227
1228         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1229         talloc_free(outdata.dptr);
1230                     
1231         return 0;
1232 }
1233
1234 /*
1235   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1236  */
1237 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1238                 struct timeval timeout, uint32_t destnode, 
1239                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1240 {
1241         int ret, i, len;
1242         TDB_DATA outdata;
1243         struct ctdb_node_mapv4 *nodemapv4;
1244         int32_t res;
1245
1246         ret = ctdb_control(ctdb, destnode, 0, 
1247                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1248                            mem_ctx, &outdata, &res, &timeout, NULL);
1249         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1251                 return -1;
1252         }
1253
1254         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1255
1256         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1257         (*nodemap) = talloc_zero_size(mem_ctx, len);
1258         CTDB_NO_MEMORY(ctdb, (*nodemap));
1259
1260         (*nodemap)->num = nodemapv4->num;
1261         for (i=0; i<nodemapv4->num; i++) {
1262                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1263                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1264                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1265                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1266         }
1267                 
1268         talloc_free(outdata.dptr);
1269                     
1270         return 0;
1271 }
1272
1273 /*
1274   drop the transport, reload the nodes file and restart the transport
1275  */
1276 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1277                     struct timeval timeout, uint32_t destnode)
1278 {
1279         int ret;
1280         int32_t res;
1281
1282         ret = ctdb_control(ctdb, destnode, 0, 
1283                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1284                            NULL, NULL, &res, &timeout, NULL);
1285         if (ret != 0 || res != 0) {
1286                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1287                 return -1;
1288         }
1289
1290         return 0;
1291 }
1292
1293
1294 /*
1295   set vnn map on a node
1296  */
1297 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1298                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1299 {
1300         int ret;
1301         TDB_DATA data;
1302         int32_t res;
1303         struct ctdb_vnn_map_wire *map;
1304         size_t len;
1305
1306         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1307         map = talloc_size(mem_ctx, len);
1308         CTDB_NO_MEMORY(ctdb, map);
1309
1310         map->generation = vnnmap->generation;
1311         map->size = vnnmap->size;
1312         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1313         
1314         data.dsize = len;
1315         data.dptr  = (uint8_t *)map;
1316
1317         ret = ctdb_control(ctdb, destnode, 0, 
1318                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1319                            NULL, NULL, &res, &timeout, NULL);
1320         if (ret != 0 || res != 0) {
1321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1322                 return -1;
1323         }
1324
1325         talloc_free(map);
1326
1327         return 0;
1328 }
1329
1330
1331 /*
1332   async send for pull database
1333  */
1334 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1335         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1336         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1337 {
1338         TDB_DATA indata;
1339         struct ctdb_control_pulldb *pull;
1340         struct ctdb_client_control_state *state;
1341
1342         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1343         CTDB_NO_MEMORY_NULL(ctdb, pull);
1344
1345         pull->db_id   = dbid;
1346         pull->lmaster = lmaster;
1347
1348         indata.dsize = sizeof(struct ctdb_control_pulldb);
1349         indata.dptr  = (unsigned char *)pull;
1350
1351         state = ctdb_control_send(ctdb, destnode, 0, 
1352                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1353                                   mem_ctx, &timeout, NULL);
1354         talloc_free(pull);
1355
1356         return state;
1357 }
1358
1359 /*
1360   async recv for pull database
1361  */
1362 int ctdb_ctrl_pulldb_recv(
1363         struct ctdb_context *ctdb, 
1364         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1365         TDB_DATA *outdata)
1366 {
1367         int ret;
1368         int32_t res;
1369
1370         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1371         if ( (ret != 0) || (res != 0) ){
1372                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1373                 return -1;
1374         }
1375
1376         return 0;
1377 }
1378
1379 /*
1380   pull all keys and records for a specific database on a node
1381  */
1382 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1383                 uint32_t dbid, uint32_t lmaster, 
1384                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1385                 TDB_DATA *outdata)
1386 {
1387         struct ctdb_client_control_state *state;
1388
1389         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1390                                       timeout);
1391         
1392         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1393 }
1394
1395
1396 /*
1397   change dmaster for all keys in the database to the new value
1398  */
1399 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1400                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1401 {
1402         int ret;
1403         TDB_DATA indata;
1404         int32_t res;
1405
1406         indata.dsize = 2*sizeof(uint32_t);
1407         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1408
1409         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1410         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1411
1412         ret = ctdb_control(ctdb, destnode, 0, 
1413                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1414                            NULL, NULL, &res, &timeout, NULL);
1415         if (ret != 0 || res != 0) {
1416                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1417                 return -1;
1418         }
1419
1420         return 0;
1421 }
1422
1423 /*
1424   ping a node, return number of clients connected
1425  */
1426 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1427 {
1428         int ret;
1429         int32_t res;
1430
1431         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1432                            tdb_null, NULL, NULL, &res, NULL, NULL);
1433         if (ret != 0) {
1434                 return -1;
1435         }
1436         return res;
1437 }
1438
1439 /*
1440   find the real path to a ltdb 
1441  */
1442 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1443                    const char **path)
1444 {
1445         int ret;
1446         int32_t res;
1447         TDB_DATA data;
1448
1449         data.dptr = (uint8_t *)&dbid;
1450         data.dsize = sizeof(dbid);
1451
1452         ret = ctdb_control(ctdb, destnode, 0, 
1453                            CTDB_CONTROL_GETDBPATH, 0, data, 
1454                            mem_ctx, &data, &res, &timeout, NULL);
1455         if (ret != 0 || res != 0) {
1456                 return -1;
1457         }
1458
1459         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1460         if ((*path) == NULL) {
1461                 return -1;
1462         }
1463
1464         talloc_free(data.dptr);
1465
1466         return 0;
1467 }
1468
1469 /*
1470   find the name of a db 
1471  */
1472 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1473                    const char **name)
1474 {
1475         int ret;
1476         int32_t res;
1477         TDB_DATA data;
1478
1479         data.dptr = (uint8_t *)&dbid;
1480         data.dsize = sizeof(dbid);
1481
1482         ret = ctdb_control(ctdb, destnode, 0, 
1483                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1484                            mem_ctx, &data, &res, &timeout, NULL);
1485         if (ret != 0 || res != 0) {
1486                 return -1;
1487         }
1488
1489         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1490         if ((*name) == NULL) {
1491                 return -1;
1492         }
1493
1494         talloc_free(data.dptr);
1495
1496         return 0;
1497 }
1498
1499 /*
1500   get the health status of a db
1501  */
1502 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1503                           struct timeval timeout,
1504                           uint32_t destnode,
1505                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1506                           const char **reason)
1507 {
1508         int ret;
1509         int32_t res;
1510         TDB_DATA data;
1511
1512         data.dptr = (uint8_t *)&dbid;
1513         data.dsize = sizeof(dbid);
1514
1515         ret = ctdb_control(ctdb, destnode, 0,
1516                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1517                            mem_ctx, &data, &res, &timeout, NULL);
1518         if (ret != 0 || res != 0) {
1519                 return -1;
1520         }
1521
1522         if (data.dsize == 0) {
1523                 (*reason) = NULL;
1524                 return 0;
1525         }
1526
1527         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1528         if ((*reason) == NULL) {
1529                 return -1;
1530         }
1531
1532         talloc_free(data.dptr);
1533
1534         return 0;
1535 }
1536
1537 /*
1538   create a database
1539  */
1540 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1541                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1542 {
1543         int ret;
1544         int32_t res;
1545         TDB_DATA data;
1546
1547         data.dptr = discard_const(name);
1548         data.dsize = strlen(name)+1;
1549
1550         ret = ctdb_control(ctdb, destnode, 0, 
1551                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1552                            0, data, 
1553                            mem_ctx, &data, &res, &timeout, NULL);
1554
1555         if (ret != 0 || res != 0) {
1556                 return -1;
1557         }
1558
1559         return 0;
1560 }
1561
1562 /*
1563   get debug level on a node
1564  */
1565 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1566 {
1567         int ret;
1568         int32_t res;
1569         TDB_DATA data;
1570
1571         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1572                            ctdb, &data, &res, NULL, NULL);
1573         if (ret != 0 || res != 0) {
1574                 return -1;
1575         }
1576         if (data.dsize != sizeof(int32_t)) {
1577                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1578                          (unsigned)data.dsize));
1579                 return -1;
1580         }
1581         *level = *(int32_t *)data.dptr;
1582         talloc_free(data.dptr);
1583         return 0;
1584 }
1585
1586 /*
1587   set debug level on a node
1588  */
1589 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1590 {
1591         int ret;
1592         int32_t res;
1593         TDB_DATA data;
1594
1595         data.dptr = (uint8_t *)&level;
1596         data.dsize = sizeof(level);
1597
1598         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1599                            NULL, NULL, &res, NULL, NULL);
1600         if (ret != 0 || res != 0) {
1601                 return -1;
1602         }
1603         return 0;
1604 }
1605
1606
1607 /*
1608   get a list of connected nodes
1609  */
1610 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1611                                 struct timeval timeout,
1612                                 TALLOC_CTX *mem_ctx,
1613                                 uint32_t *num_nodes)
1614 {
1615         struct ctdb_node_map *map=NULL;
1616         int ret, i;
1617         uint32_t *nodes;
1618
1619         *num_nodes = 0;
1620
1621         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1622         if (ret != 0) {
1623                 return NULL;
1624         }
1625
1626         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1627         if (nodes == NULL) {
1628                 return NULL;
1629         }
1630
1631         for (i=0;i<map->num;i++) {
1632                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1633                         nodes[*num_nodes] = map->nodes[i].pnn;
1634                         (*num_nodes)++;
1635                 }
1636         }
1637
1638         return nodes;
1639 }
1640
1641
1642 /*
1643   reset remote status
1644  */
1645 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1646 {
1647         int ret;
1648         int32_t res;
1649
1650         ret = ctdb_control(ctdb, destnode, 0, 
1651                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1652                            NULL, NULL, &res, NULL, NULL);
1653         if (ret != 0 || res != 0) {
1654                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1655                 return -1;
1656         }
1657         return 0;
1658 }
1659
1660 /*
1661   this is the dummy null procedure that all databases support
1662 */
1663 static int ctdb_null_func(struct ctdb_call_info *call)
1664 {
1665         return 0;
1666 }
1667
1668 /*
1669   this is a plain fetch procedure that all databases support
1670 */
1671 static int ctdb_fetch_func(struct ctdb_call_info *call)
1672 {
1673         call->reply_data = &call->record_data;
1674         return 0;
1675 }
1676
1677 /*
1678   attach to a specific database - client call
1679 */
1680 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1681 {
1682         struct ctdb_db_context *ctdb_db;
1683         TDB_DATA data;
1684         int ret;
1685         int32_t res;
1686
1687         ctdb_db = ctdb_db_handle(ctdb, name);
1688         if (ctdb_db) {
1689                 return ctdb_db;
1690         }
1691
1692         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1693         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1694
1695         ctdb_db->ctdb = ctdb;
1696         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1697         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1698
1699         data.dptr = discard_const(name);
1700         data.dsize = strlen(name)+1;
1701
1702         /* tell ctdb daemon to attach */
1703         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1704                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1705                            0, data, ctdb_db, &data, &res, NULL, NULL);
1706         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1707                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1708                 talloc_free(ctdb_db);
1709                 return NULL;
1710         }
1711         
1712         ctdb_db->db_id = *(uint32_t *)data.dptr;
1713         talloc_free(data.dptr);
1714
1715         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1718                 talloc_free(ctdb_db);
1719                 return NULL;
1720         }
1721
1722         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1723         if (ctdb->valgrinding) {
1724                 tdb_flags |= TDB_NOMMAP;
1725         }
1726         tdb_flags |= TDB_DISALLOW_NESTING;
1727
1728         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1729         if (ctdb_db->ltdb == NULL) {
1730                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1731                 talloc_free(ctdb_db);
1732                 return NULL;
1733         }
1734
1735         ctdb_db->persistent = persistent;
1736
1737         DLIST_ADD(ctdb->db_list, ctdb_db);
1738
1739         /* add well known functions */
1740         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1741         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1742
1743         return ctdb_db;
1744 }
1745
1746
1747 /*
1748   setup a call for a database
1749  */
1750 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1751 {
1752         struct ctdb_registered_call *call;
1753
1754 #if 0
1755         TDB_DATA data;
1756         int32_t status;
1757         struct ctdb_control_set_call c;
1758         int ret;
1759
1760         /* this is no longer valid with the separate daemon architecture */
1761         c.db_id = ctdb_db->db_id;
1762         c.fn    = fn;
1763         c.id    = id;
1764
1765         data.dptr = (uint8_t *)&c;
1766         data.dsize = sizeof(c);
1767
1768         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1769                            data, NULL, NULL, &status, NULL, NULL);
1770         if (ret != 0 || status != 0) {
1771                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1772                 return -1;
1773         }
1774 #endif
1775
1776         /* also register locally */
1777         call = talloc(ctdb_db, struct ctdb_registered_call);
1778         call->fn = fn;
1779         call->id = id;
1780
1781         DLIST_ADD(ctdb_db->calls, call);        
1782         return 0;
1783 }
1784
1785
1786 struct traverse_state {
1787         bool done;
1788         uint32_t count;
1789         ctdb_traverse_func fn;
1790         void *private_data;
1791         bool listemptyrecords;
1792 };
1793
1794 /*
1795   called on each key during a ctdb_traverse
1796  */
1797 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1798 {
1799         struct traverse_state *state = (struct traverse_state *)p;
1800         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1801         TDB_DATA key;
1802
1803         if (data.dsize < sizeof(uint32_t) ||
1804             d->length != data.dsize) {
1805                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1806                 state->done = True;
1807                 return;
1808         }
1809
1810         key.dsize = d->keylen;
1811         key.dptr  = &d->data[0];
1812         data.dsize = d->datalen;
1813         data.dptr = &d->data[d->keylen];
1814
1815         if (key.dsize == 0 && data.dsize == 0) {
1816                 /* end of traverse */
1817                 state->done = True;
1818                 return;
1819         }
1820
1821         if (!state->listemptyrecords &&
1822             data.dsize == sizeof(struct ctdb_ltdb_header))
1823         {
1824                 /* empty records are deleted records in ctdb */
1825                 return;
1826         }
1827
1828         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1829                 state->done = True;
1830         }
1831
1832         state->count++;
1833 }
1834
1835 /**
1836  * start a cluster wide traverse, calling the supplied fn on each record
1837  * return the number of records traversed, or -1 on error
1838  *
1839  * Extendet variant with a flag to signal whether empty records should
1840  * be listed.
1841  */
1842 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
1843                              ctdb_traverse_func fn,
1844                              bool withemptyrecords,
1845                              void *private_data)
1846 {
1847         TDB_DATA data;
1848         struct ctdb_traverse_start_ext t;
1849         int32_t status;
1850         int ret;
1851         uint64_t srvid = (getpid() | 0xFLL<<60);
1852         struct traverse_state state;
1853
1854         state.done = False;
1855         state.count = 0;
1856         state.private_data = private_data;
1857         state.fn = fn;
1858         state.listemptyrecords = withemptyrecords;
1859
1860         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1861         if (ret != 0) {
1862                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1863                 return -1;
1864         }
1865
1866         t.db_id = ctdb_db->db_id;
1867         t.srvid = srvid;
1868         t.reqid = 0;
1869         t.withemptyrecords = withemptyrecords;
1870
1871         data.dptr = (uint8_t *)&t;
1872         data.dsize = sizeof(t);
1873
1874         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
1875                            data, NULL, NULL, &status, NULL, NULL);
1876         if (ret != 0 || status != 0) {
1877                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1878                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1879                 return -1;
1880         }
1881
1882         while (!state.done) {
1883                 event_loop_once(ctdb_db->ctdb->ev);
1884         }
1885
1886         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1887         if (ret != 0) {
1888                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1889                 return -1;
1890         }
1891
1892         return state.count;
1893 }
1894
1895 /**
1896  * start a cluster wide traverse, calling the supplied fn on each record
1897  * return the number of records traversed, or -1 on error
1898  *
1899  * Standard version which does not list the empty records:
1900  * These are considered deleted.
1901  */
1902 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1903 {
1904         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
1905 }
1906
1907 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1908 /*
1909   called on each key during a catdb
1910  */
1911 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1912 {
1913         int i;
1914         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
1915         FILE *f = c->f;
1916         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1917
1918         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1919         for (i=0;i<key.dsize;i++) {
1920                 if (ISASCII(key.dptr[i])) {
1921                         fprintf(f, "%c", key.dptr[i]);
1922                 } else {
1923                         fprintf(f, "\\%02X", key.dptr[i]);
1924                 }
1925         }
1926         fprintf(f, "\"\n");
1927
1928         fprintf(f, "dmaster: %u\n", h->dmaster);
1929         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1930
1931         if (c->printlmaster && ctdb->vnn_map != NULL) {
1932                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
1933         }
1934
1935         if (c->printhash) {
1936                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
1937                 fprintf(f, "jenkins hash: 0x%08x\n", (uint32_t)tdb_jenkins_hash(&key));
1938         }
1939
1940         if (c->printrecordflags) {
1941                 fprintf(f, "flags: 0x%08x", h->flags);
1942                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
1943                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
1944                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
1945                 fprintf(f, "\n");
1946         }
1947
1948         if (c->printdatasize) {
1949                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
1950         } else {
1951                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1952                 for (i=sizeof(*h);i<data.dsize;i++) {
1953                         if (ISASCII(data.dptr[i])) {
1954                                 fprintf(f, "%c", data.dptr[i]);
1955                         } else {
1956                                 fprintf(f, "\\%02X", data.dptr[i]);
1957                         }
1958                 }
1959                 fprintf(f, "\"\n");
1960         }
1961
1962         fprintf(f, "\n");
1963
1964         return 0;
1965 }
1966
1967 /*
1968   convenience function to list all keys to stdout
1969  */
1970 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
1971                  struct ctdb_dump_db_context *ctx)
1972 {
1973         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
1974                                  ctx->printemptyrecords, ctx);
1975 }
1976
1977 /*
1978   get the pid of a ctdb daemon
1979  */
1980 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1981 {
1982         int ret;
1983         int32_t res;
1984
1985         ret = ctdb_control(ctdb, destnode, 0, 
1986                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1987                            NULL, NULL, &res, &timeout, NULL);
1988         if (ret != 0) {
1989                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1990                 return -1;
1991         }
1992
1993         *pid = res;
1994
1995         return 0;
1996 }
1997
1998
1999 /*
2000   async freeze send control
2001  */
2002 struct ctdb_client_control_state *
2003 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2004 {
2005         return ctdb_control_send(ctdb, destnode, priority, 
2006                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2007                            mem_ctx, &timeout, NULL);
2008 }
2009
2010 /* 
2011    async freeze recv control
2012 */
2013 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2014 {
2015         int ret;
2016         int32_t res;
2017
2018         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2019         if ( (ret != 0) || (res != 0) ){
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2021                 return -1;
2022         }
2023
2024         return 0;
2025 }
2026
2027 /*
2028   freeze databases of a certain priority
2029  */
2030 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2031 {
2032         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2033         struct ctdb_client_control_state *state;
2034         int ret;
2035
2036         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2037         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2038         talloc_free(tmp_ctx);
2039
2040         return ret;
2041 }
2042
2043 /* Freeze all databases */
2044 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2045 {
2046         int i;
2047
2048         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2049                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2050                         return -1;
2051                 }
2052         }
2053         return 0;
2054 }
2055
2056 /*
2057   thaw databases of a certain priority
2058  */
2059 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2060 {
2061         int ret;
2062         int32_t res;
2063
2064         ret = ctdb_control(ctdb, destnode, priority, 
2065                            CTDB_CONTROL_THAW, 0, tdb_null, 
2066                            NULL, NULL, &res, &timeout, NULL);
2067         if (ret != 0 || res != 0) {
2068                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2069                 return -1;
2070         }
2071
2072         return 0;
2073 }
2074
2075 /* thaw all databases */
2076 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2077 {
2078         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2079 }
2080
2081 /*
2082   get pnn of a node, or -1
2083  */
2084 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2085 {
2086         int ret;
2087         int32_t res;
2088
2089         ret = ctdb_control(ctdb, destnode, 0, 
2090                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2091                            NULL, NULL, &res, &timeout, NULL);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2094                 return -1;
2095         }
2096
2097         return res;
2098 }
2099
2100 /*
2101   get the monitoring mode of a remote node
2102  */
2103 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2104 {
2105         int ret;
2106         int32_t res;
2107
2108         ret = ctdb_control(ctdb, destnode, 0, 
2109                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2110                            NULL, NULL, &res, &timeout, NULL);
2111         if (ret != 0) {
2112                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2113                 return -1;
2114         }
2115
2116         *monmode = res;
2117
2118         return 0;
2119 }
2120
2121
2122 /*
2123  set the monitoring mode of a remote node to active
2124  */
2125 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2126 {
2127         int ret;
2128         
2129
2130         ret = ctdb_control(ctdb, destnode, 0, 
2131                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2132                            NULL, NULL,NULL, &timeout, NULL);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2135                 return -1;
2136         }
2137
2138         
2139
2140         return 0;
2141 }
2142
2143 /*
2144   set the monitoring mode of a remote node to disable
2145  */
2146 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2147 {
2148         int ret;
2149         
2150
2151         ret = ctdb_control(ctdb, destnode, 0, 
2152                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2153                            NULL, NULL, NULL, &timeout, NULL);
2154         if (ret != 0) {
2155                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2156                 return -1;
2157         }
2158
2159         
2160
2161         return 0;
2162 }
2163
2164
2165
2166 /* 
2167   sent to a node to make it take over an ip address
2168 */
2169 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2170                           uint32_t destnode, struct ctdb_public_ip *ip)
2171 {
2172         TDB_DATA data;
2173         struct ctdb_public_ipv4 ipv4;
2174         int ret;
2175         int32_t res;
2176
2177         if (ip->addr.sa.sa_family == AF_INET) {
2178                 ipv4.pnn = ip->pnn;
2179                 ipv4.sin = ip->addr.ip;
2180
2181                 data.dsize = sizeof(ipv4);
2182                 data.dptr  = (uint8_t *)&ipv4;
2183
2184                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2185                            NULL, &res, &timeout, NULL);
2186         } else {
2187                 data.dsize = sizeof(*ip);
2188                 data.dptr  = (uint8_t *)ip;
2189
2190                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2191                            NULL, &res, &timeout, NULL);
2192         }
2193
2194         if (ret != 0 || res != 0) {
2195                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2196                 return -1;
2197         }
2198
2199         return 0;       
2200 }
2201
2202
2203 /* 
2204   sent to a node to make it release an ip address
2205 */
2206 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2207                          uint32_t destnode, struct ctdb_public_ip *ip)
2208 {
2209         TDB_DATA data;
2210         struct ctdb_public_ipv4 ipv4;
2211         int ret;
2212         int32_t res;
2213
2214         if (ip->addr.sa.sa_family == AF_INET) {
2215                 ipv4.pnn = ip->pnn;
2216                 ipv4.sin = ip->addr.ip;
2217
2218                 data.dsize = sizeof(ipv4);
2219                 data.dptr  = (uint8_t *)&ipv4;
2220
2221                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2222                                    NULL, &res, &timeout, NULL);
2223         } else {
2224                 data.dsize = sizeof(*ip);
2225                 data.dptr  = (uint8_t *)ip;
2226
2227                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2228                                    NULL, &res, &timeout, NULL);
2229         }
2230
2231         if (ret != 0 || res != 0) {
2232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2233                 return -1;
2234         }
2235
2236         return 0;       
2237 }
2238
2239
2240 /*
2241   get a tunable
2242  */
2243 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2244                           struct timeval timeout, 
2245                           uint32_t destnode,
2246                           const char *name, uint32_t *value)
2247 {
2248         struct ctdb_control_get_tunable *t;
2249         TDB_DATA data, outdata;
2250         int32_t res;
2251         int ret;
2252
2253         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2254         data.dptr  = talloc_size(ctdb, data.dsize);
2255         CTDB_NO_MEMORY(ctdb, data.dptr);
2256
2257         t = (struct ctdb_control_get_tunable *)data.dptr;
2258         t->length = strlen(name)+1;
2259         memcpy(t->name, name, t->length);
2260
2261         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2262                            &outdata, &res, &timeout, NULL);
2263         talloc_free(data.dptr);
2264         if (ret != 0 || res != 0) {
2265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2266                 return -1;
2267         }
2268
2269         if (outdata.dsize != sizeof(uint32_t)) {
2270                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2271                 talloc_free(outdata.dptr);
2272                 return -1;
2273         }
2274         
2275         *value = *(uint32_t *)outdata.dptr;
2276         talloc_free(outdata.dptr);
2277
2278         return 0;
2279 }
2280
2281 /*
2282   set a tunable
2283  */
2284 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2285                           struct timeval timeout, 
2286                           uint32_t destnode,
2287                           const char *name, uint32_t value)
2288 {
2289         struct ctdb_control_set_tunable *t;
2290         TDB_DATA data;
2291         int32_t res;
2292         int ret;
2293
2294         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2295         data.dptr  = talloc_size(ctdb, data.dsize);
2296         CTDB_NO_MEMORY(ctdb, data.dptr);
2297
2298         t = (struct ctdb_control_set_tunable *)data.dptr;
2299         t->length = strlen(name)+1;
2300         memcpy(t->name, name, t->length);
2301         t->value = value;
2302
2303         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2304                            NULL, &res, &timeout, NULL);
2305         talloc_free(data.dptr);
2306         if (ret != 0 || res != 0) {
2307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2308                 return -1;
2309         }
2310
2311         return 0;
2312 }
2313
2314 /*
2315   list tunables
2316  */
2317 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2318                             struct timeval timeout, 
2319                             uint32_t destnode,
2320                             TALLOC_CTX *mem_ctx,
2321                             const char ***list, uint32_t *count)
2322 {
2323         TDB_DATA outdata;
2324         int32_t res;
2325         int ret;
2326         struct ctdb_control_list_tunable *t;
2327         char *p, *s, *ptr;
2328
2329         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2330                            mem_ctx, &outdata, &res, &timeout, NULL);
2331         if (ret != 0 || res != 0) {
2332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2333                 return -1;
2334         }
2335
2336         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2337         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2338             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2339                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2340                 talloc_free(outdata.dptr);
2341                 return -1;              
2342         }
2343         
2344         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2345         CTDB_NO_MEMORY(ctdb, p);
2346
2347         talloc_free(outdata.dptr);
2348         
2349         (*list) = NULL;
2350         (*count) = 0;
2351
2352         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2353                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2354                 CTDB_NO_MEMORY(ctdb, *list);
2355                 (*list)[*count] = talloc_strdup(*list, s);
2356                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2357                 (*count)++;
2358         }
2359
2360         talloc_free(p);
2361
2362         return 0;
2363 }
2364
2365
2366 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2367                                    struct timeval timeout, uint32_t destnode,
2368                                    TALLOC_CTX *mem_ctx,
2369                                    uint32_t flags,
2370                                    struct ctdb_all_public_ips **ips)
2371 {
2372         int ret;
2373         TDB_DATA outdata;
2374         int32_t res;
2375
2376         ret = ctdb_control(ctdb, destnode, 0, 
2377                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2378                            mem_ctx, &outdata, &res, &timeout, NULL);
2379         if (ret == 0 && res == -1) {
2380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2381                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2382         }
2383         if (ret != 0 || res != 0) {
2384           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2385                 return -1;
2386         }
2387
2388         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2389         talloc_free(outdata.dptr);
2390                     
2391         return 0;
2392 }
2393
2394 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2395                              struct timeval timeout, uint32_t destnode,
2396                              TALLOC_CTX *mem_ctx,
2397                              struct ctdb_all_public_ips **ips)
2398 {
2399         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2400                                               destnode, mem_ctx,
2401                                               0, ips);
2402 }
2403
2404 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2405                         struct timeval timeout, uint32_t destnode, 
2406                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2407 {
2408         int ret, i, len;
2409         TDB_DATA outdata;
2410         int32_t res;
2411         struct ctdb_all_public_ipsv4 *ipsv4;
2412
2413         ret = ctdb_control(ctdb, destnode, 0, 
2414                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2415                            mem_ctx, &outdata, &res, &timeout, NULL);
2416         if (ret != 0 || res != 0) {
2417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2418                 return -1;
2419         }
2420
2421         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2422         len = offsetof(struct ctdb_all_public_ips, ips) +
2423                 ipsv4->num*sizeof(struct ctdb_public_ip);
2424         *ips = talloc_zero_size(mem_ctx, len);
2425         CTDB_NO_MEMORY(ctdb, *ips);
2426         (*ips)->num = ipsv4->num;
2427         for (i=0; i<ipsv4->num; i++) {
2428                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2429                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2430         }
2431
2432         talloc_free(outdata.dptr);
2433                     
2434         return 0;
2435 }
2436
2437 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2438                                  struct timeval timeout, uint32_t destnode,
2439                                  TALLOC_CTX *mem_ctx,
2440                                  const ctdb_sock_addr *addr,
2441                                  struct ctdb_control_public_ip_info **_info)
2442 {
2443         int ret;
2444         TDB_DATA indata;
2445         TDB_DATA outdata;
2446         int32_t res;
2447         struct ctdb_control_public_ip_info *info;
2448         uint32_t len;
2449         uint32_t i;
2450
2451         indata.dptr = discard_const_p(uint8_t, addr);
2452         indata.dsize = sizeof(*addr);
2453
2454         ret = ctdb_control(ctdb, destnode, 0,
2455                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2456                            mem_ctx, &outdata, &res, &timeout, NULL);
2457         if (ret != 0 || res != 0) {
2458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2459                                 "failed ret:%d res:%d\n",
2460                                 ret, res));
2461                 return -1;
2462         }
2463
2464         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2465         if (len > outdata.dsize) {
2466                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2467                                 "returned invalid data with size %u > %u\n",
2468                                 (unsigned int)outdata.dsize,
2469                                 (unsigned int)len));
2470                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2471                 return -1;
2472         }
2473
2474         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2475         len += info->num*sizeof(struct ctdb_control_iface_info);
2476
2477         if (len > outdata.dsize) {
2478                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2479                                 "returned invalid data with size %u > %u\n",
2480                                 (unsigned int)outdata.dsize,
2481                                 (unsigned int)len));
2482                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2483                 return -1;
2484         }
2485
2486         /* make sure we null terminate the returned strings */
2487         for (i=0; i < info->num; i++) {
2488                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2489         }
2490
2491         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2492                                                                 outdata.dptr,
2493                                                                 outdata.dsize);
2494         talloc_free(outdata.dptr);
2495         if (*_info == NULL) {
2496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2497                                 "talloc_memdup size %u failed\n",
2498                                 (unsigned int)outdata.dsize));
2499                 return -1;
2500         }
2501
2502         return 0;
2503 }
2504
2505 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2506                          struct timeval timeout, uint32_t destnode,
2507                          TALLOC_CTX *mem_ctx,
2508                          struct ctdb_control_get_ifaces **_ifaces)
2509 {
2510         int ret;
2511         TDB_DATA outdata;
2512         int32_t res;
2513         struct ctdb_control_get_ifaces *ifaces;
2514         uint32_t len;
2515         uint32_t i;
2516
2517         ret = ctdb_control(ctdb, destnode, 0,
2518                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2519                            mem_ctx, &outdata, &res, &timeout, NULL);
2520         if (ret != 0 || res != 0) {
2521                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2522                                 "failed ret:%d res:%d\n",
2523                                 ret, res));
2524                 return -1;
2525         }
2526
2527         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2528         if (len > outdata.dsize) {
2529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2530                                 "returned invalid data with size %u > %u\n",
2531                                 (unsigned int)outdata.dsize,
2532                                 (unsigned int)len));
2533                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2534                 return -1;
2535         }
2536
2537         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2538         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2539
2540         if (len > outdata.dsize) {
2541                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2542                                 "returned invalid data with size %u > %u\n",
2543                                 (unsigned int)outdata.dsize,
2544                                 (unsigned int)len));
2545                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2546                 return -1;
2547         }
2548
2549         /* make sure we null terminate the returned strings */
2550         for (i=0; i < ifaces->num; i++) {
2551                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2552         }
2553
2554         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2555                                                                   outdata.dptr,
2556                                                                   outdata.dsize);
2557         talloc_free(outdata.dptr);
2558         if (*_ifaces == NULL) {
2559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2560                                 "talloc_memdup size %u failed\n",
2561                                 (unsigned int)outdata.dsize));
2562                 return -1;
2563         }
2564
2565         return 0;
2566 }
2567
2568 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2569                              struct timeval timeout, uint32_t destnode,
2570                              TALLOC_CTX *mem_ctx,
2571                              const struct ctdb_control_iface_info *info)
2572 {
2573         int ret;
2574         TDB_DATA indata;
2575         int32_t res;
2576
2577         indata.dptr = discard_const_p(uint8_t, info);
2578         indata.dsize = sizeof(*info);
2579
2580         ret = ctdb_control(ctdb, destnode, 0,
2581                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2582                            mem_ctx, NULL, &res, &timeout, NULL);
2583         if (ret != 0 || res != 0) {
2584                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2585                                 "failed ret:%d res:%d\n",
2586                                 ret, res));
2587                 return -1;
2588         }
2589
2590         return 0;
2591 }
2592
2593 /*
2594   set/clear the permanent disabled bit on a remote node
2595  */
2596 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2597                        uint32_t set, uint32_t clear)
2598 {
2599         int ret;
2600         TDB_DATA data;
2601         struct ctdb_node_map *nodemap=NULL;
2602         struct ctdb_node_flag_change c;
2603         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2604         uint32_t recmaster;
2605         uint32_t *nodes;
2606
2607
2608         /* find the recovery master */
2609         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2610         if (ret != 0) {
2611                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2612                 talloc_free(tmp_ctx);
2613                 return ret;
2614         }
2615
2616
2617         /* read the node flags from the recmaster */
2618         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2619         if (ret != 0) {
2620                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2621                 talloc_free(tmp_ctx);
2622                 return -1;
2623         }
2624         if (destnode >= nodemap->num) {
2625                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2626                 talloc_free(tmp_ctx);
2627                 return -1;
2628         }
2629
2630         c.pnn       = destnode;
2631         c.old_flags = nodemap->nodes[destnode].flags;
2632         c.new_flags = c.old_flags;
2633         c.new_flags |= set;
2634         c.new_flags &= ~clear;
2635
2636         data.dsize = sizeof(c);
2637         data.dptr = (unsigned char *)&c;
2638
2639         /* send the flags update to all connected nodes */
2640         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2641
2642         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2643                                         nodes, 0,
2644                                         timeout, false, data,
2645                                         NULL, NULL,
2646                                         NULL) != 0) {
2647                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2648
2649                 talloc_free(tmp_ctx);
2650                 return -1;
2651         }
2652
2653         talloc_free(tmp_ctx);
2654         return 0;
2655 }
2656
2657
2658 /*
2659   get all tunables
2660  */
2661 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2662                                struct timeval timeout, 
2663                                uint32_t destnode,
2664                                struct ctdb_tunable *tunables)
2665 {
2666         TDB_DATA outdata;
2667         int ret;
2668         int32_t res;
2669
2670         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2671                            &outdata, &res, &timeout, NULL);
2672         if (ret != 0 || res != 0) {
2673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2674                 return -1;
2675         }
2676
2677         if (outdata.dsize != sizeof(*tunables)) {
2678                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2679                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2680                 return -1;              
2681         }
2682
2683         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2684         talloc_free(outdata.dptr);
2685         return 0;
2686 }
2687
2688 /*
2689   add a public address to a node
2690  */
2691 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2692                       struct timeval timeout, 
2693                       uint32_t destnode,
2694                       struct ctdb_control_ip_iface *pub)
2695 {
2696         TDB_DATA data;
2697         int32_t res;
2698         int ret;
2699
2700         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2701         data.dptr  = (unsigned char *)pub;
2702
2703         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2704                            NULL, &res, &timeout, NULL);
2705         if (ret != 0 || res != 0) {
2706                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2707                 return -1;
2708         }
2709
2710         return 0;
2711 }
2712
2713 /*
2714   delete a public address from a node
2715  */
2716 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2717                       struct timeval timeout, 
2718                       uint32_t destnode,
2719                       struct ctdb_control_ip_iface *pub)
2720 {
2721         TDB_DATA data;
2722         int32_t res;
2723         int ret;
2724
2725         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2726         data.dptr  = (unsigned char *)pub;
2727
2728         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2729                            NULL, &res, &timeout, NULL);
2730         if (ret != 0 || res != 0) {
2731                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2732                 return -1;
2733         }
2734
2735         return 0;
2736 }
2737
2738 /*
2739   kill a tcp connection
2740  */
2741 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2742                       struct timeval timeout, 
2743                       uint32_t destnode,
2744                       struct ctdb_control_killtcp *killtcp)
2745 {
2746         TDB_DATA data;
2747         int32_t res;
2748         int ret;
2749
2750         data.dsize = sizeof(struct ctdb_control_killtcp);
2751         data.dptr  = (unsigned char *)killtcp;
2752
2753         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2754                            NULL, &res, &timeout, NULL);
2755         if (ret != 0 || res != 0) {
2756                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2757                 return -1;
2758         }
2759
2760         return 0;
2761 }
2762
2763 /*
2764   send a gratious arp
2765  */
2766 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2767                       struct timeval timeout, 
2768                       uint32_t destnode,
2769                       ctdb_sock_addr *addr,
2770                       const char *ifname)
2771 {
2772         TDB_DATA data;
2773         int32_t res;
2774         int ret, len;
2775         struct ctdb_control_gratious_arp *gratious_arp;
2776         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2777
2778
2779         len = strlen(ifname)+1;
2780         gratious_arp = talloc_size(tmp_ctx, 
2781                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2782         CTDB_NO_MEMORY(ctdb, gratious_arp);
2783
2784         gratious_arp->addr = *addr;
2785         gratious_arp->len = len;
2786         memcpy(&gratious_arp->iface[0], ifname, len);
2787
2788
2789         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2790         data.dptr  = (unsigned char *)gratious_arp;
2791
2792         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2793                            NULL, &res, &timeout, NULL);
2794         if (ret != 0 || res != 0) {
2795                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2796                 talloc_free(tmp_ctx);
2797                 return -1;
2798         }
2799
2800         talloc_free(tmp_ctx);
2801         return 0;
2802 }
2803
2804 /*
2805   get a list of all tcp tickles that a node knows about for a particular vnn
2806  */
2807 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2808                               struct timeval timeout, uint32_t destnode, 
2809                               TALLOC_CTX *mem_ctx, 
2810                               ctdb_sock_addr *addr,
2811                               struct ctdb_control_tcp_tickle_list **list)
2812 {
2813         int ret;
2814         TDB_DATA data, outdata;
2815         int32_t status;
2816
2817         data.dptr = (uint8_t*)addr;
2818         data.dsize = sizeof(ctdb_sock_addr);
2819
2820         ret = ctdb_control(ctdb, destnode, 0, 
2821                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2822                            mem_ctx, &outdata, &status, NULL, NULL);
2823         if (ret != 0 || status != 0) {
2824                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2825                 return -1;
2826         }
2827
2828         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2829
2830         return status;
2831 }
2832
2833 /*
2834   register a server id
2835  */
2836 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2837                       struct timeval timeout, 
2838                       struct ctdb_server_id *id)
2839 {
2840         TDB_DATA data;
2841         int32_t res;
2842         int ret;
2843
2844         data.dsize = sizeof(struct ctdb_server_id);
2845         data.dptr  = (unsigned char *)id;
2846
2847         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2848                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2849                         0, data, NULL,
2850                         NULL, &res, &timeout, NULL);
2851         if (ret != 0 || res != 0) {
2852                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2853                 return -1;
2854         }
2855
2856         return 0;
2857 }
2858
2859 /*
2860   unregister a server id
2861  */
2862 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2863                       struct timeval timeout, 
2864                       struct ctdb_server_id *id)
2865 {
2866         TDB_DATA data;
2867         int32_t res;
2868         int ret;
2869
2870         data.dsize = sizeof(struct ctdb_server_id);
2871         data.dptr  = (unsigned char *)id;
2872
2873         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2874                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2875                         0, data, NULL,
2876                         NULL, &res, &timeout, NULL);
2877         if (ret != 0 || res != 0) {
2878                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2879                 return -1;
2880         }
2881
2882         return 0;
2883 }
2884
2885
2886 /*
2887   check if a server id exists
2888
2889   if a server id does exist, return *status == 1, otherwise *status == 0
2890  */
2891 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2892                       struct timeval timeout, 
2893                       uint32_t destnode,
2894                       struct ctdb_server_id *id,
2895                       uint32_t *status)
2896 {
2897         TDB_DATA data;
2898         int32_t res;
2899         int ret;
2900
2901         data.dsize = sizeof(struct ctdb_server_id);
2902         data.dptr  = (unsigned char *)id;
2903
2904         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2905                         0, data, NULL,
2906                         NULL, &res, &timeout, NULL);
2907         if (ret != 0) {
2908                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2909                 return -1;
2910         }
2911
2912         if (res) {
2913                 *status = 1;
2914         } else {
2915                 *status = 0;
2916         }
2917
2918         return 0;
2919 }
2920
2921 /*
2922    get the list of server ids that are registered on a node
2923 */
2924 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2925                 TALLOC_CTX *mem_ctx,
2926                 struct timeval timeout, uint32_t destnode, 
2927                 struct ctdb_server_id_list **svid_list)
2928 {
2929         int ret;
2930         TDB_DATA outdata;
2931         int32_t res;
2932
2933         ret = ctdb_control(ctdb, destnode, 0, 
2934                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2935                            mem_ctx, &outdata, &res, &timeout, NULL);
2936         if (ret != 0 || res != 0) {
2937                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2938                 return -1;
2939         }
2940
2941         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2942                     
2943         return 0;
2944 }
2945
2946 /*
2947   initialise the ctdb daemon for client applications
2948
2949   NOTE: In current code the daemon does not fork. This is for testing purposes only
2950   and to simplify the code.
2951 */
2952 struct ctdb_context *ctdb_init(struct event_context *ev)
2953 {
2954         int ret;
2955         struct ctdb_context *ctdb;
2956
2957         ctdb = talloc_zero(ev, struct ctdb_context);
2958         if (ctdb == NULL) {
2959                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2960                 return NULL;
2961         }
2962         ctdb->ev  = ev;
2963         ctdb->idr = idr_init(ctdb);
2964         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2965
2966         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2967         if (ret != 0) {
2968                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2969                 talloc_free(ctdb);
2970                 return NULL;
2971         }
2972
2973         return ctdb;
2974 }
2975
2976
2977 /*
2978   set some ctdb flags
2979 */
2980 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2981 {
2982         ctdb->flags |= flags;
2983 }
2984
2985 /*
2986   setup the local socket name
2987 */
2988 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2989 {
2990         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2991         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2992
2993         return 0;
2994 }
2995
2996 /*
2997   return the pnn of this node
2998 */
2999 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3000 {
3001         return ctdb->pnn;
3002 }
3003
3004
3005 /*
3006   get the uptime of a remote node
3007  */
3008 struct ctdb_client_control_state *
3009 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3010 {
3011         return ctdb_control_send(ctdb, destnode, 0, 
3012                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3013                            mem_ctx, &timeout, NULL);
3014 }
3015
3016 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3017 {
3018         int ret;
3019         int32_t res;
3020         TDB_DATA outdata;
3021
3022         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3023         if (ret != 0 || res != 0) {
3024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3025                 return -1;
3026         }
3027
3028         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3029
3030         return 0;
3031 }
3032
3033 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3034 {
3035         struct ctdb_client_control_state *state;
3036
3037         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3038         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3039 }
3040
3041 /*
3042   send a control to execute the "recovered" event script on a node
3043  */
3044 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3045 {
3046         int ret;
3047         int32_t status;
3048
3049         ret = ctdb_control(ctdb, destnode, 0, 
3050                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3051                            NULL, NULL, &status, &timeout, NULL);
3052         if (ret != 0 || status != 0) {
3053                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3054                 return -1;
3055         }
3056
3057         return 0;
3058 }
3059
3060 /* 
3061   callback for the async helpers used when sending the same control
3062   to multiple nodes in parallell.
3063 */
3064 static void async_callback(struct ctdb_client_control_state *state)
3065 {
3066         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3067         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3068         int ret;
3069         TDB_DATA outdata;
3070         int32_t res = -1;
3071         uint32_t destnode = state->c->hdr.destnode;
3072
3073         /* one more node has responded with recmode data */
3074         data->count--;
3075
3076         /* if we failed to push the db, then return an error and let
3077            the main loop try again.
3078         */
3079         if (state->state != CTDB_CONTROL_DONE) {
3080                 if ( !data->dont_log_errors) {
3081                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3082                 }
3083                 data->fail_count++;
3084                 if (data->fail_callback) {
3085                         data->fail_callback(ctdb, destnode, res, outdata,
3086                                         data->callback_data);
3087                 }
3088                 return;
3089         }
3090         
3091         state->async.fn = NULL;
3092
3093         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3094         if ((ret != 0) || (res != 0)) {
3095                 if ( !data->dont_log_errors) {
3096                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3097                 }
3098                 data->fail_count++;
3099                 if (data->fail_callback) {
3100                         data->fail_callback(ctdb, destnode, res, outdata,
3101                                         data->callback_data);
3102                 }
3103         }
3104         if ((ret == 0) && (data->callback != NULL)) {
3105                 data->callback(ctdb, destnode, res, outdata,
3106                                         data->callback_data);
3107         }
3108 }
3109
3110
3111 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3112 {
3113         /* set up the callback functions */
3114         state->async.fn = async_callback;
3115         state->async.private_data = data;
3116         
3117         /* one more control to wait for to complete */
3118         data->count++;
3119 }
3120
3121
3122 /* wait for up to the maximum number of seconds allowed
3123    or until all nodes we expect a response from has replied
3124 */
3125 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3126 {
3127         while (data->count > 0) {
3128                 event_loop_once(ctdb->ev);
3129         }
3130         if (data->fail_count != 0) {
3131                 if (!data->dont_log_errors) {
3132                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3133                                  data->fail_count));
3134                 }
3135                 return -1;
3136         }
3137         return 0;
3138 }
3139
3140
3141 /* 
3142    perform a simple control on the listed nodes
3143    The control cannot return data
3144  */
3145 int ctdb_client_async_control(struct ctdb_context *ctdb,
3146                                 enum ctdb_controls opcode,
3147                                 uint32_t *nodes,
3148                                 uint64_t srvid,
3149                                 struct timeval timeout,
3150                                 bool dont_log_errors,
3151                                 TDB_DATA data,
3152                                 client_async_callback client_callback,
3153                                 client_async_callback fail_callback,
3154                                 void *callback_data)
3155 {
3156         struct client_async_data *async_data;
3157         struct ctdb_client_control_state *state;
3158         int j, num_nodes;
3159
3160         async_data = talloc_zero(ctdb, struct client_async_data);
3161         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3162         async_data->dont_log_errors = dont_log_errors;
3163         async_data->callback = client_callback;
3164         async_data->fail_callback = fail_callback;
3165         async_data->callback_data = callback_data;
3166         async_data->opcode        = opcode;
3167
3168         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3169
3170         /* loop over all nodes and send an async control to each of them */
3171         for (j=0; j<num_nodes; j++) {
3172                 uint32_t pnn = nodes[j];
3173
3174                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3175                                           0, data, async_data, &timeout, NULL);
3176                 if (state == NULL) {
3177                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3178                         talloc_free(async_data);
3179                         return -1;
3180                 }
3181                 
3182                 ctdb_client_async_add(async_data, state);
3183         }
3184
3185         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3186                 talloc_free(async_data);
3187                 return -1;
3188         }
3189
3190         talloc_free(async_data);
3191         return 0;
3192 }
3193
3194 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3195                                 struct ctdb_vnn_map *vnn_map,
3196                                 TALLOC_CTX *mem_ctx,
3197                                 bool include_self)
3198 {
3199         int i, j, num_nodes;
3200         uint32_t *nodes;
3201
3202         for (i=num_nodes=0;i<vnn_map->size;i++) {
3203                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3204                         continue;
3205                 }
3206                 num_nodes++;
3207         } 
3208
3209         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3210         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3211
3212         for (i=j=0;i<vnn_map->size;i++) {
3213                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3214                         continue;
3215                 }
3216                 nodes[j++] = vnn_map->map[i];
3217         } 
3218
3219         return nodes;
3220 }
3221
3222 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3223                                 struct ctdb_node_map *node_map,
3224                                 TALLOC_CTX *mem_ctx,
3225                                 bool include_self)
3226 {
3227         int i, j, num_nodes;
3228         uint32_t *nodes;
3229
3230         for (i=num_nodes=0;i<node_map->num;i++) {
3231                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3232                         continue;
3233                 }
3234                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3235                         continue;
3236                 }
3237                 num_nodes++;
3238         } 
3239
3240         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3241         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3242
3243         for (i=j=0;i<node_map->num;i++) {
3244                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3245                         continue;
3246                 }
3247                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3248                         continue;
3249                 }
3250                 nodes[j++] = node_map->nodes[i].pnn;
3251         } 
3252
3253         return nodes;
3254 }
3255
3256 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3257                                 struct ctdb_node_map *node_map,
3258                                 TALLOC_CTX *mem_ctx,
3259                                 uint32_t pnn)
3260 {
3261         int i, j, num_nodes;
3262         uint32_t *nodes;
3263
3264         for (i=num_nodes=0;i<node_map->num;i++) {
3265                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3266                         continue;
3267                 }
3268                 if (node_map->nodes[i].pnn == pnn) {
3269                         continue;
3270                 }
3271                 num_nodes++;
3272         } 
3273
3274         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3275         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3276
3277         for (i=j=0;i<node_map->num;i++) {
3278                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3279                         continue;
3280                 }
3281                 if (node_map->nodes[i].pnn == pnn) {
3282                         continue;
3283                 }
3284                 nodes[j++] = node_map->nodes[i].pnn;
3285         } 
3286
3287         return nodes;
3288 }
3289
3290 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3291                                 struct ctdb_node_map *node_map,
3292                                 TALLOC_CTX *mem_ctx,
3293                                 bool include_self)
3294 {
3295         int i, j, num_nodes;
3296         uint32_t *nodes;
3297
3298         for (i=num_nodes=0;i<node_map->num;i++) {
3299                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3300                         continue;
3301                 }
3302                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3303                         continue;
3304                 }
3305                 num_nodes++;
3306         } 
3307
3308         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3309         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3310
3311         for (i=j=0;i<node_map->num;i++) {
3312                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3313                         continue;
3314                 }
3315                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3316                         continue;
3317                 }
3318                 nodes[j++] = node_map->nodes[i].pnn;
3319         } 
3320
3321         return nodes;
3322 }
3323
3324 /* 
3325   this is used to test if a pnn lock exists and if it exists will return
3326   the number of connections that pnn has reported or -1 if that recovery
3327   daemon is not running.
3328 */
3329 int
3330 ctdb_read_pnn_lock(int fd, int32_t pnn)
3331 {
3332         struct flock lock;
3333         char c;
3334
3335         lock.l_type = F_WRLCK;
3336         lock.l_whence = SEEK_SET;
3337         lock.l_start = pnn;
3338         lock.l_len = 1;
3339         lock.l_pid = 0;
3340
3341         if (fcntl(fd, F_GETLK, &lock) != 0) {
3342                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3343                 return -1;
3344         }
3345
3346         if (lock.l_type == F_UNLCK) {
3347                 return -1;
3348         }
3349
3350         if (pread(fd, &c, 1, pnn) == -1) {
3351                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3352                 return -1;
3353         }
3354
3355         return c;
3356 }
3357
3358 /*
3359   get capabilities of a remote node
3360  */
3361 struct ctdb_client_control_state *
3362 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3363 {
3364         return ctdb_control_send(ctdb, destnode, 0, 
3365                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3366                            mem_ctx, &timeout, NULL);
3367 }
3368
3369 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3370 {
3371         int ret;
3372         int32_t res;
3373         TDB_DATA outdata;
3374
3375         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3376         if ( (ret != 0) || (res != 0) ) {
3377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3378                 return -1;
3379         }
3380
3381         if (capabilities) {
3382                 *capabilities = *((uint32_t *)outdata.dptr);
3383         }
3384
3385         return 0;
3386 }
3387
3388 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3389 {
3390         struct ctdb_client_control_state *state;
3391         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3392         int ret;
3393
3394         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3395         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3396         talloc_free(tmp_ctx);
3397         return ret;
3398 }
3399
3400 /**
3401  * check whether a transaction is active on a given db on a given node
3402  */
3403 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3404                                      uint32_t destnode,
3405                                      uint32_t db_id)
3406 {
3407         int32_t status;
3408         int ret;
3409         TDB_DATA indata;
3410
3411         indata.dptr = (uint8_t *)&db_id;
3412         indata.dsize = sizeof(db_id);
3413
3414         ret = ctdb_control(ctdb, destnode, 0,
3415                            CTDB_CONTROL_TRANS2_ACTIVE,
3416                            0, indata, NULL, NULL, &status,
3417                            NULL, NULL);
3418
3419         if (ret != 0) {
3420                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3421                 return -1;
3422         }
3423
3424         return status;
3425 }
3426
3427
3428 struct ctdb_transaction_handle {
3429         struct ctdb_db_context *ctdb_db;
3430         bool in_replay;
3431         /*
3432          * we store the reads and writes done under a transaction:
3433          * - one list stores both reads and writes (m_all),
3434          * - the other just writes (m_write)
3435          */
3436         struct ctdb_marshall_buffer *m_all;
3437         struct ctdb_marshall_buffer *m_write;
3438 };
3439
3440 /* start a transaction on a database */
3441 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3442 {
3443         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3444         return 0;
3445 }
3446
3447 /* start a transaction on a database */
3448 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3449 {
3450         struct ctdb_record_handle *rh;
3451         TDB_DATA key;
3452         TDB_DATA data;
3453         struct ctdb_ltdb_header header;
3454         TALLOC_CTX *tmp_ctx;
3455         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3456         int ret;
3457         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3458         pid_t pid;
3459         int32_t status;
3460
3461         key.dptr = discard_const(keyname);
3462         key.dsize = strlen(keyname);
3463
3464         if (!ctdb_db->persistent) {
3465                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3466                 return -1;
3467         }
3468
3469 again:
3470         tmp_ctx = talloc_new(h);
3471
3472         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3473         if (rh == NULL) {
3474                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3475                 talloc_free(tmp_ctx);
3476                 return -1;
3477         }
3478
3479         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3480                                               CTDB_CURRENT_NODE,
3481                                               ctdb_db->db_id);
3482         if (status == 1) {
3483                 unsigned long int usec = (1000 + random()) % 100000;
3484                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3485                                     "on db_id[0x%08x]. waiting for %lu "
3486                                     "microseconds\n",
3487                                     ctdb_db->db_id, usec));
3488                 talloc_free(tmp_ctx);
3489                 usleep(usec);
3490                 goto again;
3491         }
3492
3493         /*
3494          * store the pid in the database:
3495          * it is not enough that the node is dmaster...
3496          */
3497         pid = getpid();
3498         data.dptr = (unsigned char *)&pid;
3499         data.dsize = sizeof(pid_t);
3500         rh->header.rsn++;
3501         rh->header.dmaster = ctdb_db->ctdb->pnn;
3502         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3503         if (ret != 0) {
3504                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3505                                   "transaction record\n"));
3506                 talloc_free(tmp_ctx);
3507                 return -1;
3508         }
3509
3510         talloc_free(rh);
3511
3512         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3513         if (ret != 0) {
3514                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3515                 talloc_free(tmp_ctx);
3516                 return -1;
3517         }
3518
3519         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3520         if (ret != 0) {
3521                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3522                                  "lock record inside transaction\n"));
3523                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3524                 talloc_free(tmp_ctx);
3525                 goto again;
3526         }
3527
3528         if (header.dmaster != ctdb_db->ctdb->pnn) {
3529                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3530                                    "transaction lock record\n"));
3531                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3532                 talloc_free(tmp_ctx);
3533                 goto again;
3534         }
3535
3536         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3537                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3538                                     "the transaction lock record\n"));
3539                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3540                 talloc_free(tmp_ctx);
3541                 goto again;
3542         }
3543
3544         talloc_free(tmp_ctx);
3545
3546         return 0;
3547 }
3548
3549
3550 /* start a transaction on a database */
3551 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3552                                                        TALLOC_CTX *mem_ctx)
3553 {
3554         struct ctdb_transaction_handle *h;
3555         int ret;
3556
3557         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3558         if (h == NULL) {
3559                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3560                 return NULL;
3561         }
3562
3563         h->ctdb_db = ctdb_db;
3564
3565         ret = ctdb_transaction_fetch_start(h);
3566         if (ret != 0) {
3567                 talloc_free(h);
3568                 return NULL;
3569         }
3570
3571         talloc_set_destructor(h, ctdb_transaction_destructor);
3572
3573         return h;
3574 }
3575
3576
3577
3578 /*
3579   fetch a record inside a transaction
3580  */
3581 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3582                            TALLOC_CTX *mem_ctx, 
3583                            TDB_DATA key, TDB_DATA *data)
3584 {
3585         struct ctdb_ltdb_header header;
3586         int ret;
3587
3588         ZERO_STRUCT(header);
3589
3590         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3591         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3592                 /* record doesn't exist yet */
3593                 *data = tdb_null;
3594                 ret = 0;
3595         }
3596         
3597         if (ret != 0) {
3598                 return ret;
3599         }
3600
3601         if (!h->in_replay) {
3602                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3603                 if (h->m_all == NULL) {
3604                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3605                         return -1;
3606                 }
3607         }
3608
3609         return 0;
3610 }
3611
3612 /*
3613   stores a record inside a transaction
3614  */
3615 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3616                            TDB_DATA key, TDB_DATA data)
3617 {
3618         TALLOC_CTX *tmp_ctx = talloc_new(h);
3619         struct ctdb_ltdb_header header;
3620         TDB_DATA olddata;
3621         int ret;
3622
3623         ZERO_STRUCT(header);
3624
3625         /* we need the header so we can update the RSN */
3626         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3627         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3628                 /* the record doesn't exist - create one with us as dmaster.
3629                    This is only safe because we are in a transaction and this
3630                    is a persistent database */
3631                 ZERO_STRUCT(header);
3632         } else if (ret != 0) {
3633                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3634                 talloc_free(tmp_ctx);
3635                 return ret;
3636         }
3637
3638         if (data.dsize == olddata.dsize &&
3639             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3640                 /* save writing the same data */
3641                 talloc_free(tmp_ctx);
3642                 return 0;
3643         }
3644
3645         header.dmaster = h->ctdb_db->ctdb->pnn;
3646         header.rsn++;
3647
3648         if (!h->in_replay) {
3649                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3650                 if (h->m_all == NULL) {
3651                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3652                         talloc_free(tmp_ctx);
3653                         return -1;
3654                 }
3655         }               
3656
3657         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3658         if (h->m_write == NULL) {
3659                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3660                 talloc_free(tmp_ctx);
3661                 return -1;
3662         }
3663         
3664         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3665
3666         talloc_free(tmp_ctx);
3667         
3668         return ret;
3669 }
3670
3671 /*
3672   replay a transaction
3673  */
3674 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3675 {
3676         int ret, i;
3677         struct ctdb_rec_data *rec = NULL;
3678
3679         h->in_replay = true;
3680         talloc_free(h->m_write);
3681         h->m_write = NULL;
3682
3683         ret = ctdb_transaction_fetch_start(h);
3684         if (ret != 0) {
3685                 return ret;
3686         }
3687
3688         for (i=0;i<h->m_all->count;i++) {
3689                 TDB_DATA key, data;
3690
3691                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3692                 if (rec == NULL) {
3693                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3694                         goto failed;
3695                 }
3696
3697                 if (rec->reqid == 0) {
3698                         /* its a store */
3699                         if (ctdb_transaction_store(h, key, data) != 0) {
3700                                 goto failed;
3701                         }
3702                 } else {
3703                         TDB_DATA data2;
3704                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3705
3706                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3707                                 talloc_free(tmp_ctx);
3708                                 goto failed;
3709                         }
3710                         if (data2.dsize != data.dsize ||
3711                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3712                                 /* the record has changed on us - we have to give up */
3713                                 talloc_free(tmp_ctx);
3714                                 goto failed;
3715                         }
3716                         talloc_free(tmp_ctx);
3717                 }
3718         }
3719         
3720         return 0;
3721
3722 failed:
3723         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3724         return -1;
3725 }
3726
3727
3728 /*
3729   commit a transaction
3730  */
3731 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3732 {
3733         int ret, retries=0;
3734         int32_t status;
3735         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3736         struct timeval timeout;
3737         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3738
3739         talloc_set_destructor(h, NULL);
3740
3741         /* our commit strategy is quite complex.
3742
3743            - we first try to commit the changes to all other nodes
3744
3745            - if that works, then we commit locally and we are done
3746
3747            - if a commit on another node fails, then we need to cancel
3748              the transaction, then restart the transaction (thus
3749              opening a window of time for a pending recovery to
3750              complete), then replay the transaction, checking all the
3751              reads and writes (checking that reads give the same data,
3752              and writes succeed). Then we retry the transaction to the
3753              other nodes
3754         */
3755
3756 again:
3757         if (h->m_write == NULL) {
3758                 /* no changes were made */
3759                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3760                 talloc_free(h);
3761                 return 0;
3762         }
3763
3764         /* tell ctdbd to commit to the other nodes */
3765         timeout = timeval_current_ofs(1, 0);
3766         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3767                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3768                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3769                            &timeout, NULL);
3770         if (ret != 0 || status != 0) {
3771                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3772                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3773                                      ", retrying after 1 second...\n",
3774                                      (retries==0)?"":"retry "));
3775                 sleep(1);
3776
3777                 if (ret != 0) {
3778                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3779                 } else {
3780                         /* work out what error code we will give if we 
3781                            have to fail the operation */
3782                         switch ((enum ctdb_trans2_commit_error)status) {
3783                         case CTDB_TRANS2_COMMIT_SUCCESS:
3784                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3785                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3786                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3787                                 break;
3788                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3789                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3790                                 break;
3791                         }
3792                 }
3793
3794                 if (++retries == 100) {
3795                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3796                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3797                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3798                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3799                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3800                         talloc_free(h);
3801                         return -1;
3802                 }               
3803
3804                 if (ctdb_replay_transaction(h) != 0) {
3805                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3806                                           "transaction on db 0x%08x, "
3807                                           "failure control =%u\n",
3808                                           h->ctdb_db->db_id,
3809                                           (unsigned)failure_control));
3810                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3811                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3812                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3813                         talloc_free(h);
3814                         return -1;
3815                 }
3816                 goto again;
3817         } else {
3818                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3819         }
3820
3821         /* do the real commit locally */
3822         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3823         if (ret != 0) {
3824                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3825                                   "on db id 0x%08x locally, "
3826                                   "failure_control=%u\n",
3827                                   h->ctdb_db->db_id,
3828                                   (unsigned)failure_control));
3829                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3830                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3831                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3832                 talloc_free(h);
3833                 return ret;
3834         }
3835
3836         /* tell ctdbd that we are finished with our local commit */
3837         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3838                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3839                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3840         talloc_free(h);
3841         return 0;
3842 }
3843
3844 /*
3845   recovery daemon ping to main daemon
3846  */
3847 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3848 {
3849         int ret;
3850         int32_t res;
3851
3852         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3853                            ctdb, NULL, &res, NULL, NULL);
3854         if (ret != 0 || res != 0) {
3855                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3856                 return -1;
3857         }
3858
3859         return 0;
3860 }
3861
3862 /* when forking the main daemon and the child process needs to connect back
3863  * to the daemon as a client process, this function can be used to change
3864  * the ctdb context from daemon into client mode
3865  */
3866 int switch_from_server_to_client(struct ctdb_context *ctdb)
3867 {
3868         int ret;
3869
3870         /* shutdown the transport */
3871         if (ctdb->methods) {
3872                 ctdb->methods->shutdown(ctdb);
3873         }
3874
3875         /* get a new event context */
3876         talloc_free(ctdb->ev);
3877         ctdb->ev = event_context_init(ctdb);
3878
3879         close(ctdb->daemon.sd);
3880         ctdb->daemon.sd = -1;
3881
3882         /* initialise ctdb */
3883         ret = ctdb_socket_connect(ctdb);
3884         if (ret != 0) {
3885                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3886                 return -1;
3887         }
3888
3889          return 0;
3890 }
3891
3892 /*
3893   get the status of running the monitor eventscripts: NULL means never run.
3894  */
3895 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3896                 struct timeval timeout, uint32_t destnode, 
3897                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3898                 struct ctdb_scripts_wire **script_status)
3899 {
3900         int ret;
3901         TDB_DATA outdata, indata;
3902         int32_t res;
3903         uint32_t uinttype = type;
3904
3905         indata.dptr = (uint8_t *)&uinttype;
3906         indata.dsize = sizeof(uinttype);
3907
3908         ret = ctdb_control(ctdb, destnode, 0, 
3909                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3910                            mem_ctx, &outdata, &res, &timeout, NULL);
3911         if (ret != 0 || res != 0) {
3912                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3913                 return -1;
3914         }
3915
3916         if (outdata.dsize == 0) {
3917                 *script_status = NULL;
3918         } else {
3919                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3920                 talloc_free(outdata.dptr);
3921         }
3922                     
3923         return 0;
3924 }
3925
3926 /*
3927   tell the main daemon how long it took to lock the reclock file
3928  */
3929 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3930 {
3931         int ret;
3932         int32_t res;
3933         TDB_DATA data;
3934
3935         data.dptr = (uint8_t *)&latency;
3936         data.dsize = sizeof(latency);
3937
3938         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3939                            ctdb, NULL, &res, NULL, NULL);
3940         if (ret != 0 || res != 0) {
3941                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3942                 return -1;
3943         }
3944
3945         return 0;
3946 }
3947
3948 /*
3949   get the name of the reclock file
3950  */
3951 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3952                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3953                          const char **name)
3954 {
3955         int ret;
3956         int32_t res;
3957         TDB_DATA data;
3958
3959         ret = ctdb_control(ctdb, destnode, 0, 
3960                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3961                            mem_ctx, &data, &res, &timeout, NULL);
3962         if (ret != 0 || res != 0) {
3963                 return -1;
3964         }
3965
3966         if (data.dsize == 0) {
3967                 *name = NULL;
3968         } else {
3969                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3970         }
3971         talloc_free(data.dptr);
3972
3973         return 0;
3974 }
3975
3976 /*
3977   set the reclock filename for a node
3978  */
3979 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3980 {
3981         int ret;
3982         TDB_DATA data;
3983         int32_t res;
3984
3985         if (reclock == NULL) {
3986                 data.dsize = 0;
3987                 data.dptr  = NULL;
3988         } else {
3989                 data.dsize = strlen(reclock) + 1;
3990                 data.dptr  = discard_const(reclock);
3991         }
3992
3993         ret = ctdb_control(ctdb, destnode, 0, 
3994                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3995                            NULL, NULL, &res, &timeout, NULL);
3996         if (ret != 0 || res != 0) {
3997                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3998                 return -1;
3999         }
4000
4001         return 0;
4002 }
4003
4004 /*
4005   stop a node
4006  */
4007 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4008 {
4009         int ret;
4010         int32_t res;
4011
4012         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4013                            ctdb, NULL, &res, &timeout, NULL);
4014         if (ret != 0 || res != 0) {
4015                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4016                 return -1;
4017         }
4018
4019         return 0;
4020 }
4021
4022 /*
4023   continue a node
4024  */
4025 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4026 {
4027         int ret;
4028
4029         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4030                            ctdb, NULL, NULL, &timeout, NULL);
4031         if (ret != 0) {
4032                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4033                 return -1;
4034         }
4035
4036         return 0;
4037 }
4038
4039 /*
4040   set the natgw state for a node
4041  */
4042 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4043 {
4044         int ret;
4045         TDB_DATA data;
4046         int32_t res;
4047
4048         data.dsize = sizeof(natgwstate);
4049         data.dptr  = (uint8_t *)&natgwstate;
4050
4051         ret = ctdb_control(ctdb, destnode, 0, 
4052                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4053                            NULL, NULL, &res, &timeout, NULL);
4054         if (ret != 0 || res != 0) {
4055                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4056                 return -1;
4057         }
4058
4059         return 0;
4060 }
4061
4062 /*
4063   set the lmaster role for a node
4064  */
4065 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4066 {
4067         int ret;
4068         TDB_DATA data;
4069         int32_t res;
4070
4071         data.dsize = sizeof(lmasterrole);
4072         data.dptr  = (uint8_t *)&lmasterrole;
4073
4074         ret = ctdb_control(ctdb, destnode, 0, 
4075                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4076                            NULL, NULL, &res, &timeout, NULL);
4077         if (ret != 0 || res != 0) {
4078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4079                 return -1;
4080         }
4081
4082         return 0;
4083 }
4084
4085 /*
4086   set the recmaster role for a node
4087  */
4088 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4089 {
4090         int ret;
4091         TDB_DATA data;
4092         int32_t res;
4093
4094         data.dsize = sizeof(recmasterrole);
4095         data.dptr  = (uint8_t *)&recmasterrole;
4096
4097         ret = ctdb_control(ctdb, destnode, 0, 
4098                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4099                            NULL, NULL, &res, &timeout, NULL);
4100         if (ret != 0 || res != 0) {
4101                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4102                 return -1;
4103         }
4104
4105         return 0;
4106 }
4107
4108 /* enable an eventscript
4109  */
4110 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4111 {
4112         int ret;
4113         TDB_DATA data;
4114         int32_t res;
4115
4116         data.dsize = strlen(script) + 1;
4117         data.dptr  = discard_const(script);
4118
4119         ret = ctdb_control(ctdb, destnode, 0, 
4120                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4121                            NULL, NULL, &res, &timeout, NULL);
4122         if (ret != 0 || res != 0) {
4123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4124                 return -1;
4125         }
4126
4127         return 0;
4128 }
4129
4130 /* disable an eventscript
4131  */
4132 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4133 {
4134         int ret;
4135         TDB_DATA data;
4136         int32_t res;
4137
4138         data.dsize = strlen(script) + 1;
4139         data.dptr  = discard_const(script);
4140
4141         ret = ctdb_control(ctdb, destnode, 0, 
4142                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4143                            NULL, NULL, &res, &timeout, NULL);
4144         if (ret != 0 || res != 0) {
4145                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4146                 return -1;
4147         }
4148
4149         return 0;
4150 }
4151
4152
4153 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4154 {
4155         int ret;
4156         TDB_DATA data;
4157         int32_t res;
4158
4159         data.dsize = sizeof(*bantime);
4160         data.dptr  = (uint8_t *)bantime;
4161
4162         ret = ctdb_control(ctdb, destnode, 0, 
4163                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4164                            NULL, NULL, &res, &timeout, NULL);
4165         if (ret != 0 || res != 0) {
4166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4167                 return -1;
4168         }
4169
4170         return 0;
4171 }
4172
4173
4174 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4175 {
4176         int ret;
4177         TDB_DATA outdata;
4178         int32_t res;
4179         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4180
4181         ret = ctdb_control(ctdb, destnode, 0, 
4182                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4183                            tmp_ctx, &outdata, &res, &timeout, NULL);
4184         if (ret != 0 || res != 0) {
4185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4186                 talloc_free(tmp_ctx);
4187                 return -1;
4188         }
4189
4190         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4191         talloc_free(tmp_ctx);
4192
4193         return 0;
4194 }
4195
4196
4197 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4198 {
4199         int ret;
4200         int32_t res;
4201         TDB_DATA data;
4202         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4203
4204         data.dptr = (uint8_t*)db_prio;
4205         data.dsize = sizeof(*db_prio);
4206
4207         ret = ctdb_control(ctdb, destnode, 0, 
4208                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4209                            tmp_ctx, NULL, &res, &timeout, NULL);
4210         if (ret != 0 || res != 0) {
4211                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4212                 talloc_free(tmp_ctx);
4213                 return -1;
4214         }
4215
4216         talloc_free(tmp_ctx);
4217
4218         return 0;
4219 }
4220
4221 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4222 {
4223         int ret;
4224         int32_t res;
4225         TDB_DATA data;
4226         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4227
4228         data.dptr = (uint8_t*)&db_id;
4229         data.dsize = sizeof(db_id);
4230
4231         ret = ctdb_control(ctdb, destnode, 0, 
4232                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4233                            tmp_ctx, NULL, &res, &timeout, NULL);
4234         if (ret != 0 || res < 0) {
4235                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4236                 talloc_free(tmp_ctx);
4237                 return -1;
4238         }
4239
4240         if (priority) {
4241                 *priority = res;
4242         }
4243
4244         talloc_free(tmp_ctx);
4245
4246         return 0;
4247 }
4248
4249 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4250 {
4251         if (h == NULL) {
4252                 return NULL;
4253         }
4254
4255         return &h->header;
4256 }