55c32735d91c0282b1fec0f9df5b6088f44c273e
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         /* we need to force the record to be written out if this was a remote access */
109         if (c->new_data == NULL) {
110                 c->new_data = &c->record_data;
111         }
112
113         if (c->new_data) {
114                 /* XXX check that we always have the lock here? */
115                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
117                         talloc_free(c);
118                         return -1;
119                 }
120         }
121
122         if (c->reply_data) {
123                 call->reply_data = *c->reply_data;
124
125                 talloc_steal(call, call->reply_data.dptr);
126                 talloc_set_name_const(call->reply_data.dptr, __location__);
127         } else {
128                 call->reply_data.dptr = NULL;
129                 call->reply_data.dsize = 0;
130         }
131         call->status = c->status;
132
133         talloc_free(c);
134
135         return 0;
136 }
137
138
139 /*
140   queue a packet for sending from client to daemon
141 */
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
143 {
144         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
145 }
146
147
148 /*
149   called when a CTDB_REPLY_CALL packet comes in in the client
150
151   This packet comes in response to a CTDB_REQ_CALL request packet. It
152   contains any reply data from the call
153 */
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
155 {
156         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157         struct ctdb_client_call_state *state;
158
159         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
160         if (state == NULL) {
161                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
162                 return;
163         }
164
165         if (hdr->reqid != state->reqid) {
166                 /* we found a record  but it was the wrong one */
167                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
168                 return;
169         }
170
171         state->call->reply_data.dptr = c->data;
172         state->call->reply_data.dsize = c->datalen;
173         state->call->status = c->status;
174
175         talloc_steal(state, c);
176
177         state->state = CTDB_CALL_DONE;
178
179         if (state->async.fn) {
180                 state->async.fn(state);
181         }
182 }
183
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
185
186 /*
187   this is called in the client, when data comes in from the daemon
188  */
189 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
190 {
191         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
193         TALLOC_CTX *tmp_ctx;
194
195         /* place the packet as a child of a tmp_ctx. We then use
196            talloc_free() below to free it. If any of the calls want
197            to keep it, then they will steal it somewhere else, and the
198            talloc_free() will be a no-op */
199         tmp_ctx = talloc_new(ctdb);
200         talloc_steal(tmp_ctx, hdr);
201
202         if (cnt == 0) {
203                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
204                 exit(0);
205         }
206
207         if (cnt < sizeof(*hdr)) {
208                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
209                 goto done;
210         }
211         if (cnt != hdr->length) {
212                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
213                                (unsigned)hdr->length, (unsigned)cnt);
214                 goto done;
215         }
216
217         if (hdr->ctdb_magic != CTDB_MAGIC) {
218                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
219                 goto done;
220         }
221
222         if (hdr->ctdb_version != CTDB_VERSION) {
223                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
224                 goto done;
225         }
226
227         switch (hdr->operation) {
228         case CTDB_REPLY_CALL:
229                 ctdb_client_reply_call(ctdb, hdr);
230                 break;
231
232         case CTDB_REQ_MESSAGE:
233                 ctdb_request_message(ctdb, hdr);
234                 break;
235
236         case CTDB_REPLY_CONTROL:
237                 ctdb_client_reply_control(ctdb, hdr);
238                 break;
239
240         default:
241                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
242         }
243
244 done:
245         talloc_free(tmp_ctx);
246 }
247
248 /*
249   connect with exponential backoff, thanks Stevens
250 */
251 #define CONNECT_MAXSLEEP 64
252 static int ctdb_connect_retry(struct ctdb_context *ctdb)
253 {
254         struct sockaddr_un addr;
255         int secs;
256         int ret = 0;
257
258         memset(&addr, 0, sizeof(addr));
259         addr.sun_family = AF_UNIX;
260         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
261
262         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
263                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
264                               sizeof(addr));
265                 if ((ret == 0) || (errno != EAGAIN)) {
266                         break;
267                 }
268
269                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
270                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
271                                          strerror(errno), secs));
272                         sleep(secs);
273                 }
274         }
275
276         return ret;
277 }
278
279 /*
280   connect to a unix domain socket
281 */
282 int ctdb_socket_connect(struct ctdb_context *ctdb)
283 {
284         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285         if (ctdb->daemon.sd == -1) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
287                 return -1;
288         }
289
290         set_nonblocking(ctdb->daemon.sd);
291         set_close_on_exec(ctdb->daemon.sd);
292
293         if (ctdb_connect_retry(ctdb) == -1) {
294                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
295                 close(ctdb->daemon.sd);
296                 ctdb->daemon.sd = -1;
297                 return -1;
298         }
299
300         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
301                                               CTDB_DS_ALIGNMENT, 
302                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
303         return 0;
304 }
305
306
307 struct ctdb_record_handle {
308         struct ctdb_db_context *ctdb_db;
309         TDB_DATA key;
310         TDB_DATA *data;
311         struct ctdb_ltdb_header header;
312 };
313
314
315 /*
316   make a recv call to the local ctdb daemon - called from client context
317
318   This is called when the program wants to wait for a ctdb_call to complete and get the 
319   results. This call will block unless the call has already completed.
320 */
321 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
322 {
323         if (state == NULL) {
324                 return -1;
325         }
326
327         while (state->state < CTDB_CALL_DONE) {
328                 event_loop_once(state->ctdb_db->ctdb->ev);
329         }
330         if (state->state != CTDB_CALL_DONE) {
331                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
332                 talloc_free(state);
333                 return -1;
334         }
335
336         if (state->call->reply_data.dsize) {
337                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
338                                                       state->call->reply_data.dptr,
339                                                       state->call->reply_data.dsize);
340                 call->reply_data.dsize = state->call->reply_data.dsize;
341         } else {
342                 call->reply_data.dptr = NULL;
343                 call->reply_data.dsize = 0;
344         }
345         call->status = state->call->status;
346         talloc_free(state);
347
348         return 0;
349 }
350
351
352
353
354 /*
355   destroy a ctdb_call in client
356 */
357 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
358 {
359         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
360         return 0;
361 }
362
363 /*
364   construct an event driven local ctdb_call
365
366   this is used so that locally processed ctdb_call requests are processed
367   in an event driven manner
368 */
369 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
370                                                                   struct ctdb_call *call,
371                                                                   struct ctdb_ltdb_header *header,
372                                                                   TDB_DATA *data)
373 {
374         struct ctdb_client_call_state *state;
375         struct ctdb_context *ctdb = ctdb_db->ctdb;
376         int ret;
377
378         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
379         CTDB_NO_MEMORY_NULL(ctdb, state);
380         state->call = talloc_zero(state, struct ctdb_call);
381         CTDB_NO_MEMORY_NULL(ctdb, state->call);
382
383         talloc_steal(state, data->dptr);
384
385         state->state   = CTDB_CALL_DONE;
386         *(state->call) = *call;
387         state->ctdb_db = ctdb_db;
388
389         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
390
391         return state;
392 }
393
394 /*
395   make a ctdb call to the local daemon - async send. Called from client context.
396
397   This constructs a ctdb_call request and queues it for processing. 
398   This call never blocks.
399 */
400 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
401                                               struct ctdb_call *call)
402 {
403         struct ctdb_client_call_state *state;
404         struct ctdb_context *ctdb = ctdb_db->ctdb;
405         struct ctdb_ltdb_header header;
406         TDB_DATA data;
407         int ret;
408         size_t len;
409         struct ctdb_req_call *c;
410
411         /* if the domain socket is not yet open, open it */
412         if (ctdb->daemon.sd==-1) {
413                 ctdb_socket_connect(ctdb);
414         }
415
416         ret = ctdb_ltdb_lock(ctdb_db, call->key);
417         if (ret != 0) {
418                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
419                 return NULL;
420         }
421
422         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
423
424         if (ret == 0 && header.dmaster == ctdb->pnn) {
425                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
426                 talloc_free(data.dptr);
427                 ctdb_ltdb_unlock(ctdb_db, call->key);
428                 return state;
429         }
430
431         ctdb_ltdb_unlock(ctdb_db, call->key);
432         talloc_free(data.dptr);
433
434         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
435         if (state == NULL) {
436                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
437                 return NULL;
438         }
439         state->call = talloc_zero(state, struct ctdb_call);
440         if (state->call == NULL) {
441                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
442                 return NULL;
443         }
444
445         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
446         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
447         if (c == NULL) {
448                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
449                 return NULL;
450         }
451
452         state->reqid     = ctdb_reqid_new(ctdb, state);
453         state->ctdb_db = ctdb_db;
454         talloc_set_destructor(state, ctdb_client_call_destructor);
455
456         c->hdr.reqid     = state->reqid;
457         c->flags         = call->flags;
458         c->db_id         = ctdb_db->db_id;
459         c->callid        = call->call_id;
460         c->hopcount      = 0;
461         c->keylen        = call->key.dsize;
462         c->calldatalen   = call->call_data.dsize;
463         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
464         memcpy(&c->data[call->key.dsize], 
465                call->call_data.dptr, call->call_data.dsize);
466         *(state->call)              = *call;
467         state->call->call_data.dptr = &c->data[call->key.dsize];
468         state->call->key.dptr       = &c->data[0];
469
470         state->state  = CTDB_CALL_WAIT;
471
472
473         ctdb_client_queue_pkt(ctdb, &c->hdr);
474
475         return state;
476 }
477
478
479 /*
480   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
481 */
482 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
483 {
484         struct ctdb_client_call_state *state;
485
486         state = ctdb_call_send(ctdb_db, call);
487         return ctdb_call_recv(state, call);
488 }
489
490
491 /*
492   tell the daemon what messaging srvid we will use, and register the message
493   handler function in the client
494 */
495 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
496                              ctdb_msg_fn_t handler,
497                              void *private_data)
498                                     
499 {
500         int res;
501         int32_t status;
502         
503         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
504                            tdb_null, NULL, NULL, &status, NULL, NULL);
505         if (res != 0 || status != 0) {
506                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
507                 return -1;
508         }
509
510         /* also need to register the handler with our own ctdb structure */
511         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
512 }
513
514 /*
515   tell the daemon we no longer want a srvid
516 */
517 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
518 {
519         int res;
520         int32_t status;
521         
522         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
523                            tdb_null, NULL, NULL, &status, NULL, NULL);
524         if (res != 0 || status != 0) {
525                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
526                 return -1;
527         }
528
529         /* also need to register the handler with our own ctdb structure */
530         ctdb_deregister_message_handler(ctdb, srvid, private_data);
531         return 0;
532 }
533
534
535 /*
536   send a message - from client context
537  */
538 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
539                       uint64_t srvid, TDB_DATA data)
540 {
541         struct ctdb_req_message *r;
542         int len, res;
543
544         len = offsetof(struct ctdb_req_message, data) + data.dsize;
545         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
546                                len, struct ctdb_req_message);
547         CTDB_NO_MEMORY(ctdb, r);
548
549         r->hdr.destnode  = pnn;
550         r->srvid         = srvid;
551         r->datalen       = data.dsize;
552         memcpy(&r->data[0], data.dptr, data.dsize);
553         
554         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
555         if (res != 0) {
556                 return res;
557         }
558
559         talloc_free(r);
560         return 0;
561 }
562
563
564 /*
565   cancel a ctdb_fetch_lock operation, releasing the lock
566  */
567 static int fetch_lock_destructor(struct ctdb_record_handle *h)
568 {
569         ctdb_ltdb_unlock(h->ctdb_db, h->key);
570         return 0;
571 }
572
573 /*
574   force the migration of a record to this node
575  */
576 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
577 {
578         struct ctdb_call call;
579         ZERO_STRUCT(call);
580         call.call_id = CTDB_NULL_FUNC;
581         call.key = key;
582         call.flags = CTDB_IMMEDIATE_MIGRATION;
583         return ctdb_call(ctdb_db, &call);
584 }
585
586 /*
587   get a lock on a record, and return the records data. Blocks until it gets the lock
588  */
589 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
590                                            TDB_DATA key, TDB_DATA *data)
591 {
592         int ret;
593         struct ctdb_record_handle *h;
594
595         /*
596           procedure is as follows:
597
598           1) get the chain lock. 
599           2) check if we are dmaster
600           3) if we are the dmaster then return handle 
601           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
602              reply from ctdbd
603           5) when we get the reply, goto (1)
604          */
605
606         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
607         if (h == NULL) {
608                 return NULL;
609         }
610
611         h->ctdb_db = ctdb_db;
612         h->key     = key;
613         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
614         if (h->key.dptr == NULL) {
615                 talloc_free(h);
616                 return NULL;
617         }
618         h->data    = data;
619
620         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
621                  (const char *)key.dptr));
622
623 again:
624         /* step 1 - get the chain lock */
625         ret = ctdb_ltdb_lock(ctdb_db, key);
626         if (ret != 0) {
627                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
628                 talloc_free(h);
629                 return NULL;
630         }
631
632         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
633
634         talloc_set_destructor(h, fetch_lock_destructor);
635
636         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
637
638         /* when torturing, ensure we test the remote path */
639         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
640             random() % 5 == 0) {
641                 h->header.dmaster = (uint32_t)-1;
642         }
643
644
645         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
646
647         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
648                 ctdb_ltdb_unlock(ctdb_db, key);
649                 ret = ctdb_client_force_migration(ctdb_db, key);
650                 if (ret != 0) {
651                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
652                         talloc_free(h);
653                         return NULL;
654                 }
655                 goto again;
656         }
657
658         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
659         return h;
660 }
661
662 /*
663   store some data to the record that was locked with ctdb_fetch_lock()
664 */
665 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
666 {
667         if (h->ctdb_db->persistent) {
668                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
669                 return -1;
670         }
671
672         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
673 }
674
675 /*
676   non-locking fetch of a record
677  */
678 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
679                TDB_DATA key, TDB_DATA *data)
680 {
681         struct ctdb_call call;
682         int ret;
683
684         call.call_id = CTDB_FETCH_FUNC;
685         call.call_data.dptr = NULL;
686         call.call_data.dsize = 0;
687
688         ret = ctdb_call(ctdb_db, &call);
689
690         if (ret == 0) {
691                 *data = call.reply_data;
692                 talloc_steal(mem_ctx, data->dptr);
693         }
694
695         return ret;
696 }
697
698
699
700 /*
701    called when a control completes or timesout to invoke the callback
702    function the user provided
703 */
704 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
705         struct timeval t, void *private_data)
706 {
707         struct ctdb_client_control_state *state;
708         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
709         int ret;
710
711         state = talloc_get_type(private_data, struct ctdb_client_control_state);
712         talloc_steal(tmp_ctx, state);
713
714         ret = ctdb_control_recv(state->ctdb, state, state,
715                         NULL, 
716                         NULL, 
717                         NULL);
718
719         talloc_free(tmp_ctx);
720 }
721
722 /*
723   called when a CTDB_REPLY_CONTROL packet comes in in the client
724
725   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
726   contains any reply data from the control
727 */
728 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
729                                       struct ctdb_req_header *hdr)
730 {
731         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
732         struct ctdb_client_control_state *state;
733
734         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
735         if (state == NULL) {
736                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
737                 return;
738         }
739
740         if (hdr->reqid != state->reqid) {
741                 /* we found a record  but it was the wrong one */
742                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
743                 return;
744         }
745
746         state->outdata.dptr = c->data;
747         state->outdata.dsize = c->datalen;
748         state->status = c->status;
749         if (c->errorlen) {
750                 state->errormsg = talloc_strndup(state, 
751                                                  (char *)&c->data[c->datalen], 
752                                                  c->errorlen);
753         }
754
755         /* state->outdata now uses resources from c so we dont want c
756            to just dissappear from under us while state is still alive
757         */
758         talloc_steal(state, c);
759
760         state->state = CTDB_CONTROL_DONE;
761
762         /* if we had a callback registered for this control, pull the response
763            and call the callback.
764         */
765         if (state->async.fn) {
766                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
767         }
768 }
769
770
771 /*
772   destroy a ctdb_control in client
773 */
774 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
775 {
776         ctdb_reqid_remove(state->ctdb, state->reqid);
777         return 0;
778 }
779
780
781 /* time out handler for ctdb_control */
782 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
783         struct timeval t, void *private_data)
784 {
785         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
786
787         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
788                          "dstnode:%u\n", state->reqid, state->c->opcode,
789                          state->c->hdr.destnode));
790
791         state->state = CTDB_CONTROL_TIMEOUT;
792
793         /* if we had a callback registered for this control, pull the response
794            and call the callback.
795         */
796         if (state->async.fn) {
797                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
798         }
799 }
800
801 /* async version of send control request */
802 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
803                 uint32_t destnode, uint64_t srvid, 
804                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
805                 TALLOC_CTX *mem_ctx,
806                 struct timeval *timeout,
807                 char **errormsg)
808 {
809         struct ctdb_client_control_state *state;
810         size_t len;
811         struct ctdb_req_control *c;
812         int ret;
813
814         if (errormsg) {
815                 *errormsg = NULL;
816         }
817
818         /* if the domain socket is not yet open, open it */
819         if (ctdb->daemon.sd==-1) {
820                 ctdb_socket_connect(ctdb);
821         }
822
823         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
824         CTDB_NO_MEMORY_NULL(ctdb, state);
825
826         state->ctdb       = ctdb;
827         state->reqid      = ctdb_reqid_new(ctdb, state);
828         state->state      = CTDB_CONTROL_WAIT;
829         state->errormsg   = NULL;
830
831         talloc_set_destructor(state, ctdb_control_destructor);
832
833         len = offsetof(struct ctdb_req_control, data) + data.dsize;
834         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
835                                len, struct ctdb_req_control);
836         state->c            = c;        
837         CTDB_NO_MEMORY_NULL(ctdb, c);
838         c->hdr.reqid        = state->reqid;
839         c->hdr.destnode     = destnode;
840         c->opcode           = opcode;
841         c->client_id        = 0;
842         c->flags            = flags;
843         c->srvid            = srvid;
844         c->datalen          = data.dsize;
845         if (data.dsize) {
846                 memcpy(&c->data[0], data.dptr, data.dsize);
847         }
848
849         /* timeout */
850         if (timeout && !timeval_is_zero(timeout)) {
851                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
852         }
853
854         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
855         if (ret != 0) {
856                 talloc_free(state);
857                 return NULL;
858         }
859
860         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
861                 talloc_free(state);
862                 return NULL;
863         }
864
865         return state;
866 }
867
868
869 /* async version of receive control reply */
870 int ctdb_control_recv(struct ctdb_context *ctdb, 
871                 struct ctdb_client_control_state *state, 
872                 TALLOC_CTX *mem_ctx,
873                 TDB_DATA *outdata, int32_t *status, char **errormsg)
874 {
875         TALLOC_CTX *tmp_ctx;
876
877         if (status != NULL) {
878                 *status = -1;
879         }
880         if (errormsg != NULL) {
881                 *errormsg = NULL;
882         }
883
884         if (state == NULL) {
885                 return -1;
886         }
887
888         /* prevent double free of state */
889         tmp_ctx = talloc_new(ctdb);
890         talloc_steal(tmp_ctx, state);
891
892         /* loop one event at a time until we either timeout or the control
893            completes.
894         */
895         while (state->state == CTDB_CONTROL_WAIT) {
896                 event_loop_once(ctdb->ev);
897         }
898
899         if (state->state != CTDB_CONTROL_DONE) {
900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
901                 if (state->async.fn) {
902                         state->async.fn(state);
903                 }
904                 talloc_free(tmp_ctx);
905                 return -1;
906         }
907
908         if (state->errormsg) {
909                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
910                 if (errormsg) {
911                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
912                 }
913                 if (state->async.fn) {
914                         state->async.fn(state);
915                 }
916                 talloc_free(tmp_ctx);
917                 return -1;
918         }
919
920         if (outdata) {
921                 *outdata = state->outdata;
922                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
923         }
924
925         if (status) {
926                 *status = state->status;
927         }
928
929         if (state->async.fn) {
930                 state->async.fn(state);
931         }
932
933         talloc_free(tmp_ctx);
934         return 0;
935 }
936
937
938
939 /*
940   send a ctdb control message
941   timeout specifies how long we should wait for a reply.
942   if timeout is NULL we wait indefinitely
943  */
944 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
945                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
946                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
947                  struct timeval *timeout,
948                  char **errormsg)
949 {
950         struct ctdb_client_control_state *state;
951
952         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
953                         flags, data, mem_ctx,
954                         timeout, errormsg);
955         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
956                         errormsg);
957 }
958
959
960
961
962 /*
963   a process exists call. Returns 0 if process exists, -1 otherwise
964  */
965 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
966 {
967         int ret;
968         TDB_DATA data;
969         int32_t status;
970
971         data.dptr = (uint8_t*)&pid;
972         data.dsize = sizeof(pid);
973
974         ret = ctdb_control(ctdb, destnode, 0, 
975                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
976                            NULL, NULL, &status, NULL, NULL);
977         if (ret != 0) {
978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
979                 return -1;
980         }
981
982         return status;
983 }
984
985 /*
986   get remote statistics
987  */
988 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
989 {
990         int ret;
991         TDB_DATA data;
992         int32_t res;
993
994         ret = ctdb_control(ctdb, destnode, 0, 
995                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
996                            ctdb, &data, &res, NULL, NULL);
997         if (ret != 0 || res != 0) {
998                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
999                 return -1;
1000         }
1001
1002         if (data.dsize != sizeof(struct ctdb_statistics)) {
1003                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1004                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1005                       return -1;
1006         }
1007
1008         *status = *(struct ctdb_statistics *)data.dptr;
1009         talloc_free(data.dptr);
1010                         
1011         return 0;
1012 }
1013
1014 /*
1015   shutdown a remote ctdb node
1016  */
1017 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1018 {
1019         struct ctdb_client_control_state *state;
1020
1021         state = ctdb_control_send(ctdb, destnode, 0, 
1022                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1023                            NULL, &timeout, NULL);
1024         if (state == NULL) {
1025                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1026                 return -1;
1027         }
1028
1029         return 0;
1030 }
1031
1032 /*
1033   get vnn map from a remote node
1034  */
1035 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1036 {
1037         int ret;
1038         TDB_DATA outdata;
1039         int32_t res;
1040         struct ctdb_vnn_map_wire *map;
1041
1042         ret = ctdb_control(ctdb, destnode, 0, 
1043                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1044                            mem_ctx, &outdata, &res, &timeout, NULL);
1045         if (ret != 0 || res != 0) {
1046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1047                 return -1;
1048         }
1049         
1050         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1051         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1052             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1053                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1054                 return -1;
1055         }
1056
1057         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1058         CTDB_NO_MEMORY(ctdb, *vnnmap);
1059         (*vnnmap)->generation = map->generation;
1060         (*vnnmap)->size       = map->size;
1061         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1062
1063         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1064         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1065         talloc_free(outdata.dptr);
1066                     
1067         return 0;
1068 }
1069
1070
1071 /*
1072   get the recovery mode of a remote node
1073  */
1074 struct ctdb_client_control_state *
1075 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1076 {
1077         return ctdb_control_send(ctdb, destnode, 0, 
1078                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1079                            mem_ctx, &timeout, NULL);
1080 }
1081
1082 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1083 {
1084         int ret;
1085         int32_t res;
1086
1087         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1088         if (ret != 0) {
1089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1090                 return -1;
1091         }
1092
1093         if (recmode) {
1094                 *recmode = (uint32_t)res;
1095         }
1096
1097         return 0;
1098 }
1099
1100 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1101 {
1102         struct ctdb_client_control_state *state;
1103
1104         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1105         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1106 }
1107
1108
1109
1110
1111 /*
1112   set the recovery mode of a remote node
1113  */
1114 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1115 {
1116         int ret;
1117         TDB_DATA data;
1118         int32_t res;
1119
1120         data.dsize = sizeof(uint32_t);
1121         data.dptr = (unsigned char *)&recmode;
1122
1123         ret = ctdb_control(ctdb, destnode, 0, 
1124                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1125                            NULL, NULL, &res, &timeout, NULL);
1126         if (ret != 0 || res != 0) {
1127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1128                 return -1;
1129         }
1130
1131         return 0;
1132 }
1133
1134
1135
1136 /*
1137   get the recovery master of a remote node
1138  */
1139 struct ctdb_client_control_state *
1140 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1141                         struct timeval timeout, uint32_t destnode)
1142 {
1143         return ctdb_control_send(ctdb, destnode, 0, 
1144                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1145                            mem_ctx, &timeout, NULL);
1146 }
1147
1148 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1149 {
1150         int ret;
1151         int32_t res;
1152
1153         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1154         if (ret != 0) {
1155                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1156                 return -1;
1157         }
1158
1159         if (recmaster) {
1160                 *recmaster = (uint32_t)res;
1161         }
1162
1163         return 0;
1164 }
1165
1166 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1167 {
1168         struct ctdb_client_control_state *state;
1169
1170         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1171         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1172 }
1173
1174
1175 /*
1176   set the recovery master of a remote node
1177  */
1178 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1179 {
1180         int ret;
1181         TDB_DATA data;
1182         int32_t res;
1183
1184         ZERO_STRUCT(data);
1185         data.dsize = sizeof(uint32_t);
1186         data.dptr = (unsigned char *)&recmaster;
1187
1188         ret = ctdb_control(ctdb, destnode, 0, 
1189                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1190                            NULL, NULL, &res, &timeout, NULL);
1191         if (ret != 0 || res != 0) {
1192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1193                 return -1;
1194         }
1195
1196         return 0;
1197 }
1198
1199
1200 /*
1201   get a list of databases off a remote node
1202  */
1203 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1204                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1205 {
1206         int ret;
1207         TDB_DATA outdata;
1208         int32_t res;
1209
1210         ret = ctdb_control(ctdb, destnode, 0, 
1211                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1212                            mem_ctx, &outdata, &res, &timeout, NULL);
1213         if (ret != 0 || res != 0) {
1214                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1215                 return -1;
1216         }
1217
1218         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1219         talloc_free(outdata.dptr);
1220                     
1221         return 0;
1222 }
1223
1224 /*
1225   get a list of nodes (vnn and flags ) from a remote node
1226  */
1227 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1228                 struct timeval timeout, uint32_t destnode, 
1229                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1230 {
1231         int ret;
1232         TDB_DATA outdata;
1233         int32_t res;
1234
1235         ret = ctdb_control(ctdb, destnode, 0, 
1236                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1237                            mem_ctx, &outdata, &res, &timeout, NULL);
1238         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1239                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1240                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1241         }
1242         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1244                 return -1;
1245         }
1246
1247         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1248         talloc_free(outdata.dptr);
1249                     
1250         return 0;
1251 }
1252
1253 /*
1254   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1255  */
1256 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1257                 struct timeval timeout, uint32_t destnode, 
1258                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1259 {
1260         int ret, i, len;
1261         TDB_DATA outdata;
1262         struct ctdb_node_mapv4 *nodemapv4;
1263         int32_t res;
1264
1265         ret = ctdb_control(ctdb, destnode, 0, 
1266                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1267                            mem_ctx, &outdata, &res, &timeout, NULL);
1268         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1269                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1270                 return -1;
1271         }
1272
1273         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1274
1275         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1276         (*nodemap) = talloc_zero_size(mem_ctx, len);
1277         CTDB_NO_MEMORY(ctdb, (*nodemap));
1278
1279         (*nodemap)->num = nodemapv4->num;
1280         for (i=0; i<nodemapv4->num; i++) {
1281                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1282                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1283                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1284                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1285         }
1286                 
1287         talloc_free(outdata.dptr);
1288                     
1289         return 0;
1290 }
1291
1292 /*
1293   drop the transport, reload the nodes file and restart the transport
1294  */
1295 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1296                     struct timeval timeout, uint32_t destnode)
1297 {
1298         int ret;
1299         int32_t res;
1300
1301         ret = ctdb_control(ctdb, destnode, 0, 
1302                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1303                            NULL, NULL, &res, &timeout, NULL);
1304         if (ret != 0 || res != 0) {
1305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1306                 return -1;
1307         }
1308
1309         return 0;
1310 }
1311
1312
1313 /*
1314   set vnn map on a node
1315  */
1316 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1317                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1318 {
1319         int ret;
1320         TDB_DATA data;
1321         int32_t res;
1322         struct ctdb_vnn_map_wire *map;
1323         size_t len;
1324
1325         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1326         map = talloc_size(mem_ctx, len);
1327         CTDB_NO_MEMORY(ctdb, map);
1328
1329         map->generation = vnnmap->generation;
1330         map->size = vnnmap->size;
1331         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1332         
1333         data.dsize = len;
1334         data.dptr  = (uint8_t *)map;
1335
1336         ret = ctdb_control(ctdb, destnode, 0, 
1337                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1338                            NULL, NULL, &res, &timeout, NULL);
1339         if (ret != 0 || res != 0) {
1340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1341                 return -1;
1342         }
1343
1344         talloc_free(map);
1345
1346         return 0;
1347 }
1348
1349
1350 /*
1351   async send for pull database
1352  */
1353 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1354         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1355         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1356 {
1357         TDB_DATA indata;
1358         struct ctdb_control_pulldb *pull;
1359         struct ctdb_client_control_state *state;
1360
1361         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1362         CTDB_NO_MEMORY_NULL(ctdb, pull);
1363
1364         pull->db_id   = dbid;
1365         pull->lmaster = lmaster;
1366
1367         indata.dsize = sizeof(struct ctdb_control_pulldb);
1368         indata.dptr  = (unsigned char *)pull;
1369
1370         state = ctdb_control_send(ctdb, destnode, 0, 
1371                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1372                                   mem_ctx, &timeout, NULL);
1373         talloc_free(pull);
1374
1375         return state;
1376 }
1377
1378 /*
1379   async recv for pull database
1380  */
1381 int ctdb_ctrl_pulldb_recv(
1382         struct ctdb_context *ctdb, 
1383         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1384         TDB_DATA *outdata)
1385 {
1386         int ret;
1387         int32_t res;
1388
1389         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1390         if ( (ret != 0) || (res != 0) ){
1391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1392                 return -1;
1393         }
1394
1395         return 0;
1396 }
1397
1398 /*
1399   pull all keys and records for a specific database on a node
1400  */
1401 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1402                 uint32_t dbid, uint32_t lmaster, 
1403                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1404                 TDB_DATA *outdata)
1405 {
1406         struct ctdb_client_control_state *state;
1407
1408         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1409                                       timeout);
1410         
1411         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1412 }
1413
1414
1415 /*
1416   change dmaster for all keys in the database to the new value
1417  */
1418 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1419                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1420 {
1421         int ret;
1422         TDB_DATA indata;
1423         int32_t res;
1424
1425         indata.dsize = 2*sizeof(uint32_t);
1426         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1427
1428         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1429         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1430
1431         ret = ctdb_control(ctdb, destnode, 0, 
1432                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1433                            NULL, NULL, &res, &timeout, NULL);
1434         if (ret != 0 || res != 0) {
1435                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1436                 return -1;
1437         }
1438
1439         return 0;
1440 }
1441
1442 /*
1443   ping a node, return number of clients connected
1444  */
1445 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1446 {
1447         int ret;
1448         int32_t res;
1449
1450         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1451                            tdb_null, NULL, NULL, &res, NULL, NULL);
1452         if (ret != 0) {
1453                 return -1;
1454         }
1455         return res;
1456 }
1457
1458 /*
1459   find the real path to a ltdb 
1460  */
1461 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1462                    const char **path)
1463 {
1464         int ret;
1465         int32_t res;
1466         TDB_DATA data;
1467
1468         data.dptr = (uint8_t *)&dbid;
1469         data.dsize = sizeof(dbid);
1470
1471         ret = ctdb_control(ctdb, destnode, 0, 
1472                            CTDB_CONTROL_GETDBPATH, 0, data, 
1473                            mem_ctx, &data, &res, &timeout, NULL);
1474         if (ret != 0 || res != 0) {
1475                 return -1;
1476         }
1477
1478         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1479         if ((*path) == NULL) {
1480                 return -1;
1481         }
1482
1483         talloc_free(data.dptr);
1484
1485         return 0;
1486 }
1487
1488 /*
1489   find the name of a db 
1490  */
1491 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1492                    const char **name)
1493 {
1494         int ret;
1495         int32_t res;
1496         TDB_DATA data;
1497
1498         data.dptr = (uint8_t *)&dbid;
1499         data.dsize = sizeof(dbid);
1500
1501         ret = ctdb_control(ctdb, destnode, 0, 
1502                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1503                            mem_ctx, &data, &res, &timeout, NULL);
1504         if (ret != 0 || res != 0) {
1505                 return -1;
1506         }
1507
1508         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1509         if ((*name) == NULL) {
1510                 return -1;
1511         }
1512
1513         talloc_free(data.dptr);
1514
1515         return 0;
1516 }
1517
1518 /*
1519   get the health status of a db
1520  */
1521 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1522                           struct timeval timeout,
1523                           uint32_t destnode,
1524                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1525                           const char **reason)
1526 {
1527         int ret;
1528         int32_t res;
1529         TDB_DATA data;
1530
1531         data.dptr = (uint8_t *)&dbid;
1532         data.dsize = sizeof(dbid);
1533
1534         ret = ctdb_control(ctdb, destnode, 0,
1535                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1536                            mem_ctx, &data, &res, &timeout, NULL);
1537         if (ret != 0 || res != 0) {
1538                 return -1;
1539         }
1540
1541         if (data.dsize == 0) {
1542                 (*reason) = NULL;
1543                 return 0;
1544         }
1545
1546         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1547         if ((*reason) == NULL) {
1548                 return -1;
1549         }
1550
1551         talloc_free(data.dptr);
1552
1553         return 0;
1554 }
1555
1556 /*
1557   create a database
1558  */
1559 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1560                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1561 {
1562         int ret;
1563         int32_t res;
1564         TDB_DATA data;
1565
1566         data.dptr = discard_const(name);
1567         data.dsize = strlen(name)+1;
1568
1569         ret = ctdb_control(ctdb, destnode, 0, 
1570                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1571                            0, data, 
1572                            mem_ctx, &data, &res, &timeout, NULL);
1573
1574         if (ret != 0 || res != 0) {
1575                 return -1;
1576         }
1577
1578         return 0;
1579 }
1580
1581 /*
1582   get debug level on a node
1583  */
1584 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1585 {
1586         int ret;
1587         int32_t res;
1588         TDB_DATA data;
1589
1590         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1591                            ctdb, &data, &res, NULL, NULL);
1592         if (ret != 0 || res != 0) {
1593                 return -1;
1594         }
1595         if (data.dsize != sizeof(int32_t)) {
1596                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1597                          (unsigned)data.dsize));
1598                 return -1;
1599         }
1600         *level = *(int32_t *)data.dptr;
1601         talloc_free(data.dptr);
1602         return 0;
1603 }
1604
1605 /*
1606   set debug level on a node
1607  */
1608 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1609 {
1610         int ret;
1611         int32_t res;
1612         TDB_DATA data;
1613
1614         data.dptr = (uint8_t *)&level;
1615         data.dsize = sizeof(level);
1616
1617         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1618                            NULL, NULL, &res, NULL, NULL);
1619         if (ret != 0 || res != 0) {
1620                 return -1;
1621         }
1622         return 0;
1623 }
1624
1625
1626 /*
1627   get a list of connected nodes
1628  */
1629 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1630                                 struct timeval timeout,
1631                                 TALLOC_CTX *mem_ctx,
1632                                 uint32_t *num_nodes)
1633 {
1634         struct ctdb_node_map *map=NULL;
1635         int ret, i;
1636         uint32_t *nodes;
1637
1638         *num_nodes = 0;
1639
1640         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1641         if (ret != 0) {
1642                 return NULL;
1643         }
1644
1645         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1646         if (nodes == NULL) {
1647                 return NULL;
1648         }
1649
1650         for (i=0;i<map->num;i++) {
1651                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1652                         nodes[*num_nodes] = map->nodes[i].pnn;
1653                         (*num_nodes)++;
1654                 }
1655         }
1656
1657         return nodes;
1658 }
1659
1660
1661 /*
1662   reset remote status
1663  */
1664 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1665 {
1666         int ret;
1667         int32_t res;
1668
1669         ret = ctdb_control(ctdb, destnode, 0, 
1670                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1671                            NULL, NULL, &res, NULL, NULL);
1672         if (ret != 0 || res != 0) {
1673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1674                 return -1;
1675         }
1676         return 0;
1677 }
1678
1679 /*
1680   this is the dummy null procedure that all databases support
1681 */
1682 static int ctdb_null_func(struct ctdb_call_info *call)
1683 {
1684         return 0;
1685 }
1686
1687 /*
1688   this is a plain fetch procedure that all databases support
1689 */
1690 static int ctdb_fetch_func(struct ctdb_call_info *call)
1691 {
1692         call->reply_data = &call->record_data;
1693         return 0;
1694 }
1695
1696 /*
1697   attach to a specific database - client call
1698 */
1699 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1700 {
1701         struct ctdb_db_context *ctdb_db;
1702         TDB_DATA data;
1703         int ret;
1704         int32_t res;
1705
1706         ctdb_db = ctdb_db_handle(ctdb, name);
1707         if (ctdb_db) {
1708                 return ctdb_db;
1709         }
1710
1711         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1712         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1713
1714         ctdb_db->ctdb = ctdb;
1715         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1716         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1717
1718         data.dptr = discard_const(name);
1719         data.dsize = strlen(name)+1;
1720
1721         /* tell ctdb daemon to attach */
1722         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1723                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1724                            0, data, ctdb_db, &data, &res, NULL, NULL);
1725         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1726                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1727                 talloc_free(ctdb_db);
1728                 return NULL;
1729         }
1730         
1731         ctdb_db->db_id = *(uint32_t *)data.dptr;
1732         talloc_free(data.dptr);
1733
1734         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1735         if (ret != 0) {
1736                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1737                 talloc_free(ctdb_db);
1738                 return NULL;
1739         }
1740
1741         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1742         if (ctdb->valgrinding) {
1743                 tdb_flags |= TDB_NOMMAP;
1744         }
1745         tdb_flags |= TDB_DISALLOW_NESTING;
1746
1747         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1748         if (ctdb_db->ltdb == NULL) {
1749                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1750                 talloc_free(ctdb_db);
1751                 return NULL;
1752         }
1753
1754         ctdb_db->persistent = persistent;
1755
1756         DLIST_ADD(ctdb->db_list, ctdb_db);
1757
1758         /* add well known functions */
1759         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1760         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1761
1762         return ctdb_db;
1763 }
1764
1765
1766 /*
1767   setup a call for a database
1768  */
1769 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1770 {
1771         struct ctdb_registered_call *call;
1772
1773 #if 0
1774         TDB_DATA data;
1775         int32_t status;
1776         struct ctdb_control_set_call c;
1777         int ret;
1778
1779         /* this is no longer valid with the separate daemon architecture */
1780         c.db_id = ctdb_db->db_id;
1781         c.fn    = fn;
1782         c.id    = id;
1783
1784         data.dptr = (uint8_t *)&c;
1785         data.dsize = sizeof(c);
1786
1787         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1788                            data, NULL, NULL, &status, NULL, NULL);
1789         if (ret != 0 || status != 0) {
1790                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1791                 return -1;
1792         }
1793 #endif
1794
1795         /* also register locally */
1796         call = talloc(ctdb_db, struct ctdb_registered_call);
1797         call->fn = fn;
1798         call->id = id;
1799
1800         DLIST_ADD(ctdb_db->calls, call);        
1801         return 0;
1802 }
1803
1804
1805 struct traverse_state {
1806         bool done;
1807         uint32_t count;
1808         ctdb_traverse_func fn;
1809         void *private_data;
1810 };
1811
1812 /*
1813   called on each key during a ctdb_traverse
1814  */
1815 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1816 {
1817         struct traverse_state *state = (struct traverse_state *)p;
1818         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1819         TDB_DATA key;
1820
1821         if (data.dsize < sizeof(uint32_t) ||
1822             d->length != data.dsize) {
1823                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1824                 state->done = True;
1825                 return;
1826         }
1827
1828         key.dsize = d->keylen;
1829         key.dptr  = &d->data[0];
1830         data.dsize = d->datalen;
1831         data.dptr = &d->data[d->keylen];
1832
1833         if (key.dsize == 0 && data.dsize == 0) {
1834                 /* end of traverse */
1835                 state->done = True;
1836                 return;
1837         }
1838
1839         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1840                 /* empty records are deleted records in ctdb */
1841                 return;
1842         }
1843
1844         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1845                 state->done = True;
1846         }
1847
1848         state->count++;
1849 }
1850
1851
1852 /*
1853   start a cluster wide traverse, calling the supplied fn on each record
1854   return the number of records traversed, or -1 on error
1855  */
1856 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1857 {
1858         TDB_DATA data;
1859         struct ctdb_traverse_start t;
1860         int32_t status;
1861         int ret;
1862         uint64_t srvid = (getpid() | 0xFLL<<60);
1863         struct traverse_state state;
1864
1865         state.done = False;
1866         state.count = 0;
1867         state.private_data = private_data;
1868         state.fn = fn;
1869
1870         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1873                 return -1;
1874         }
1875
1876         t.db_id = ctdb_db->db_id;
1877         t.srvid = srvid;
1878         t.reqid = 0;
1879
1880         data.dptr = (uint8_t *)&t;
1881         data.dsize = sizeof(t);
1882
1883         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1884                            data, NULL, NULL, &status, NULL, NULL);
1885         if (ret != 0 || status != 0) {
1886                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1887                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1888                 return -1;
1889         }
1890
1891         while (!state.done) {
1892                 event_loop_once(ctdb_db->ctdb->ev);
1893         }
1894
1895         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1896         if (ret != 0) {
1897                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1898                 return -1;
1899         }
1900
1901         return state.count;
1902 }
1903
1904 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1905 /*
1906   called on each key during a catdb
1907  */
1908 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1909 {
1910         int i;
1911         FILE *f = (FILE *)p;
1912         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1913
1914         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1915         for (i=0;i<key.dsize;i++) {
1916                 if (ISASCII(key.dptr[i])) {
1917                         fprintf(f, "%c", key.dptr[i]);
1918                 } else {
1919                         fprintf(f, "\\%02X", key.dptr[i]);
1920                 }
1921         }
1922         fprintf(f, "\"\n");
1923
1924         fprintf(f, "dmaster: %u\n", h->dmaster);
1925         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1926
1927         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1928         for (i=sizeof(*h);i<data.dsize;i++) {
1929                 if (ISASCII(data.dptr[i])) {
1930                         fprintf(f, "%c", data.dptr[i]);
1931                 } else {
1932                         fprintf(f, "\\%02X", data.dptr[i]);
1933                 }
1934         }
1935         fprintf(f, "\"\n");
1936
1937         fprintf(f, "\n");
1938
1939         return 0;
1940 }
1941
1942 /*
1943   convenience function to list all keys to stdout
1944  */
1945 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1946 {
1947         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1948 }
1949
1950 /*
1951   get the pid of a ctdb daemon
1952  */
1953 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1954 {
1955         int ret;
1956         int32_t res;
1957
1958         ret = ctdb_control(ctdb, destnode, 0, 
1959                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1960                            NULL, NULL, &res, &timeout, NULL);
1961         if (ret != 0) {
1962                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1963                 return -1;
1964         }
1965
1966         *pid = res;
1967
1968         return 0;
1969 }
1970
1971
1972 /*
1973   async freeze send control
1974  */
1975 struct ctdb_client_control_state *
1976 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1977 {
1978         return ctdb_control_send(ctdb, destnode, priority, 
1979                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1980                            mem_ctx, &timeout, NULL);
1981 }
1982
1983 /* 
1984    async freeze recv control
1985 */
1986 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1987 {
1988         int ret;
1989         int32_t res;
1990
1991         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1992         if ( (ret != 0) || (res != 0) ){
1993                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1994                 return -1;
1995         }
1996
1997         return 0;
1998 }
1999
2000 /*
2001   freeze databases of a certain priority
2002  */
2003 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2004 {
2005         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2006         struct ctdb_client_control_state *state;
2007         int ret;
2008
2009         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2010         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2011         talloc_free(tmp_ctx);
2012
2013         return ret;
2014 }
2015
2016 /* Freeze all databases */
2017 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2018 {
2019         int i;
2020
2021         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2022                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2023                         return -1;
2024                 }
2025         }
2026         return 0;
2027 }
2028
2029 /*
2030   thaw databases of a certain priority
2031  */
2032 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2033 {
2034         int ret;
2035         int32_t res;
2036
2037         ret = ctdb_control(ctdb, destnode, priority, 
2038                            CTDB_CONTROL_THAW, 0, tdb_null, 
2039                            NULL, NULL, &res, &timeout, NULL);
2040         if (ret != 0 || res != 0) {
2041                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2042                 return -1;
2043         }
2044
2045         return 0;
2046 }
2047
2048 /* thaw all databases */
2049 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2050 {
2051         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2052 }
2053
2054 /*
2055   get pnn of a node, or -1
2056  */
2057 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2058 {
2059         int ret;
2060         int32_t res;
2061
2062         ret = ctdb_control(ctdb, destnode, 0, 
2063                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2064                            NULL, NULL, &res, &timeout, NULL);
2065         if (ret != 0) {
2066                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2067                 return -1;
2068         }
2069
2070         return res;
2071 }
2072
2073 /*
2074   get the monitoring mode of a remote node
2075  */
2076 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2077 {
2078         int ret;
2079         int32_t res;
2080
2081         ret = ctdb_control(ctdb, destnode, 0, 
2082                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2083                            NULL, NULL, &res, &timeout, NULL);
2084         if (ret != 0) {
2085                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2086                 return -1;
2087         }
2088
2089         *monmode = res;
2090
2091         return 0;
2092 }
2093
2094
2095 /*
2096  set the monitoring mode of a remote node to active
2097  */
2098 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2099 {
2100         int ret;
2101         
2102
2103         ret = ctdb_control(ctdb, destnode, 0, 
2104                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2105                            NULL, NULL,NULL, &timeout, NULL);
2106         if (ret != 0) {
2107                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2108                 return -1;
2109         }
2110
2111         
2112
2113         return 0;
2114 }
2115
2116 /*
2117   set the monitoring mode of a remote node to disable
2118  */
2119 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2120 {
2121         int ret;
2122         
2123
2124         ret = ctdb_control(ctdb, destnode, 0, 
2125                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2126                            NULL, NULL, NULL, &timeout, NULL);
2127         if (ret != 0) {
2128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2129                 return -1;
2130         }
2131
2132         
2133
2134         return 0;
2135 }
2136
2137
2138
2139 /* 
2140   sent to a node to make it take over an ip address
2141 */
2142 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2143                           uint32_t destnode, struct ctdb_public_ip *ip)
2144 {
2145         TDB_DATA data;
2146         struct ctdb_public_ipv4 ipv4;
2147         int ret;
2148         int32_t res;
2149
2150         if (ip->addr.sa.sa_family == AF_INET) {
2151                 ipv4.pnn = ip->pnn;
2152                 ipv4.sin = ip->addr.ip;
2153
2154                 data.dsize = sizeof(ipv4);
2155                 data.dptr  = (uint8_t *)&ipv4;
2156
2157                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2158                            NULL, &res, &timeout, NULL);
2159         } else {
2160                 data.dsize = sizeof(*ip);
2161                 data.dptr  = (uint8_t *)ip;
2162
2163                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2164                            NULL, &res, &timeout, NULL);
2165         }
2166
2167         if (ret != 0 || res != 0) {
2168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2169                 return -1;
2170         }
2171
2172         return 0;       
2173 }
2174
2175
2176 /* 
2177   sent to a node to make it release an ip address
2178 */
2179 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2180                          uint32_t destnode, struct ctdb_public_ip *ip)
2181 {
2182         TDB_DATA data;
2183         struct ctdb_public_ipv4 ipv4;
2184         int ret;
2185         int32_t res;
2186
2187         if (ip->addr.sa.sa_family == AF_INET) {
2188                 ipv4.pnn = ip->pnn;
2189                 ipv4.sin = ip->addr.ip;
2190
2191                 data.dsize = sizeof(ipv4);
2192                 data.dptr  = (uint8_t *)&ipv4;
2193
2194                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2195                                    NULL, &res, &timeout, NULL);
2196         } else {
2197                 data.dsize = sizeof(*ip);
2198                 data.dptr  = (uint8_t *)ip;
2199
2200                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2201                                    NULL, &res, &timeout, NULL);
2202         }
2203
2204         if (ret != 0 || res != 0) {
2205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2206                 return -1;
2207         }
2208
2209         return 0;       
2210 }
2211
2212
2213 /*
2214   get a tunable
2215  */
2216 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2217                           struct timeval timeout, 
2218                           uint32_t destnode,
2219                           const char *name, uint32_t *value)
2220 {
2221         struct ctdb_control_get_tunable *t;
2222         TDB_DATA data, outdata;
2223         int32_t res;
2224         int ret;
2225
2226         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2227         data.dptr  = talloc_size(ctdb, data.dsize);
2228         CTDB_NO_MEMORY(ctdb, data.dptr);
2229
2230         t = (struct ctdb_control_get_tunable *)data.dptr;
2231         t->length = strlen(name)+1;
2232         memcpy(t->name, name, t->length);
2233
2234         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2235                            &outdata, &res, &timeout, NULL);
2236         talloc_free(data.dptr);
2237         if (ret != 0 || res != 0) {
2238                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2239                 return -1;
2240         }
2241
2242         if (outdata.dsize != sizeof(uint32_t)) {
2243                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2244                 talloc_free(outdata.dptr);
2245                 return -1;
2246         }
2247         
2248         *value = *(uint32_t *)outdata.dptr;
2249         talloc_free(outdata.dptr);
2250
2251         return 0;
2252 }
2253
2254 /*
2255   set a tunable
2256  */
2257 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2258                           struct timeval timeout, 
2259                           uint32_t destnode,
2260                           const char *name, uint32_t value)
2261 {
2262         struct ctdb_control_set_tunable *t;
2263         TDB_DATA data;
2264         int32_t res;
2265         int ret;
2266
2267         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2268         data.dptr  = talloc_size(ctdb, data.dsize);
2269         CTDB_NO_MEMORY(ctdb, data.dptr);
2270
2271         t = (struct ctdb_control_set_tunable *)data.dptr;
2272         t->length = strlen(name)+1;
2273         memcpy(t->name, name, t->length);
2274         t->value = value;
2275
2276         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2277                            NULL, &res, &timeout, NULL);
2278         talloc_free(data.dptr);
2279         if (ret != 0 || res != 0) {
2280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2281                 return -1;
2282         }
2283
2284         return 0;
2285 }
2286
2287 /*
2288   list tunables
2289  */
2290 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2291                             struct timeval timeout, 
2292                             uint32_t destnode,
2293                             TALLOC_CTX *mem_ctx,
2294                             const char ***list, uint32_t *count)
2295 {
2296         TDB_DATA outdata;
2297         int32_t res;
2298         int ret;
2299         struct ctdb_control_list_tunable *t;
2300         char *p, *s, *ptr;
2301
2302         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2303                            mem_ctx, &outdata, &res, &timeout, NULL);
2304         if (ret != 0 || res != 0) {
2305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2306                 return -1;
2307         }
2308
2309         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2310         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2311             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2312                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2313                 talloc_free(outdata.dptr);
2314                 return -1;              
2315         }
2316         
2317         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2318         CTDB_NO_MEMORY(ctdb, p);
2319
2320         talloc_free(outdata.dptr);
2321         
2322         (*list) = NULL;
2323         (*count) = 0;
2324
2325         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2326                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2327                 CTDB_NO_MEMORY(ctdb, *list);
2328                 (*list)[*count] = talloc_strdup(*list, s);
2329                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2330                 (*count)++;
2331         }
2332
2333         talloc_free(p);
2334
2335         return 0;
2336 }
2337
2338
2339 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2340                                    struct timeval timeout, uint32_t destnode,
2341                                    TALLOC_CTX *mem_ctx,
2342                                    uint32_t flags,
2343                                    struct ctdb_all_public_ips **ips)
2344 {
2345         int ret;
2346         TDB_DATA outdata;
2347         int32_t res;
2348
2349         ret = ctdb_control(ctdb, destnode, 0, 
2350                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2351                            mem_ctx, &outdata, &res, &timeout, NULL);
2352         if (ret == 0 && res == -1) {
2353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2354                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2355         }
2356         if (ret != 0 || res != 0) {
2357           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2358                 return -1;
2359         }
2360
2361         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2362         talloc_free(outdata.dptr);
2363                     
2364         return 0;
2365 }
2366
2367 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2368                              struct timeval timeout, uint32_t destnode,
2369                              TALLOC_CTX *mem_ctx,
2370                              struct ctdb_all_public_ips **ips)
2371 {
2372         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2373                                               destnode, mem_ctx,
2374                                               0, ips);
2375 }
2376
2377 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2378                         struct timeval timeout, uint32_t destnode, 
2379                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2380 {
2381         int ret, i, len;
2382         TDB_DATA outdata;
2383         int32_t res;
2384         struct ctdb_all_public_ipsv4 *ipsv4;
2385
2386         ret = ctdb_control(ctdb, destnode, 0, 
2387                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2388                            mem_ctx, &outdata, &res, &timeout, NULL);
2389         if (ret != 0 || res != 0) {
2390                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2391                 return -1;
2392         }
2393
2394         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2395         len = offsetof(struct ctdb_all_public_ips, ips) +
2396                 ipsv4->num*sizeof(struct ctdb_public_ip);
2397         *ips = talloc_zero_size(mem_ctx, len);
2398         CTDB_NO_MEMORY(ctdb, *ips);
2399         (*ips)->num = ipsv4->num;
2400         for (i=0; i<ipsv4->num; i++) {
2401                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2402                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2403         }
2404
2405         talloc_free(outdata.dptr);
2406                     
2407         return 0;
2408 }
2409
2410 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2411                                  struct timeval timeout, uint32_t destnode,
2412                                  TALLOC_CTX *mem_ctx,
2413                                  const ctdb_sock_addr *addr,
2414                                  struct ctdb_control_public_ip_info **_info)
2415 {
2416         int ret;
2417         TDB_DATA indata;
2418         TDB_DATA outdata;
2419         int32_t res;
2420         struct ctdb_control_public_ip_info *info;
2421         uint32_t len;
2422         uint32_t i;
2423
2424         indata.dptr = discard_const_p(uint8_t, addr);
2425         indata.dsize = sizeof(*addr);
2426
2427         ret = ctdb_control(ctdb, destnode, 0,
2428                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2429                            mem_ctx, &outdata, &res, &timeout, NULL);
2430         if (ret != 0 || res != 0) {
2431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2432                                 "failed ret:%d res:%d\n",
2433                                 ret, res));
2434                 return -1;
2435         }
2436
2437         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2438         if (len > outdata.dsize) {
2439                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2440                                 "returned invalid data with size %u > %u\n",
2441                                 (unsigned int)outdata.dsize,
2442                                 (unsigned int)len));
2443                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2444                 return -1;
2445         }
2446
2447         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2448         len += info->num*sizeof(struct ctdb_control_iface_info);
2449
2450         if (len > outdata.dsize) {
2451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2452                                 "returned invalid data with size %u > %u\n",
2453                                 (unsigned int)outdata.dsize,
2454                                 (unsigned int)len));
2455                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2456                 return -1;
2457         }
2458
2459         /* make sure we null terminate the returned strings */
2460         for (i=0; i < info->num; i++) {
2461                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2462         }
2463
2464         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2465                                                                 outdata.dptr,
2466                                                                 outdata.dsize);
2467         talloc_free(outdata.dptr);
2468         if (*_info == NULL) {
2469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2470                                 "talloc_memdup size %u failed\n",
2471                                 (unsigned int)outdata.dsize));
2472                 return -1;
2473         }
2474
2475         return 0;
2476 }
2477
2478 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2479                          struct timeval timeout, uint32_t destnode,
2480                          TALLOC_CTX *mem_ctx,
2481                          struct ctdb_control_get_ifaces **_ifaces)
2482 {
2483         int ret;
2484         TDB_DATA outdata;
2485         int32_t res;
2486         struct ctdb_control_get_ifaces *ifaces;
2487         uint32_t len;
2488         uint32_t i;
2489
2490         ret = ctdb_control(ctdb, destnode, 0,
2491                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2492                            mem_ctx, &outdata, &res, &timeout, NULL);
2493         if (ret != 0 || res != 0) {
2494                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2495                                 "failed ret:%d res:%d\n",
2496                                 ret, res));
2497                 return -1;
2498         }
2499
2500         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2501         if (len > outdata.dsize) {
2502                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2503                                 "returned invalid data with size %u > %u\n",
2504                                 (unsigned int)outdata.dsize,
2505                                 (unsigned int)len));
2506                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2507                 return -1;
2508         }
2509
2510         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2511         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2512
2513         if (len > outdata.dsize) {
2514                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2515                                 "returned invalid data with size %u > %u\n",
2516                                 (unsigned int)outdata.dsize,
2517                                 (unsigned int)len));
2518                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2519                 return -1;
2520         }
2521
2522         /* make sure we null terminate the returned strings */
2523         for (i=0; i < ifaces->num; i++) {
2524                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2525         }
2526
2527         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2528                                                                   outdata.dptr,
2529                                                                   outdata.dsize);
2530         talloc_free(outdata.dptr);
2531         if (*_ifaces == NULL) {
2532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2533                                 "talloc_memdup size %u failed\n",
2534                                 (unsigned int)outdata.dsize));
2535                 return -1;
2536         }
2537
2538         return 0;
2539 }
2540
2541 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2542                              struct timeval timeout, uint32_t destnode,
2543                              TALLOC_CTX *mem_ctx,
2544                              const struct ctdb_control_iface_info *info)
2545 {
2546         int ret;
2547         TDB_DATA indata;
2548         int32_t res;
2549
2550         indata.dptr = discard_const_p(uint8_t, info);
2551         indata.dsize = sizeof(*info);
2552
2553         ret = ctdb_control(ctdb, destnode, 0,
2554                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2555                            mem_ctx, NULL, &res, &timeout, NULL);
2556         if (ret != 0 || res != 0) {
2557                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2558                                 "failed ret:%d res:%d\n",
2559                                 ret, res));
2560                 return -1;
2561         }
2562
2563         return 0;
2564 }
2565
2566 /*
2567   set/clear the permanent disabled bit on a remote node
2568  */
2569 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2570                        uint32_t set, uint32_t clear)
2571 {
2572         int ret;
2573         TDB_DATA data;
2574         struct ctdb_node_map *nodemap=NULL;
2575         struct ctdb_node_flag_change c;
2576         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2577         uint32_t recmaster;
2578         uint32_t *nodes;
2579
2580
2581         /* find the recovery master */
2582         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2583         if (ret != 0) {
2584                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2585                 talloc_free(tmp_ctx);
2586                 return ret;
2587         }
2588
2589
2590         /* read the node flags from the recmaster */
2591         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2592         if (ret != 0) {
2593                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2594                 talloc_free(tmp_ctx);
2595                 return -1;
2596         }
2597         if (destnode >= nodemap->num) {
2598                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2599                 talloc_free(tmp_ctx);
2600                 return -1;
2601         }
2602
2603         c.pnn       = destnode;
2604         c.old_flags = nodemap->nodes[destnode].flags;
2605         c.new_flags = c.old_flags;
2606         c.new_flags |= set;
2607         c.new_flags &= ~clear;
2608
2609         data.dsize = sizeof(c);
2610         data.dptr = (unsigned char *)&c;
2611
2612         /* send the flags update to all connected nodes */
2613         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2614
2615         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2616                                         nodes, 0,
2617                                         timeout, false, data,
2618                                         NULL, NULL,
2619                                         NULL) != 0) {
2620                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2621
2622                 talloc_free(tmp_ctx);
2623                 return -1;
2624         }
2625
2626         talloc_free(tmp_ctx);
2627         return 0;
2628 }
2629
2630
2631 /*
2632   get all tunables
2633  */
2634 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2635                                struct timeval timeout, 
2636                                uint32_t destnode,
2637                                struct ctdb_tunable *tunables)
2638 {
2639         TDB_DATA outdata;
2640         int ret;
2641         int32_t res;
2642
2643         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2644                            &outdata, &res, &timeout, NULL);
2645         if (ret != 0 || res != 0) {
2646                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2647                 return -1;
2648         }
2649
2650         if (outdata.dsize != sizeof(*tunables)) {
2651                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2652                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2653                 return -1;              
2654         }
2655
2656         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2657         talloc_free(outdata.dptr);
2658         return 0;
2659 }
2660
2661 /*
2662   add a public address to a node
2663  */
2664 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2665                       struct timeval timeout, 
2666                       uint32_t destnode,
2667                       struct ctdb_control_ip_iface *pub)
2668 {
2669         TDB_DATA data;
2670         int32_t res;
2671         int ret;
2672
2673         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2674         data.dptr  = (unsigned char *)pub;
2675
2676         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2677                            NULL, &res, &timeout, NULL);
2678         if (ret != 0 || res != 0) {
2679                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2680                 return -1;
2681         }
2682
2683         return 0;
2684 }
2685
2686 /*
2687   delete a public address from a node
2688  */
2689 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2690                       struct timeval timeout, 
2691                       uint32_t destnode,
2692                       struct ctdb_control_ip_iface *pub)
2693 {
2694         TDB_DATA data;
2695         int32_t res;
2696         int ret;
2697
2698         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2699         data.dptr  = (unsigned char *)pub;
2700
2701         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2702                            NULL, &res, &timeout, NULL);
2703         if (ret != 0 || res != 0) {
2704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2705                 return -1;
2706         }
2707
2708         return 0;
2709 }
2710
2711 /*
2712   kill a tcp connection
2713  */
2714 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2715                       struct timeval timeout, 
2716                       uint32_t destnode,
2717                       struct ctdb_control_killtcp *killtcp)
2718 {
2719         TDB_DATA data;
2720         int32_t res;
2721         int ret;
2722
2723         data.dsize = sizeof(struct ctdb_control_killtcp);
2724         data.dptr  = (unsigned char *)killtcp;
2725
2726         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2727                            NULL, &res, &timeout, NULL);
2728         if (ret != 0 || res != 0) {
2729                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2730                 return -1;
2731         }
2732
2733         return 0;
2734 }
2735
2736 /*
2737   send a gratious arp
2738  */
2739 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2740                       struct timeval timeout, 
2741                       uint32_t destnode,
2742                       ctdb_sock_addr *addr,
2743                       const char *ifname)
2744 {
2745         TDB_DATA data;
2746         int32_t res;
2747         int ret, len;
2748         struct ctdb_control_gratious_arp *gratious_arp;
2749         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2750
2751
2752         len = strlen(ifname)+1;
2753         gratious_arp = talloc_size(tmp_ctx, 
2754                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2755         CTDB_NO_MEMORY(ctdb, gratious_arp);
2756
2757         gratious_arp->addr = *addr;
2758         gratious_arp->len = len;
2759         memcpy(&gratious_arp->iface[0], ifname, len);
2760
2761
2762         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2763         data.dptr  = (unsigned char *)gratious_arp;
2764
2765         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2766                            NULL, &res, &timeout, NULL);
2767         if (ret != 0 || res != 0) {
2768                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2769                 talloc_free(tmp_ctx);
2770                 return -1;
2771         }
2772
2773         talloc_free(tmp_ctx);
2774         return 0;
2775 }
2776
2777 /*
2778   get a list of all tcp tickles that a node knows about for a particular vnn
2779  */
2780 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2781                               struct timeval timeout, uint32_t destnode, 
2782                               TALLOC_CTX *mem_ctx, 
2783                               ctdb_sock_addr *addr,
2784                               struct ctdb_control_tcp_tickle_list **list)
2785 {
2786         int ret;
2787         TDB_DATA data, outdata;
2788         int32_t status;
2789
2790         data.dptr = (uint8_t*)addr;
2791         data.dsize = sizeof(ctdb_sock_addr);
2792
2793         ret = ctdb_control(ctdb, destnode, 0, 
2794                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2795                            mem_ctx, &outdata, &status, NULL, NULL);
2796         if (ret != 0 || status != 0) {
2797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2798                 return -1;
2799         }
2800
2801         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2802
2803         return status;
2804 }
2805
2806 /*
2807   register a server id
2808  */
2809 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2810                       struct timeval timeout, 
2811                       struct ctdb_server_id *id)
2812 {
2813         TDB_DATA data;
2814         int32_t res;
2815         int ret;
2816
2817         data.dsize = sizeof(struct ctdb_server_id);
2818         data.dptr  = (unsigned char *)id;
2819
2820         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2821                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2822                         0, data, NULL,
2823                         NULL, &res, &timeout, NULL);
2824         if (ret != 0 || res != 0) {
2825                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2826                 return -1;
2827         }
2828
2829         return 0;
2830 }
2831
2832 /*
2833   unregister a server id
2834  */
2835 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2836                       struct timeval timeout, 
2837                       struct ctdb_server_id *id)
2838 {
2839         TDB_DATA data;
2840         int32_t res;
2841         int ret;
2842
2843         data.dsize = sizeof(struct ctdb_server_id);
2844         data.dptr  = (unsigned char *)id;
2845
2846         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2847                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2848                         0, data, NULL,
2849                         NULL, &res, &timeout, NULL);
2850         if (ret != 0 || res != 0) {
2851                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2852                 return -1;
2853         }
2854
2855         return 0;
2856 }
2857
2858
2859 /*
2860   check if a server id exists
2861
2862   if a server id does exist, return *status == 1, otherwise *status == 0
2863  */
2864 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2865                       struct timeval timeout, 
2866                       uint32_t destnode,
2867                       struct ctdb_server_id *id,
2868                       uint32_t *status)
2869 {
2870         TDB_DATA data;
2871         int32_t res;
2872         int ret;
2873
2874         data.dsize = sizeof(struct ctdb_server_id);
2875         data.dptr  = (unsigned char *)id;
2876
2877         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2878                         0, data, NULL,
2879                         NULL, &res, &timeout, NULL);
2880         if (ret != 0) {
2881                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2882                 return -1;
2883         }
2884
2885         if (res) {
2886                 *status = 1;
2887         } else {
2888                 *status = 0;
2889         }
2890
2891         return 0;
2892 }
2893
2894 /*
2895    get the list of server ids that are registered on a node
2896 */
2897 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2898                 TALLOC_CTX *mem_ctx,
2899                 struct timeval timeout, uint32_t destnode, 
2900                 struct ctdb_server_id_list **svid_list)
2901 {
2902         int ret;
2903         TDB_DATA outdata;
2904         int32_t res;
2905
2906         ret = ctdb_control(ctdb, destnode, 0, 
2907                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2908                            mem_ctx, &outdata, &res, &timeout, NULL);
2909         if (ret != 0 || res != 0) {
2910                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2911                 return -1;
2912         }
2913
2914         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2915                     
2916         return 0;
2917 }
2918
2919 /*
2920   initialise the ctdb daemon for client applications
2921
2922   NOTE: In current code the daemon does not fork. This is for testing purposes only
2923   and to simplify the code.
2924 */
2925 struct ctdb_context *ctdb_init(struct event_context *ev)
2926 {
2927         int ret;
2928         struct ctdb_context *ctdb;
2929
2930         ctdb = talloc_zero(ev, struct ctdb_context);
2931         if (ctdb == NULL) {
2932                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2933                 return NULL;
2934         }
2935         ctdb->ev  = ev;
2936         ctdb->idr = idr_init(ctdb);
2937         /* Wrap early to exercise code. */
2938         ctdb->lastid = INT_MAX-200;
2939         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2940
2941         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2942         if (ret != 0) {
2943                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2944                 talloc_free(ctdb);
2945                 return NULL;
2946         }
2947
2948         ctdb->statistics.statistics_start_time = timeval_current();
2949
2950         return ctdb;
2951 }
2952
2953
2954 /*
2955   set some ctdb flags
2956 */
2957 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2958 {
2959         ctdb->flags |= flags;
2960 }
2961
2962 /*
2963   setup the local socket name
2964 */
2965 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2966 {
2967         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2968         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2969
2970         return 0;
2971 }
2972
2973 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2974 {
2975         return ctdb->daemon.name;
2976 }
2977
2978 /*
2979   return the pnn of this node
2980 */
2981 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2982 {
2983         return ctdb->pnn;
2984 }
2985
2986
2987 /*
2988   get the uptime of a remote node
2989  */
2990 struct ctdb_client_control_state *
2991 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2992 {
2993         return ctdb_control_send(ctdb, destnode, 0, 
2994                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2995                            mem_ctx, &timeout, NULL);
2996 }
2997
2998 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2999 {
3000         int ret;
3001         int32_t res;
3002         TDB_DATA outdata;
3003
3004         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3005         if (ret != 0 || res != 0) {
3006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3007                 return -1;
3008         }
3009
3010         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3011
3012         return 0;
3013 }
3014
3015 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3016 {
3017         struct ctdb_client_control_state *state;
3018
3019         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3020         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3021 }
3022
3023 /*
3024   send a control to execute the "recovered" event script on a node
3025  */
3026 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3027 {
3028         int ret;
3029         int32_t status;
3030
3031         ret = ctdb_control(ctdb, destnode, 0, 
3032                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3033                            NULL, NULL, &status, &timeout, NULL);
3034         if (ret != 0 || status != 0) {
3035                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3036                 return -1;
3037         }
3038
3039         return 0;
3040 }
3041
3042 /* 
3043   callback for the async helpers used when sending the same control
3044   to multiple nodes in parallell.
3045 */
3046 static void async_callback(struct ctdb_client_control_state *state)
3047 {
3048         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3049         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3050         int ret;
3051         TDB_DATA outdata;
3052         int32_t res;
3053         uint32_t destnode = state->c->hdr.destnode;
3054
3055         /* one more node has responded with recmode data */
3056         data->count--;
3057
3058         /* if we failed to push the db, then return an error and let
3059            the main loop try again.
3060         */
3061         if (state->state != CTDB_CONTROL_DONE) {
3062                 if ( !data->dont_log_errors) {
3063                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3064                 }
3065                 data->fail_count++;
3066                 if (data->fail_callback) {
3067                         data->fail_callback(ctdb, destnode, res, outdata,
3068                                         data->callback_data);
3069                 }
3070                 return;
3071         }
3072         
3073         state->async.fn = NULL;
3074
3075         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3076         if ((ret != 0) || (res != 0)) {
3077                 if ( !data->dont_log_errors) {
3078                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3079                 }
3080                 data->fail_count++;
3081                 if (data->fail_callback) {
3082                         data->fail_callback(ctdb, destnode, res, outdata,
3083                                         data->callback_data);
3084                 }
3085         }
3086         if ((ret == 0) && (data->callback != NULL)) {
3087                 data->callback(ctdb, destnode, res, outdata,
3088                                         data->callback_data);
3089         }
3090 }
3091
3092
3093 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3094 {
3095         /* set up the callback functions */
3096         state->async.fn = async_callback;
3097         state->async.private_data = data;
3098         
3099         /* one more control to wait for to complete */
3100         data->count++;
3101 }
3102
3103
3104 /* wait for up to the maximum number of seconds allowed
3105    or until all nodes we expect a response from has replied
3106 */
3107 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3108 {
3109         while (data->count > 0) {
3110                 event_loop_once(ctdb->ev);
3111         }
3112         if (data->fail_count != 0) {
3113                 if (!data->dont_log_errors) {
3114                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3115                                  data->fail_count));
3116                 }
3117                 return -1;
3118         }
3119         return 0;
3120 }
3121
3122
3123 /* 
3124    perform a simple control on the listed nodes
3125    The control cannot return data
3126  */
3127 int ctdb_client_async_control(struct ctdb_context *ctdb,
3128                                 enum ctdb_controls opcode,
3129                                 uint32_t *nodes,
3130                                 uint64_t srvid,
3131                                 struct timeval timeout,
3132                                 bool dont_log_errors,
3133                                 TDB_DATA data,
3134                                 client_async_callback client_callback,
3135                                 client_async_callback fail_callback,
3136                                 void *callback_data)
3137 {
3138         struct client_async_data *async_data;
3139         struct ctdb_client_control_state *state;
3140         int j, num_nodes;
3141
3142         async_data = talloc_zero(ctdb, struct client_async_data);
3143         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3144         async_data->dont_log_errors = dont_log_errors;
3145         async_data->callback = client_callback;
3146         async_data->fail_callback = fail_callback;
3147         async_data->callback_data = callback_data;
3148         async_data->opcode        = opcode;
3149
3150         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3151
3152         /* loop over all nodes and send an async control to each of them */
3153         for (j=0; j<num_nodes; j++) {
3154                 uint32_t pnn = nodes[j];
3155
3156                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3157                                           0, data, async_data, &timeout, NULL);
3158                 if (state == NULL) {
3159                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3160                         talloc_free(async_data);
3161                         return -1;
3162                 }
3163                 
3164                 ctdb_client_async_add(async_data, state);
3165         }
3166
3167         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3168                 talloc_free(async_data);
3169                 return -1;
3170         }
3171
3172         talloc_free(async_data);
3173         return 0;
3174 }
3175
3176 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3177                                 struct ctdb_vnn_map *vnn_map,
3178                                 TALLOC_CTX *mem_ctx,
3179                                 bool include_self)
3180 {
3181         int i, j, num_nodes;
3182         uint32_t *nodes;
3183
3184         for (i=num_nodes=0;i<vnn_map->size;i++) {
3185                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3186                         continue;
3187                 }
3188                 num_nodes++;
3189         } 
3190
3191         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3192         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3193
3194         for (i=j=0;i<vnn_map->size;i++) {
3195                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3196                         continue;
3197                 }
3198                 nodes[j++] = vnn_map->map[i];
3199         } 
3200
3201         return nodes;
3202 }
3203
3204 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3205                                 struct ctdb_node_map *node_map,
3206                                 TALLOC_CTX *mem_ctx,
3207                                 bool include_self)
3208 {
3209         int i, j, num_nodes;
3210         uint32_t *nodes;
3211
3212         for (i=num_nodes=0;i<node_map->num;i++) {
3213                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3214                         continue;
3215                 }
3216                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3217                         continue;
3218                 }
3219                 num_nodes++;
3220         } 
3221
3222         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3223         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3224
3225         for (i=j=0;i<node_map->num;i++) {
3226                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3227                         continue;
3228                 }
3229                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3230                         continue;
3231                 }
3232                 nodes[j++] = node_map->nodes[i].pnn;
3233         } 
3234
3235         return nodes;
3236 }
3237
3238 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3239                                 struct ctdb_node_map *node_map,
3240                                 TALLOC_CTX *mem_ctx,
3241                                 uint32_t pnn)
3242 {
3243         int i, j, num_nodes;
3244         uint32_t *nodes;
3245
3246         for (i=num_nodes=0;i<node_map->num;i++) {
3247                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3248                         continue;
3249                 }
3250                 if (node_map->nodes[i].pnn == pnn) {
3251                         continue;
3252                 }
3253                 num_nodes++;
3254         } 
3255
3256         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3257         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3258
3259         for (i=j=0;i<node_map->num;i++) {
3260                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3261                         continue;
3262                 }
3263                 if (node_map->nodes[i].pnn == pnn) {
3264                         continue;
3265                 }
3266                 nodes[j++] = node_map->nodes[i].pnn;
3267         } 
3268
3269         return nodes;
3270 }
3271
3272 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3273                                 struct ctdb_node_map *node_map,
3274                                 TALLOC_CTX *mem_ctx,
3275                                 bool include_self)
3276 {
3277         int i, j, num_nodes;
3278         uint32_t *nodes;
3279
3280         for (i=num_nodes=0;i<node_map->num;i++) {
3281                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3282                         continue;
3283                 }
3284                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3285                         continue;
3286                 }
3287                 num_nodes++;
3288         } 
3289
3290         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3291         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3292
3293         for (i=j=0;i<node_map->num;i++) {
3294                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3295                         continue;
3296                 }
3297                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3298                         continue;
3299                 }
3300                 nodes[j++] = node_map->nodes[i].pnn;
3301         } 
3302
3303         return nodes;
3304 }
3305
3306 /* 
3307   this is used to test if a pnn lock exists and if it exists will return
3308   the number of connections that pnn has reported or -1 if that recovery
3309   daemon is not running.
3310 */
3311 int
3312 ctdb_read_pnn_lock(int fd, int32_t pnn)
3313 {
3314         struct flock lock;
3315         char c;
3316
3317         lock.l_type = F_WRLCK;
3318         lock.l_whence = SEEK_SET;
3319         lock.l_start = pnn;
3320         lock.l_len = 1;
3321         lock.l_pid = 0;
3322
3323         if (fcntl(fd, F_GETLK, &lock) != 0) {
3324                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3325                 return -1;
3326         }
3327
3328         if (lock.l_type == F_UNLCK) {
3329                 return -1;
3330         }
3331
3332         if (pread(fd, &c, 1, pnn) == -1) {
3333                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3334                 return -1;
3335         }
3336
3337         return c;
3338 }
3339
3340 /*
3341   get capabilities of a remote node
3342  */
3343 struct ctdb_client_control_state *
3344 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3345 {
3346         return ctdb_control_send(ctdb, destnode, 0, 
3347                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3348                            mem_ctx, &timeout, NULL);
3349 }
3350
3351 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3352 {
3353         int ret;
3354         int32_t res;
3355         TDB_DATA outdata;
3356
3357         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3358         if ( (ret != 0) || (res != 0) ) {
3359                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3360                 return -1;
3361         }
3362
3363         if (capabilities) {
3364                 *capabilities = *((uint32_t *)outdata.dptr);
3365         }
3366
3367         return 0;
3368 }
3369
3370 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3371 {
3372         struct ctdb_client_control_state *state;
3373         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3374         int ret;
3375
3376         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3377         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3378         talloc_free(tmp_ctx);
3379         return ret;
3380 }
3381
3382 /**
3383  * check whether a transaction is active on a given db on a given node
3384  */
3385 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3386                                      uint32_t destnode,
3387                                      uint32_t db_id)
3388 {
3389         int32_t status;
3390         int ret;
3391         TDB_DATA indata;
3392
3393         indata.dptr = (uint8_t *)&db_id;
3394         indata.dsize = sizeof(db_id);
3395
3396         ret = ctdb_control(ctdb, destnode, 0,
3397                            CTDB_CONTROL_TRANS2_ACTIVE,
3398                            0, indata, NULL, NULL, &status,
3399                            NULL, NULL);
3400
3401         if (ret != 0) {
3402                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3403                 return -1;
3404         }
3405
3406         return status;
3407 }
3408
3409
3410 struct ctdb_transaction_handle {
3411         struct ctdb_db_context *ctdb_db;
3412         bool in_replay;
3413         /*
3414          * we store the reads and writes done under a transaction:
3415          * - one list stores both reads and writes (m_all),
3416          * - the other just writes (m_write)
3417          */
3418         struct ctdb_marshall_buffer *m_all;
3419         struct ctdb_marshall_buffer *m_write;
3420 };
3421
3422 /* start a transaction on a database */
3423 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3424 {
3425         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3426         return 0;
3427 }
3428
3429 /* start a transaction on a database */
3430 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3431 {
3432         struct ctdb_record_handle *rh;
3433         TDB_DATA key;
3434         TDB_DATA data;
3435         struct ctdb_ltdb_header header;
3436         TALLOC_CTX *tmp_ctx;
3437         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3438         int ret;
3439         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3440         pid_t pid;
3441         int32_t status;
3442
3443         key.dptr = discard_const(keyname);
3444         key.dsize = strlen(keyname);
3445
3446         if (!ctdb_db->persistent) {
3447                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3448                 return -1;
3449         }
3450
3451 again:
3452         tmp_ctx = talloc_new(h);
3453
3454         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3455         if (rh == NULL) {
3456                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3457                 talloc_free(tmp_ctx);
3458                 return -1;
3459         }
3460
3461         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3462                                               CTDB_CURRENT_NODE,
3463                                               ctdb_db->db_id);
3464         if (status == 1) {
3465                 unsigned long int usec = (1000 + random()) % 100000;
3466                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3467                                     "on db_id[0x%08x]. waiting for %lu "
3468                                     "microseconds\n",
3469                                     ctdb_db->db_id, usec));
3470                 talloc_free(tmp_ctx);
3471                 usleep(usec);
3472                 goto again;
3473         }
3474
3475         /*
3476          * store the pid in the database:
3477          * it is not enough that the node is dmaster...
3478          */
3479         pid = getpid();
3480         data.dptr = (unsigned char *)&pid;
3481         data.dsize = sizeof(pid_t);
3482         rh->header.rsn++;
3483         rh->header.dmaster = ctdb_db->ctdb->pnn;
3484         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3485         if (ret != 0) {
3486                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3487                                   "transaction record\n"));
3488                 talloc_free(tmp_ctx);
3489                 return -1;
3490         }
3491
3492         talloc_free(rh);
3493
3494         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3495         if (ret != 0) {
3496                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3497                 talloc_free(tmp_ctx);
3498                 return -1;
3499         }
3500
3501         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3502         if (ret != 0) {
3503                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3504                                  "lock record inside transaction\n"));
3505                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3506                 talloc_free(tmp_ctx);
3507                 goto again;
3508         }
3509
3510         if (header.dmaster != ctdb_db->ctdb->pnn) {
3511                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3512                                    "transaction lock record\n"));
3513                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3514                 talloc_free(tmp_ctx);
3515                 goto again;
3516         }
3517
3518         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3519                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3520                                     "the transaction lock record\n"));
3521                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3522                 talloc_free(tmp_ctx);
3523                 goto again;
3524         }
3525
3526         talloc_free(tmp_ctx);
3527
3528         return 0;
3529 }
3530
3531
3532 /* start a transaction on a database */
3533 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3534                                                        TALLOC_CTX *mem_ctx)
3535 {
3536         struct ctdb_transaction_handle *h;
3537         int ret;
3538
3539         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3540         if (h == NULL) {
3541                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3542                 return NULL;
3543         }
3544
3545         h->ctdb_db = ctdb_db;
3546
3547         ret = ctdb_transaction_fetch_start(h);
3548         if (ret != 0) {
3549                 talloc_free(h);
3550                 return NULL;
3551         }
3552
3553         talloc_set_destructor(h, ctdb_transaction_destructor);
3554
3555         return h;
3556 }
3557
3558
3559
3560 /*
3561   fetch a record inside a transaction
3562  */
3563 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3564                            TALLOC_CTX *mem_ctx, 
3565                            TDB_DATA key, TDB_DATA *data)
3566 {
3567         struct ctdb_ltdb_header header;
3568         int ret;
3569
3570         ZERO_STRUCT(header);
3571
3572         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3573         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3574                 /* record doesn't exist yet */
3575                 *data = tdb_null;
3576                 ret = 0;
3577         }
3578         
3579         if (ret != 0) {
3580                 return ret;
3581         }
3582
3583         if (!h->in_replay) {
3584                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3585                 if (h->m_all == NULL) {
3586                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3587                         return -1;
3588                 }
3589         }
3590
3591         return 0;
3592 }
3593
3594 /*
3595   stores a record inside a transaction
3596  */
3597 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3598                            TDB_DATA key, TDB_DATA data)
3599 {
3600         TALLOC_CTX *tmp_ctx = talloc_new(h);
3601         struct ctdb_ltdb_header header;
3602         TDB_DATA olddata;
3603         int ret;
3604
3605         ZERO_STRUCT(header);
3606
3607         /* we need the header so we can update the RSN */
3608         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3609         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3610                 /* the record doesn't exist - create one with us as dmaster.
3611                    This is only safe because we are in a transaction and this
3612                    is a persistent database */
3613                 ZERO_STRUCT(header);
3614         } else if (ret != 0) {
3615                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3616                 talloc_free(tmp_ctx);
3617                 return ret;
3618         }
3619
3620         if (data.dsize == olddata.dsize &&
3621             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3622                 /* save writing the same data */
3623                 talloc_free(tmp_ctx);
3624                 return 0;
3625         }
3626
3627         header.dmaster = h->ctdb_db->ctdb->pnn;
3628         header.rsn++;
3629
3630         if (!h->in_replay) {
3631                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3632                 if (h->m_all == NULL) {
3633                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3634                         talloc_free(tmp_ctx);
3635                         return -1;
3636                 }
3637         }               
3638
3639         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3640         if (h->m_write == NULL) {
3641                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3642                 talloc_free(tmp_ctx);
3643                 return -1;
3644         }
3645         
3646         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3647
3648         talloc_free(tmp_ctx);
3649         
3650         return ret;
3651 }
3652
3653 /*
3654   replay a transaction
3655  */
3656 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3657 {
3658         int ret, i;
3659         struct ctdb_rec_data *rec = NULL;
3660
3661         h->in_replay = true;
3662         talloc_free(h->m_write);
3663         h->m_write = NULL;
3664
3665         ret = ctdb_transaction_fetch_start(h);
3666         if (ret != 0) {
3667                 return ret;
3668         }
3669
3670         for (i=0;i<h->m_all->count;i++) {
3671                 TDB_DATA key, data;
3672
3673                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3674                 if (rec == NULL) {
3675                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3676                         goto failed;
3677                 }
3678
3679                 if (rec->reqid == 0) {
3680                         /* its a store */
3681                         if (ctdb_transaction_store(h, key, data) != 0) {
3682                                 goto failed;
3683                         }
3684                 } else {
3685                         TDB_DATA data2;
3686                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3687
3688                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3689                                 talloc_free(tmp_ctx);
3690                                 goto failed;
3691                         }
3692                         if (data2.dsize != data.dsize ||
3693                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3694                                 /* the record has changed on us - we have to give up */
3695                                 talloc_free(tmp_ctx);
3696                                 goto failed;
3697                         }
3698                         talloc_free(tmp_ctx);
3699                 }
3700         }
3701         
3702         return 0;
3703
3704 failed:
3705         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3706         return -1;
3707 }
3708
3709
3710 /*
3711   commit a transaction
3712  */
3713 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3714 {
3715         int ret, retries=0;
3716         int32_t status;
3717         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3718         struct timeval timeout;
3719         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3720
3721         talloc_set_destructor(h, NULL);
3722
3723         /* our commit strategy is quite complex.
3724
3725            - we first try to commit the changes to all other nodes
3726
3727            - if that works, then we commit locally and we are done
3728
3729            - if a commit on another node fails, then we need to cancel
3730              the transaction, then restart the transaction (thus
3731              opening a window of time for a pending recovery to
3732              complete), then replay the transaction, checking all the
3733              reads and writes (checking that reads give the same data,
3734              and writes succeed). Then we retry the transaction to the
3735              other nodes
3736         */
3737
3738 again:
3739         if (h->m_write == NULL) {
3740                 /* no changes were made */
3741                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3742                 talloc_free(h);
3743                 return 0;
3744         }
3745
3746         /* tell ctdbd to commit to the other nodes */
3747         timeout = timeval_current_ofs(1, 0);
3748         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3749                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3750                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3751                            &timeout, NULL);
3752         if (ret != 0 || status != 0) {
3753                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3754                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3755                                      ", retrying after 1 second...\n",
3756                                      (retries==0)?"":"retry "));
3757                 sleep(1);
3758
3759                 if (ret != 0) {
3760                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3761                 } else {
3762                         /* work out what error code we will give if we 
3763                            have to fail the operation */
3764                         switch ((enum ctdb_trans2_commit_error)status) {
3765                         case CTDB_TRANS2_COMMIT_SUCCESS:
3766                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3767                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3768                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3769                                 break;
3770                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3771                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3772                                 break;
3773                         }
3774                 }
3775
3776                 if (++retries == 100) {
3777                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3778                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3779                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3780                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3781                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3782                         talloc_free(h);
3783                         return -1;
3784                 }               
3785
3786                 if (ctdb_replay_transaction(h) != 0) {
3787                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3788                                           "transaction on db 0x%08x, "
3789                                           "failure control =%u\n",
3790                                           h->ctdb_db->db_id,
3791                                           (unsigned)failure_control));
3792                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3793                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3794                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3795                         talloc_free(h);
3796                         return -1;
3797                 }
3798                 goto again;
3799         } else {
3800                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3801         }
3802
3803         /* do the real commit locally */
3804         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3805         if (ret != 0) {
3806                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3807                                   "on db id 0x%08x locally, "
3808                                   "failure_control=%u\n",
3809                                   h->ctdb_db->db_id,
3810                                   (unsigned)failure_control));
3811                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3812                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3813                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3814                 talloc_free(h);
3815                 return ret;
3816         }
3817
3818         /* tell ctdbd that we are finished with our local commit */
3819         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3820                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3821                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3822         talloc_free(h);
3823         return 0;
3824 }
3825
3826 /*
3827   recovery daemon ping to main daemon
3828  */
3829 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3830 {
3831         int ret;
3832         int32_t res;
3833
3834         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3835                            ctdb, NULL, &res, NULL, NULL);
3836         if (ret != 0 || res != 0) {
3837                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3838                 return -1;
3839         }
3840
3841         return 0;
3842 }
3843
3844 /* when forking the main daemon and the child process needs to connect back
3845  * to the daemon as a client process, this function can be used to change
3846  * the ctdb context from daemon into client mode
3847  */
3848 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3849 {
3850         int ret;
3851         va_list ap;
3852
3853         /* Add extra information so we can identify this in the logs */
3854         va_start(ap, fmt);
3855         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3856         va_end(ap);
3857
3858         /* shutdown the transport */
3859         if (ctdb->methods) {
3860                 ctdb->methods->shutdown(ctdb);
3861         }
3862
3863         /* get a new event context */
3864         talloc_free(ctdb->ev);
3865         ctdb->ev = event_context_init(ctdb);
3866         tevent_loop_allow_nesting(ctdb->ev);
3867
3868         close(ctdb->daemon.sd);
3869         ctdb->daemon.sd = -1;
3870
3871         /* the client does not need to be realtime */
3872         if (ctdb->do_setsched) {
3873                 ctdb_restore_scheduler(ctdb);
3874         }
3875
3876         /* initialise ctdb */
3877         ret = ctdb_socket_connect(ctdb);
3878         if (ret != 0) {
3879                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3880                 return -1;
3881         }
3882
3883          return 0;
3884 }
3885
3886 /*
3887   get the status of running the monitor eventscripts: NULL means never run.
3888  */
3889 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3890                 struct timeval timeout, uint32_t destnode, 
3891                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3892                 struct ctdb_scripts_wire **script_status)
3893 {
3894         int ret;
3895         TDB_DATA outdata, indata;
3896         int32_t res;
3897         uint32_t uinttype = type;
3898
3899         indata.dptr = (uint8_t *)&uinttype;
3900         indata.dsize = sizeof(uinttype);
3901
3902         ret = ctdb_control(ctdb, destnode, 0, 
3903                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3904                            mem_ctx, &outdata, &res, &timeout, NULL);
3905         if (ret != 0 || res != 0) {
3906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3907                 return -1;
3908         }
3909
3910         if (outdata.dsize == 0) {
3911                 *script_status = NULL;
3912         } else {
3913                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3914                 talloc_free(outdata.dptr);
3915         }
3916                     
3917         return 0;
3918 }
3919
3920 /*
3921   tell the main daemon how long it took to lock the reclock file
3922  */
3923 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3924 {
3925         int ret;
3926         int32_t res;
3927         TDB_DATA data;
3928
3929         data.dptr = (uint8_t *)&latency;
3930         data.dsize = sizeof(latency);
3931
3932         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3933                            ctdb, NULL, &res, NULL, NULL);
3934         if (ret != 0 || res != 0) {
3935                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3936                 return -1;
3937         }
3938
3939         return 0;
3940 }
3941
3942 /*
3943   get the name of the reclock file
3944  */
3945 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3946                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3947                          const char **name)
3948 {
3949         int ret;
3950         int32_t res;
3951         TDB_DATA data;
3952
3953         ret = ctdb_control(ctdb, destnode, 0, 
3954                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3955                            mem_ctx, &data, &res, &timeout, NULL);
3956         if (ret != 0 || res != 0) {
3957                 return -1;
3958         }
3959
3960         if (data.dsize == 0) {
3961                 *name = NULL;
3962         } else {
3963                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3964         }
3965         talloc_free(data.dptr);
3966
3967         return 0;
3968 }
3969
3970 /*
3971   set the reclock filename for a node
3972  */
3973 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3974 {
3975         int ret;
3976         TDB_DATA data;
3977         int32_t res;
3978
3979         if (reclock == NULL) {
3980                 data.dsize = 0;
3981                 data.dptr  = NULL;
3982         } else {
3983                 data.dsize = strlen(reclock) + 1;
3984                 data.dptr  = discard_const(reclock);
3985         }
3986
3987         ret = ctdb_control(ctdb, destnode, 0, 
3988                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3989                            NULL, NULL, &res, &timeout, NULL);
3990         if (ret != 0 || res != 0) {
3991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3992                 return -1;
3993         }
3994
3995         return 0;
3996 }
3997
3998 /*
3999   stop a node
4000  */
4001 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4002 {
4003         int ret;
4004         int32_t res;
4005
4006         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4007                            ctdb, NULL, &res, &timeout, NULL);
4008         if (ret != 0 || res != 0) {
4009                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4010                 return -1;
4011         }
4012
4013         return 0;
4014 }
4015
4016 /*
4017   continue a node
4018  */
4019 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4020 {
4021         int ret;
4022
4023         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4024                            ctdb, NULL, NULL, &timeout, NULL);
4025         if (ret != 0) {
4026                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4027                 return -1;
4028         }
4029
4030         return 0;
4031 }
4032
4033 /*
4034   set the natgw state for a node
4035  */
4036 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4037 {
4038         int ret;
4039         TDB_DATA data;
4040         int32_t res;
4041
4042         data.dsize = sizeof(natgwstate);
4043         data.dptr  = (uint8_t *)&natgwstate;
4044
4045         ret = ctdb_control(ctdb, destnode, 0, 
4046                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4047                            NULL, NULL, &res, &timeout, NULL);
4048         if (ret != 0 || res != 0) {
4049                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4050                 return -1;
4051         }
4052
4053         return 0;
4054 }
4055
4056 /*
4057   set the lmaster role for a node
4058  */
4059 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4060 {
4061         int ret;
4062         TDB_DATA data;
4063         int32_t res;
4064
4065         data.dsize = sizeof(lmasterrole);
4066         data.dptr  = (uint8_t *)&lmasterrole;
4067
4068         ret = ctdb_control(ctdb, destnode, 0, 
4069                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4070                            NULL, NULL, &res, &timeout, NULL);
4071         if (ret != 0 || res != 0) {
4072                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4073                 return -1;
4074         }
4075
4076         return 0;
4077 }
4078
4079 /*
4080   set the recmaster role for a node
4081  */
4082 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4083 {
4084         int ret;
4085         TDB_DATA data;
4086         int32_t res;
4087
4088         data.dsize = sizeof(recmasterrole);
4089         data.dptr  = (uint8_t *)&recmasterrole;
4090
4091         ret = ctdb_control(ctdb, destnode, 0, 
4092                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4093                            NULL, NULL, &res, &timeout, NULL);
4094         if (ret != 0 || res != 0) {
4095                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4096                 return -1;
4097         }
4098
4099         return 0;
4100 }
4101
4102 /* enable an eventscript
4103  */
4104 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4105 {
4106         int ret;
4107         TDB_DATA data;
4108         int32_t res;
4109
4110         data.dsize = strlen(script) + 1;
4111         data.dptr  = discard_const(script);
4112
4113         ret = ctdb_control(ctdb, destnode, 0, 
4114                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4115                            NULL, NULL, &res, &timeout, NULL);
4116         if (ret != 0 || res != 0) {
4117                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4118                 return -1;
4119         }
4120
4121         return 0;
4122 }
4123
4124 /* disable an eventscript
4125  */
4126 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4127 {
4128         int ret;
4129         TDB_DATA data;
4130         int32_t res;
4131
4132         data.dsize = strlen(script) + 1;
4133         data.dptr  = discard_const(script);
4134
4135         ret = ctdb_control(ctdb, destnode, 0, 
4136                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4137                            NULL, NULL, &res, &timeout, NULL);
4138         if (ret != 0 || res != 0) {
4139                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4140                 return -1;
4141         }
4142
4143         return 0;
4144 }
4145
4146
4147 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4148 {
4149         int ret;
4150         TDB_DATA data;
4151         int32_t res;
4152
4153         data.dsize = sizeof(*bantime);
4154         data.dptr  = (uint8_t *)bantime;
4155
4156         ret = ctdb_control(ctdb, destnode, 0, 
4157                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4158                            NULL, NULL, &res, &timeout, NULL);
4159         if (ret != 0 || res != 0) {
4160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4161                 return -1;
4162         }
4163
4164         return 0;
4165 }
4166
4167
4168 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4169 {
4170         int ret;
4171         TDB_DATA outdata;
4172         int32_t res;
4173         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4174
4175         ret = ctdb_control(ctdb, destnode, 0, 
4176                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4177                            tmp_ctx, &outdata, &res, &timeout, NULL);
4178         if (ret != 0 || res != 0) {
4179                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4180                 talloc_free(tmp_ctx);
4181                 return -1;
4182         }
4183
4184         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4185         talloc_free(tmp_ctx);
4186
4187         return 0;
4188 }
4189
4190
4191 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4192 {
4193         int ret;
4194         int32_t res;
4195         TDB_DATA data;
4196         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4197
4198         data.dptr = (uint8_t*)db_prio;
4199         data.dsize = sizeof(*db_prio);
4200
4201         ret = ctdb_control(ctdb, destnode, 0, 
4202                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4203                            tmp_ctx, NULL, &res, &timeout, NULL);
4204         if (ret != 0 || res != 0) {
4205                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4206                 talloc_free(tmp_ctx);
4207                 return -1;
4208         }
4209
4210         talloc_free(tmp_ctx);
4211
4212         return 0;
4213 }
4214
4215 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4216 {
4217         int ret;
4218         int32_t res;
4219         TDB_DATA data;
4220         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4221
4222         data.dptr = (uint8_t*)&db_id;
4223         data.dsize = sizeof(db_id);
4224
4225         ret = ctdb_control(ctdb, destnode, 0, 
4226                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4227                            tmp_ctx, NULL, &res, &timeout, NULL);
4228         if (ret != 0 || res < 0) {
4229                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4230                 talloc_free(tmp_ctx);
4231                 return -1;
4232         }
4233
4234         if (priority) {
4235                 *priority = res;
4236         }
4237
4238         talloc_free(tmp_ctx);
4239
4240         return 0;
4241 }
4242
4243 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4244 {
4245         int ret;
4246         TDB_DATA outdata;
4247         int32_t res;
4248
4249         ret = ctdb_control(ctdb, destnode, 0, 
4250                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4251                            mem_ctx, &outdata, &res, &timeout, NULL);
4252         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4253                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4254                 return -1;
4255         }
4256
4257         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4258         talloc_free(outdata.dptr);
4259                     
4260         return 0;
4261 }
4262
4263 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4264 {
4265         if (h == NULL) {
4266                 return NULL;
4267         }
4268
4269         return &h->header;
4270 }