client: add timeout argument to ctdb_attach
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         /* we need to force the record to be written out if this was a remote access */
109         if (c->new_data == NULL) {
110                 c->new_data = &c->record_data;
111         }
112
113         if (c->new_data) {
114                 /* XXX check that we always have the lock here? */
115                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
117                         talloc_free(c);
118                         return -1;
119                 }
120         }
121
122         if (c->reply_data) {
123                 call->reply_data = *c->reply_data;
124
125                 talloc_steal(call, call->reply_data.dptr);
126                 talloc_set_name_const(call->reply_data.dptr, __location__);
127         } else {
128                 call->reply_data.dptr = NULL;
129                 call->reply_data.dsize = 0;
130         }
131         call->status = c->status;
132
133         talloc_free(c);
134
135         return 0;
136 }
137
138
139 /*
140   queue a packet for sending from client to daemon
141 */
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
143 {
144         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
145 }
146
147
148 /*
149   called when a CTDB_REPLY_CALL packet comes in in the client
150
151   This packet comes in response to a CTDB_REQ_CALL request packet. It
152   contains any reply data from the call
153 */
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
155 {
156         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157         struct ctdb_client_call_state *state;
158
159         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
160         if (state == NULL) {
161                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
162                 return;
163         }
164
165         if (hdr->reqid != state->reqid) {
166                 /* we found a record  but it was the wrong one */
167                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
168                 return;
169         }
170
171         state->call->reply_data.dptr = c->data;
172         state->call->reply_data.dsize = c->datalen;
173         state->call->status = c->status;
174
175         talloc_steal(state, c);
176
177         state->state = CTDB_CALL_DONE;
178
179         if (state->async.fn) {
180                 state->async.fn(state);
181         }
182 }
183
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
185
186 /*
187   this is called in the client, when data comes in from the daemon
188  */
189 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
190 {
191         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
193         TALLOC_CTX *tmp_ctx;
194
195         /* place the packet as a child of a tmp_ctx. We then use
196            talloc_free() below to free it. If any of the calls want
197            to keep it, then they will steal it somewhere else, and the
198            talloc_free() will be a no-op */
199         tmp_ctx = talloc_new(ctdb);
200         talloc_steal(tmp_ctx, hdr);
201
202         if (cnt == 0) {
203                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
204                 exit(0);
205         }
206
207         if (cnt < sizeof(*hdr)) {
208                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
209                 goto done;
210         }
211         if (cnt != hdr->length) {
212                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
213                                (unsigned)hdr->length, (unsigned)cnt);
214                 goto done;
215         }
216
217         if (hdr->ctdb_magic != CTDB_MAGIC) {
218                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
219                 goto done;
220         }
221
222         if (hdr->ctdb_version != CTDB_VERSION) {
223                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
224                 goto done;
225         }
226
227         switch (hdr->operation) {
228         case CTDB_REPLY_CALL:
229                 ctdb_client_reply_call(ctdb, hdr);
230                 break;
231
232         case CTDB_REQ_MESSAGE:
233                 ctdb_request_message(ctdb, hdr);
234                 break;
235
236         case CTDB_REPLY_CONTROL:
237                 ctdb_client_reply_control(ctdb, hdr);
238                 break;
239
240         default:
241                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
242         }
243
244 done:
245         talloc_free(tmp_ctx);
246 }
247
248 /*
249   connect with exponential backoff, thanks Stevens
250 */
251 #define CONNECT_MAXSLEEP 64
252 static int ctdb_connect_retry(struct ctdb_context *ctdb)
253 {
254         struct sockaddr_un addr;
255         int secs;
256         int ret = 0;
257
258         memset(&addr, 0, sizeof(addr));
259         addr.sun_family = AF_UNIX;
260         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
261
262         for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) {
263                 ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr,
264                               sizeof(addr));
265                 if ((ret == 0) || (errno != EAGAIN)) {
266                         break;
267                 }
268
269                 if (secs <= (CONNECT_MAXSLEEP / 2)) {
270                         DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n",
271                                          strerror(errno), secs));
272                         sleep(secs);
273                 }
274         }
275
276         return ret;
277 }
278
279 /*
280   connect to a unix domain socket
281 */
282 int ctdb_socket_connect(struct ctdb_context *ctdb)
283 {
284         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
285         if (ctdb->daemon.sd == -1) {
286                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
287                 return -1;
288         }
289
290         set_nonblocking(ctdb->daemon.sd);
291         set_close_on_exec(ctdb->daemon.sd);
292
293         if (ctdb_connect_retry(ctdb) == -1) {
294                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
295                 close(ctdb->daemon.sd);
296                 ctdb->daemon.sd = -1;
297                 return -1;
298         }
299
300         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
301                                               CTDB_DS_ALIGNMENT, 
302                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
303         return 0;
304 }
305
306
307 struct ctdb_record_handle {
308         struct ctdb_db_context *ctdb_db;
309         TDB_DATA key;
310         TDB_DATA *data;
311         struct ctdb_ltdb_header header;
312 };
313
314
315 /*
316   make a recv call to the local ctdb daemon - called from client context
317
318   This is called when the program wants to wait for a ctdb_call to complete and get the 
319   results. This call will block unless the call has already completed.
320 */
321 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
322 {
323         if (state == NULL) {
324                 return -1;
325         }
326
327         while (state->state < CTDB_CALL_DONE) {
328                 event_loop_once(state->ctdb_db->ctdb->ev);
329         }
330         if (state->state != CTDB_CALL_DONE) {
331                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
332                 talloc_free(state);
333                 return -1;
334         }
335
336         if (state->call->reply_data.dsize) {
337                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
338                                                       state->call->reply_data.dptr,
339                                                       state->call->reply_data.dsize);
340                 call->reply_data.dsize = state->call->reply_data.dsize;
341         } else {
342                 call->reply_data.dptr = NULL;
343                 call->reply_data.dsize = 0;
344         }
345         call->status = state->call->status;
346         talloc_free(state);
347
348         return 0;
349 }
350
351
352
353
354 /*
355   destroy a ctdb_call in client
356 */
357 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
358 {
359         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
360         return 0;
361 }
362
363 /*
364   construct an event driven local ctdb_call
365
366   this is used so that locally processed ctdb_call requests are processed
367   in an event driven manner
368 */
369 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
370                                                                   struct ctdb_call *call,
371                                                                   struct ctdb_ltdb_header *header,
372                                                                   TDB_DATA *data)
373 {
374         struct ctdb_client_call_state *state;
375         struct ctdb_context *ctdb = ctdb_db->ctdb;
376         int ret;
377
378         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
379         CTDB_NO_MEMORY_NULL(ctdb, state);
380         state->call = talloc_zero(state, struct ctdb_call);
381         CTDB_NO_MEMORY_NULL(ctdb, state->call);
382
383         talloc_steal(state, data->dptr);
384
385         state->state   = CTDB_CALL_DONE;
386         *(state->call) = *call;
387         state->ctdb_db = ctdb_db;
388
389         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
390
391         return state;
392 }
393
394 /*
395   make a ctdb call to the local daemon - async send. Called from client context.
396
397   This constructs a ctdb_call request and queues it for processing. 
398   This call never blocks.
399 */
400 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
401                                               struct ctdb_call *call)
402 {
403         struct ctdb_client_call_state *state;
404         struct ctdb_context *ctdb = ctdb_db->ctdb;
405         struct ctdb_ltdb_header header;
406         TDB_DATA data;
407         int ret;
408         size_t len;
409         struct ctdb_req_call *c;
410
411         /* if the domain socket is not yet open, open it */
412         if (ctdb->daemon.sd==-1) {
413                 ctdb_socket_connect(ctdb);
414         }
415
416         ret = ctdb_ltdb_lock(ctdb_db, call->key);
417         if (ret != 0) {
418                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
419                 return NULL;
420         }
421
422         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
423
424         if (ret == 0 && header.dmaster == ctdb->pnn) {
425                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
426                 talloc_free(data.dptr);
427                 ctdb_ltdb_unlock(ctdb_db, call->key);
428                 return state;
429         }
430
431         ctdb_ltdb_unlock(ctdb_db, call->key);
432         talloc_free(data.dptr);
433
434         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
435         if (state == NULL) {
436                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
437                 return NULL;
438         }
439         state->call = talloc_zero(state, struct ctdb_call);
440         if (state->call == NULL) {
441                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
442                 return NULL;
443         }
444
445         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
446         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
447         if (c == NULL) {
448                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
449                 return NULL;
450         }
451
452         state->reqid     = ctdb_reqid_new(ctdb, state);
453         state->ctdb_db = ctdb_db;
454         talloc_set_destructor(state, ctdb_client_call_destructor);
455
456         c->hdr.reqid     = state->reqid;
457         c->flags         = call->flags;
458         c->db_id         = ctdb_db->db_id;
459         c->callid        = call->call_id;
460         c->hopcount      = 0;
461         c->keylen        = call->key.dsize;
462         c->calldatalen   = call->call_data.dsize;
463         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
464         memcpy(&c->data[call->key.dsize], 
465                call->call_data.dptr, call->call_data.dsize);
466         *(state->call)              = *call;
467         state->call->call_data.dptr = &c->data[call->key.dsize];
468         state->call->key.dptr       = &c->data[0];
469
470         state->state  = CTDB_CALL_WAIT;
471
472
473         ctdb_client_queue_pkt(ctdb, &c->hdr);
474
475         return state;
476 }
477
478
479 /*
480   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
481 */
482 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
483 {
484         struct ctdb_client_call_state *state;
485
486         state = ctdb_call_send(ctdb_db, call);
487         return ctdb_call_recv(state, call);
488 }
489
490
491 /*
492   tell the daemon what messaging srvid we will use, and register the message
493   handler function in the client
494 */
495 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
496                              ctdb_msg_fn_t handler,
497                              void *private_data)
498                                     
499 {
500         int res;
501         int32_t status;
502         
503         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
504                            tdb_null, NULL, NULL, &status, NULL, NULL);
505         if (res != 0 || status != 0) {
506                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
507                 return -1;
508         }
509
510         /* also need to register the handler with our own ctdb structure */
511         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
512 }
513
514 /*
515   tell the daemon we no longer want a srvid
516 */
517 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
518 {
519         int res;
520         int32_t status;
521         
522         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
523                            tdb_null, NULL, NULL, &status, NULL, NULL);
524         if (res != 0 || status != 0) {
525                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
526                 return -1;
527         }
528
529         /* also need to register the handler with our own ctdb structure */
530         ctdb_deregister_message_handler(ctdb, srvid, private_data);
531         return 0;
532 }
533
534
535 /*
536   send a message - from client context
537  */
538 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
539                       uint64_t srvid, TDB_DATA data)
540 {
541         struct ctdb_req_message *r;
542         int len, res;
543
544         len = offsetof(struct ctdb_req_message, data) + data.dsize;
545         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
546                                len, struct ctdb_req_message);
547         CTDB_NO_MEMORY(ctdb, r);
548
549         r->hdr.destnode  = pnn;
550         r->srvid         = srvid;
551         r->datalen       = data.dsize;
552         memcpy(&r->data[0], data.dptr, data.dsize);
553         
554         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
555         if (res != 0) {
556                 return res;
557         }
558
559         talloc_free(r);
560         return 0;
561 }
562
563
564 /*
565   cancel a ctdb_fetch_lock operation, releasing the lock
566  */
567 static int fetch_lock_destructor(struct ctdb_record_handle *h)
568 {
569         ctdb_ltdb_unlock(h->ctdb_db, h->key);
570         return 0;
571 }
572
573 /*
574   force the migration of a record to this node
575  */
576 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
577 {
578         struct ctdb_call call;
579         ZERO_STRUCT(call);
580         call.call_id = CTDB_NULL_FUNC;
581         call.key = key;
582         call.flags = CTDB_IMMEDIATE_MIGRATION;
583         return ctdb_call(ctdb_db, &call);
584 }
585
586 /*
587   get a lock on a record, and return the records data. Blocks until it gets the lock
588  */
589 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
590                                            TDB_DATA key, TDB_DATA *data)
591 {
592         int ret;
593         struct ctdb_record_handle *h;
594
595         /*
596           procedure is as follows:
597
598           1) get the chain lock. 
599           2) check if we are dmaster
600           3) if we are the dmaster then return handle 
601           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
602              reply from ctdbd
603           5) when we get the reply, goto (1)
604          */
605
606         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
607         if (h == NULL) {
608                 return NULL;
609         }
610
611         h->ctdb_db = ctdb_db;
612         h->key     = key;
613         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
614         if (h->key.dptr == NULL) {
615                 talloc_free(h);
616                 return NULL;
617         }
618         h->data    = data;
619
620         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
621                  (const char *)key.dptr));
622
623 again:
624         /* step 1 - get the chain lock */
625         ret = ctdb_ltdb_lock(ctdb_db, key);
626         if (ret != 0) {
627                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
628                 talloc_free(h);
629                 return NULL;
630         }
631
632         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
633
634         talloc_set_destructor(h, fetch_lock_destructor);
635
636         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
637
638         /* when torturing, ensure we test the remote path */
639         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
640             random() % 5 == 0) {
641                 h->header.dmaster = (uint32_t)-1;
642         }
643
644
645         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
646
647         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
648                 ctdb_ltdb_unlock(ctdb_db, key);
649                 ret = ctdb_client_force_migration(ctdb_db, key);
650                 if (ret != 0) {
651                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
652                         talloc_free(h);
653                         return NULL;
654                 }
655                 goto again;
656         }
657
658         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
659         return h;
660 }
661
662 /*
663   store some data to the record that was locked with ctdb_fetch_lock()
664 */
665 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
666 {
667         if (h->ctdb_db->persistent) {
668                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
669                 return -1;
670         }
671
672         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
673 }
674
675 /*
676   non-locking fetch of a record
677  */
678 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
679                TDB_DATA key, TDB_DATA *data)
680 {
681         struct ctdb_call call;
682         int ret;
683
684         call.call_id = CTDB_FETCH_FUNC;
685         call.call_data.dptr = NULL;
686         call.call_data.dsize = 0;
687
688         ret = ctdb_call(ctdb_db, &call);
689
690         if (ret == 0) {
691                 *data = call.reply_data;
692                 talloc_steal(mem_ctx, data->dptr);
693         }
694
695         return ret;
696 }
697
698
699
700 /*
701    called when a control completes or timesout to invoke the callback
702    function the user provided
703 */
704 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
705         struct timeval t, void *private_data)
706 {
707         struct ctdb_client_control_state *state;
708         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
709         int ret;
710
711         state = talloc_get_type(private_data, struct ctdb_client_control_state);
712         talloc_steal(tmp_ctx, state);
713
714         ret = ctdb_control_recv(state->ctdb, state, state,
715                         NULL, 
716                         NULL, 
717                         NULL);
718
719         talloc_free(tmp_ctx);
720 }
721
722 /*
723   called when a CTDB_REPLY_CONTROL packet comes in in the client
724
725   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
726   contains any reply data from the control
727 */
728 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
729                                       struct ctdb_req_header *hdr)
730 {
731         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
732         struct ctdb_client_control_state *state;
733
734         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
735         if (state == NULL) {
736                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
737                 return;
738         }
739
740         if (hdr->reqid != state->reqid) {
741                 /* we found a record  but it was the wrong one */
742                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
743                 return;
744         }
745
746         state->outdata.dptr = c->data;
747         state->outdata.dsize = c->datalen;
748         state->status = c->status;
749         if (c->errorlen) {
750                 state->errormsg = talloc_strndup(state, 
751                                                  (char *)&c->data[c->datalen], 
752                                                  c->errorlen);
753         }
754
755         /* state->outdata now uses resources from c so we dont want c
756            to just dissappear from under us while state is still alive
757         */
758         talloc_steal(state, c);
759
760         state->state = CTDB_CONTROL_DONE;
761
762         /* if we had a callback registered for this control, pull the response
763            and call the callback.
764         */
765         if (state->async.fn) {
766                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
767         }
768 }
769
770
771 /*
772   destroy a ctdb_control in client
773 */
774 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
775 {
776         ctdb_reqid_remove(state->ctdb, state->reqid);
777         return 0;
778 }
779
780
781 /* time out handler for ctdb_control */
782 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
783         struct timeval t, void *private_data)
784 {
785         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
786
787         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
788                          "dstnode:%u\n", state->reqid, state->c->opcode,
789                          state->c->hdr.destnode));
790
791         state->state = CTDB_CONTROL_TIMEOUT;
792
793         /* if we had a callback registered for this control, pull the response
794            and call the callback.
795         */
796         if (state->async.fn) {
797                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
798         }
799 }
800
801 /* async version of send control request */
802 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
803                 uint32_t destnode, uint64_t srvid, 
804                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
805                 TALLOC_CTX *mem_ctx,
806                 struct timeval *timeout,
807                 char **errormsg)
808 {
809         struct ctdb_client_control_state *state;
810         size_t len;
811         struct ctdb_req_control *c;
812         int ret;
813
814         if (errormsg) {
815                 *errormsg = NULL;
816         }
817
818         /* if the domain socket is not yet open, open it */
819         if (ctdb->daemon.sd==-1) {
820                 ctdb_socket_connect(ctdb);
821         }
822
823         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
824         CTDB_NO_MEMORY_NULL(ctdb, state);
825
826         state->ctdb       = ctdb;
827         state->reqid      = ctdb_reqid_new(ctdb, state);
828         state->state      = CTDB_CONTROL_WAIT;
829         state->errormsg   = NULL;
830
831         talloc_set_destructor(state, ctdb_control_destructor);
832
833         len = offsetof(struct ctdb_req_control, data) + data.dsize;
834         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
835                                len, struct ctdb_req_control);
836         state->c            = c;        
837         CTDB_NO_MEMORY_NULL(ctdb, c);
838         c->hdr.reqid        = state->reqid;
839         c->hdr.destnode     = destnode;
840         c->opcode           = opcode;
841         c->client_id        = 0;
842         c->flags            = flags;
843         c->srvid            = srvid;
844         c->datalen          = data.dsize;
845         if (data.dsize) {
846                 memcpy(&c->data[0], data.dptr, data.dsize);
847         }
848
849         /* timeout */
850         if (timeout && !timeval_is_zero(timeout)) {
851                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
852         }
853
854         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
855         if (ret != 0) {
856                 talloc_free(state);
857                 return NULL;
858         }
859
860         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
861                 talloc_free(state);
862                 return NULL;
863         }
864
865         return state;
866 }
867
868
869 /* async version of receive control reply */
870 int ctdb_control_recv(struct ctdb_context *ctdb, 
871                 struct ctdb_client_control_state *state, 
872                 TALLOC_CTX *mem_ctx,
873                 TDB_DATA *outdata, int32_t *status, char **errormsg)
874 {
875         TALLOC_CTX *tmp_ctx;
876
877         if (status != NULL) {
878                 *status = -1;
879         }
880         if (errormsg != NULL) {
881                 *errormsg = NULL;
882         }
883
884         if (state == NULL) {
885                 return -1;
886         }
887
888         /* prevent double free of state */
889         tmp_ctx = talloc_new(ctdb);
890         talloc_steal(tmp_ctx, state);
891
892         /* loop one event at a time until we either timeout or the control
893            completes.
894         */
895         while (state->state == CTDB_CONTROL_WAIT) {
896                 event_loop_once(ctdb->ev);
897         }
898
899         if (state->state != CTDB_CONTROL_DONE) {
900                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
901                 if (state->async.fn) {
902                         state->async.fn(state);
903                 }
904                 talloc_free(tmp_ctx);
905                 return -1;
906         }
907
908         if (state->errormsg) {
909                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
910                 if (errormsg) {
911                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
912                 }
913                 if (state->async.fn) {
914                         state->async.fn(state);
915                 }
916                 talloc_free(tmp_ctx);
917                 return -1;
918         }
919
920         if (outdata) {
921                 *outdata = state->outdata;
922                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
923         }
924
925         if (status) {
926                 *status = state->status;
927         }
928
929         if (state->async.fn) {
930                 state->async.fn(state);
931         }
932
933         talloc_free(tmp_ctx);
934         return 0;
935 }
936
937
938
939 /*
940   send a ctdb control message
941   timeout specifies how long we should wait for a reply.
942   if timeout is NULL we wait indefinitely
943  */
944 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
945                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
946                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
947                  struct timeval *timeout,
948                  char **errormsg)
949 {
950         struct ctdb_client_control_state *state;
951
952         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
953                         flags, data, mem_ctx,
954                         timeout, errormsg);
955         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
956                         errormsg);
957 }
958
959
960
961
962 /*
963   a process exists call. Returns 0 if process exists, -1 otherwise
964  */
965 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
966 {
967         int ret;
968         TDB_DATA data;
969         int32_t status;
970
971         data.dptr = (uint8_t*)&pid;
972         data.dsize = sizeof(pid);
973
974         ret = ctdb_control(ctdb, destnode, 0, 
975                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
976                            NULL, NULL, &status, NULL, NULL);
977         if (ret != 0) {
978                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
979                 return -1;
980         }
981
982         return status;
983 }
984
985 /*
986   get remote statistics
987  */
988 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
989 {
990         int ret;
991         TDB_DATA data;
992         int32_t res;
993
994         ret = ctdb_control(ctdb, destnode, 0, 
995                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
996                            ctdb, &data, &res, NULL, NULL);
997         if (ret != 0 || res != 0) {
998                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
999                 return -1;
1000         }
1001
1002         if (data.dsize != sizeof(struct ctdb_statistics)) {
1003                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1004                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1005                       return -1;
1006         }
1007
1008         *status = *(struct ctdb_statistics *)data.dptr;
1009         talloc_free(data.dptr);
1010                         
1011         return 0;
1012 }
1013
1014 /*
1015   shutdown a remote ctdb node
1016  */
1017 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1018 {
1019         struct ctdb_client_control_state *state;
1020
1021         state = ctdb_control_send(ctdb, destnode, 0, 
1022                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1023                            NULL, &timeout, NULL);
1024         if (state == NULL) {
1025                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1026                 return -1;
1027         }
1028
1029         return 0;
1030 }
1031
1032 /*
1033   get vnn map from a remote node
1034  */
1035 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1036 {
1037         int ret;
1038         TDB_DATA outdata;
1039         int32_t res;
1040         struct ctdb_vnn_map_wire *map;
1041
1042         ret = ctdb_control(ctdb, destnode, 0, 
1043                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1044                            mem_ctx, &outdata, &res, &timeout, NULL);
1045         if (ret != 0 || res != 0) {
1046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1047                 return -1;
1048         }
1049         
1050         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1051         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1052             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1053                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1054                 return -1;
1055         }
1056
1057         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1058         CTDB_NO_MEMORY(ctdb, *vnnmap);
1059         (*vnnmap)->generation = map->generation;
1060         (*vnnmap)->size       = map->size;
1061         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1062
1063         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1064         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1065         talloc_free(outdata.dptr);
1066                     
1067         return 0;
1068 }
1069
1070
1071 /*
1072   get the recovery mode of a remote node
1073  */
1074 struct ctdb_client_control_state *
1075 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1076 {
1077         return ctdb_control_send(ctdb, destnode, 0, 
1078                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1079                            mem_ctx, &timeout, NULL);
1080 }
1081
1082 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1083 {
1084         int ret;
1085         int32_t res;
1086
1087         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1088         if (ret != 0) {
1089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1090                 return -1;
1091         }
1092
1093         if (recmode) {
1094                 *recmode = (uint32_t)res;
1095         }
1096
1097         return 0;
1098 }
1099
1100 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1101 {
1102         struct ctdb_client_control_state *state;
1103
1104         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1105         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1106 }
1107
1108
1109
1110
1111 /*
1112   set the recovery mode of a remote node
1113  */
1114 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1115 {
1116         int ret;
1117         TDB_DATA data;
1118         int32_t res;
1119
1120         data.dsize = sizeof(uint32_t);
1121         data.dptr = (unsigned char *)&recmode;
1122
1123         ret = ctdb_control(ctdb, destnode, 0, 
1124                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1125                            NULL, NULL, &res, &timeout, NULL);
1126         if (ret != 0 || res != 0) {
1127                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1128                 return -1;
1129         }
1130
1131         return 0;
1132 }
1133
1134
1135
1136 /*
1137   get the recovery master of a remote node
1138  */
1139 struct ctdb_client_control_state *
1140 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1141                         struct timeval timeout, uint32_t destnode)
1142 {
1143         return ctdb_control_send(ctdb, destnode, 0, 
1144                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1145                            mem_ctx, &timeout, NULL);
1146 }
1147
1148 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1149 {
1150         int ret;
1151         int32_t res;
1152
1153         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1154         if (ret != 0) {
1155                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1156                 return -1;
1157         }
1158
1159         if (recmaster) {
1160                 *recmaster = (uint32_t)res;
1161         }
1162
1163         return 0;
1164 }
1165
1166 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1167 {
1168         struct ctdb_client_control_state *state;
1169
1170         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1171         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1172 }
1173
1174
1175 /*
1176   set the recovery master of a remote node
1177  */
1178 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1179 {
1180         int ret;
1181         TDB_DATA data;
1182         int32_t res;
1183
1184         ZERO_STRUCT(data);
1185         data.dsize = sizeof(uint32_t);
1186         data.dptr = (unsigned char *)&recmaster;
1187
1188         ret = ctdb_control(ctdb, destnode, 0, 
1189                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1190                            NULL, NULL, &res, &timeout, NULL);
1191         if (ret != 0 || res != 0) {
1192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1193                 return -1;
1194         }
1195
1196         return 0;
1197 }
1198
1199
1200 /*
1201   get a list of databases off a remote node
1202  */
1203 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1204                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1205 {
1206         int ret;
1207         TDB_DATA outdata;
1208         int32_t res;
1209
1210         ret = ctdb_control(ctdb, destnode, 0, 
1211                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1212                            mem_ctx, &outdata, &res, &timeout, NULL);
1213         if (ret != 0 || res != 0) {
1214                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1215                 return -1;
1216         }
1217
1218         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1219         talloc_free(outdata.dptr);
1220                     
1221         return 0;
1222 }
1223
1224 /*
1225   get a list of nodes (vnn and flags ) from a remote node
1226  */
1227 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1228                 struct timeval timeout, uint32_t destnode, 
1229                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1230 {
1231         int ret;
1232         TDB_DATA outdata;
1233         int32_t res;
1234
1235         ret = ctdb_control(ctdb, destnode, 0, 
1236                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1237                            mem_ctx, &outdata, &res, &timeout, NULL);
1238         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1239                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1240                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1241         }
1242         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1244                 return -1;
1245         }
1246
1247         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1248         talloc_free(outdata.dptr);
1249                     
1250         return 0;
1251 }
1252
1253 /*
1254   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1255  */
1256 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1257                 struct timeval timeout, uint32_t destnode, 
1258                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1259 {
1260         int ret, i, len;
1261         TDB_DATA outdata;
1262         struct ctdb_node_mapv4 *nodemapv4;
1263         int32_t res;
1264
1265         ret = ctdb_control(ctdb, destnode, 0, 
1266                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1267                            mem_ctx, &outdata, &res, &timeout, NULL);
1268         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1269                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1270                 return -1;
1271         }
1272
1273         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1274
1275         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1276         (*nodemap) = talloc_zero_size(mem_ctx, len);
1277         CTDB_NO_MEMORY(ctdb, (*nodemap));
1278
1279         (*nodemap)->num = nodemapv4->num;
1280         for (i=0; i<nodemapv4->num; i++) {
1281                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1282                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1283                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1284                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1285         }
1286                 
1287         talloc_free(outdata.dptr);
1288                     
1289         return 0;
1290 }
1291
1292 /*
1293   drop the transport, reload the nodes file and restart the transport
1294  */
1295 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1296                     struct timeval timeout, uint32_t destnode)
1297 {
1298         int ret;
1299         int32_t res;
1300
1301         ret = ctdb_control(ctdb, destnode, 0, 
1302                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1303                            NULL, NULL, &res, &timeout, NULL);
1304         if (ret != 0 || res != 0) {
1305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1306                 return -1;
1307         }
1308
1309         return 0;
1310 }
1311
1312
1313 /*
1314   set vnn map on a node
1315  */
1316 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1317                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1318 {
1319         int ret;
1320         TDB_DATA data;
1321         int32_t res;
1322         struct ctdb_vnn_map_wire *map;
1323         size_t len;
1324
1325         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1326         map = talloc_size(mem_ctx, len);
1327         CTDB_NO_MEMORY(ctdb, map);
1328
1329         map->generation = vnnmap->generation;
1330         map->size = vnnmap->size;
1331         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1332         
1333         data.dsize = len;
1334         data.dptr  = (uint8_t *)map;
1335
1336         ret = ctdb_control(ctdb, destnode, 0, 
1337                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1338                            NULL, NULL, &res, &timeout, NULL);
1339         if (ret != 0 || res != 0) {
1340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1341                 return -1;
1342         }
1343
1344         talloc_free(map);
1345
1346         return 0;
1347 }
1348
1349
1350 /*
1351   async send for pull database
1352  */
1353 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1354         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1355         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1356 {
1357         TDB_DATA indata;
1358         struct ctdb_control_pulldb *pull;
1359         struct ctdb_client_control_state *state;
1360
1361         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1362         CTDB_NO_MEMORY_NULL(ctdb, pull);
1363
1364         pull->db_id   = dbid;
1365         pull->lmaster = lmaster;
1366
1367         indata.dsize = sizeof(struct ctdb_control_pulldb);
1368         indata.dptr  = (unsigned char *)pull;
1369
1370         state = ctdb_control_send(ctdb, destnode, 0, 
1371                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1372                                   mem_ctx, &timeout, NULL);
1373         talloc_free(pull);
1374
1375         return state;
1376 }
1377
1378 /*
1379   async recv for pull database
1380  */
1381 int ctdb_ctrl_pulldb_recv(
1382         struct ctdb_context *ctdb, 
1383         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1384         TDB_DATA *outdata)
1385 {
1386         int ret;
1387         int32_t res;
1388
1389         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1390         if ( (ret != 0) || (res != 0) ){
1391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1392                 return -1;
1393         }
1394
1395         return 0;
1396 }
1397
1398 /*
1399   pull all keys and records for a specific database on a node
1400  */
1401 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1402                 uint32_t dbid, uint32_t lmaster, 
1403                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1404                 TDB_DATA *outdata)
1405 {
1406         struct ctdb_client_control_state *state;
1407
1408         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1409                                       timeout);
1410         
1411         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1412 }
1413
1414
1415 /*
1416   change dmaster for all keys in the database to the new value
1417  */
1418 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1419                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1420 {
1421         int ret;
1422         TDB_DATA indata;
1423         int32_t res;
1424
1425         indata.dsize = 2*sizeof(uint32_t);
1426         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1427
1428         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1429         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1430
1431         ret = ctdb_control(ctdb, destnode, 0, 
1432                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1433                            NULL, NULL, &res, &timeout, NULL);
1434         if (ret != 0 || res != 0) {
1435                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1436                 return -1;
1437         }
1438
1439         return 0;
1440 }
1441
1442 /*
1443   ping a node, return number of clients connected
1444  */
1445 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1446 {
1447         int ret;
1448         int32_t res;
1449
1450         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1451                            tdb_null, NULL, NULL, &res, NULL, NULL);
1452         if (ret != 0) {
1453                 return -1;
1454         }
1455         return res;
1456 }
1457
1458 /*
1459   find the real path to a ltdb 
1460  */
1461 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1462                    const char **path)
1463 {
1464         int ret;
1465         int32_t res;
1466         TDB_DATA data;
1467
1468         data.dptr = (uint8_t *)&dbid;
1469         data.dsize = sizeof(dbid);
1470
1471         ret = ctdb_control(ctdb, destnode, 0, 
1472                            CTDB_CONTROL_GETDBPATH, 0, data, 
1473                            mem_ctx, &data, &res, &timeout, NULL);
1474         if (ret != 0 || res != 0) {
1475                 return -1;
1476         }
1477
1478         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1479         if ((*path) == NULL) {
1480                 return -1;
1481         }
1482
1483         talloc_free(data.dptr);
1484
1485         return 0;
1486 }
1487
1488 /*
1489   find the name of a db 
1490  */
1491 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1492                    const char **name)
1493 {
1494         int ret;
1495         int32_t res;
1496         TDB_DATA data;
1497
1498         data.dptr = (uint8_t *)&dbid;
1499         data.dsize = sizeof(dbid);
1500
1501         ret = ctdb_control(ctdb, destnode, 0, 
1502                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1503                            mem_ctx, &data, &res, &timeout, NULL);
1504         if (ret != 0 || res != 0) {
1505                 return -1;
1506         }
1507
1508         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1509         if ((*name) == NULL) {
1510                 return -1;
1511         }
1512
1513         talloc_free(data.dptr);
1514
1515         return 0;
1516 }
1517
1518 /*
1519   get the health status of a db
1520  */
1521 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1522                           struct timeval timeout,
1523                           uint32_t destnode,
1524                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1525                           const char **reason)
1526 {
1527         int ret;
1528         int32_t res;
1529         TDB_DATA data;
1530
1531         data.dptr = (uint8_t *)&dbid;
1532         data.dsize = sizeof(dbid);
1533
1534         ret = ctdb_control(ctdb, destnode, 0,
1535                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1536                            mem_ctx, &data, &res, &timeout, NULL);
1537         if (ret != 0 || res != 0) {
1538                 return -1;
1539         }
1540
1541         if (data.dsize == 0) {
1542                 (*reason) = NULL;
1543                 return 0;
1544         }
1545
1546         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1547         if ((*reason) == NULL) {
1548                 return -1;
1549         }
1550
1551         talloc_free(data.dptr);
1552
1553         return 0;
1554 }
1555
1556 /*
1557   create a database
1558  */
1559 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1560                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1561 {
1562         int ret;
1563         int32_t res;
1564         TDB_DATA data;
1565
1566         data.dptr = discard_const(name);
1567         data.dsize = strlen(name)+1;
1568
1569         ret = ctdb_control(ctdb, destnode, 0, 
1570                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1571                            0, data, 
1572                            mem_ctx, &data, &res, &timeout, NULL);
1573
1574         if (ret != 0 || res != 0) {
1575                 return -1;
1576         }
1577
1578         return 0;
1579 }
1580
1581 /*
1582   get debug level on a node
1583  */
1584 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1585 {
1586         int ret;
1587         int32_t res;
1588         TDB_DATA data;
1589
1590         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1591                            ctdb, &data, &res, NULL, NULL);
1592         if (ret != 0 || res != 0) {
1593                 return -1;
1594         }
1595         if (data.dsize != sizeof(int32_t)) {
1596                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1597                          (unsigned)data.dsize));
1598                 return -1;
1599         }
1600         *level = *(int32_t *)data.dptr;
1601         talloc_free(data.dptr);
1602         return 0;
1603 }
1604
1605 /*
1606   set debug level on a node
1607  */
1608 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1609 {
1610         int ret;
1611         int32_t res;
1612         TDB_DATA data;
1613
1614         data.dptr = (uint8_t *)&level;
1615         data.dsize = sizeof(level);
1616
1617         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1618                            NULL, NULL, &res, NULL, NULL);
1619         if (ret != 0 || res != 0) {
1620                 return -1;
1621         }
1622         return 0;
1623 }
1624
1625
1626 /*
1627   get a list of connected nodes
1628  */
1629 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1630                                 struct timeval timeout,
1631                                 TALLOC_CTX *mem_ctx,
1632                                 uint32_t *num_nodes)
1633 {
1634         struct ctdb_node_map *map=NULL;
1635         int ret, i;
1636         uint32_t *nodes;
1637
1638         *num_nodes = 0;
1639
1640         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1641         if (ret != 0) {
1642                 return NULL;
1643         }
1644
1645         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1646         if (nodes == NULL) {
1647                 return NULL;
1648         }
1649
1650         for (i=0;i<map->num;i++) {
1651                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1652                         nodes[*num_nodes] = map->nodes[i].pnn;
1653                         (*num_nodes)++;
1654                 }
1655         }
1656
1657         return nodes;
1658 }
1659
1660
1661 /*
1662   reset remote status
1663  */
1664 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1665 {
1666         int ret;
1667         int32_t res;
1668
1669         ret = ctdb_control(ctdb, destnode, 0, 
1670                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1671                            NULL, NULL, &res, NULL, NULL);
1672         if (ret != 0 || res != 0) {
1673                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1674                 return -1;
1675         }
1676         return 0;
1677 }
1678
1679 /*
1680   this is the dummy null procedure that all databases support
1681 */
1682 static int ctdb_null_func(struct ctdb_call_info *call)
1683 {
1684         return 0;
1685 }
1686
1687 /*
1688   this is a plain fetch procedure that all databases support
1689 */
1690 static int ctdb_fetch_func(struct ctdb_call_info *call)
1691 {
1692         call->reply_data = &call->record_data;
1693         return 0;
1694 }
1695
1696 /*
1697   attach to a specific database - client call
1698 */
1699 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1700                                     struct timeval timeout,
1701                                     const char *name,
1702                                     bool persistent,
1703                                     uint32_t tdb_flags)
1704 {
1705         struct ctdb_db_context *ctdb_db;
1706         TDB_DATA data;
1707         int ret;
1708         int32_t res;
1709
1710         ctdb_db = ctdb_db_handle(ctdb, name);
1711         if (ctdb_db) {
1712                 return ctdb_db;
1713         }
1714
1715         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1716         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1717
1718         ctdb_db->ctdb = ctdb;
1719         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1720         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1721
1722         data.dptr = discard_const(name);
1723         data.dsize = strlen(name)+1;
1724
1725         /* tell ctdb daemon to attach */
1726         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1727                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1728                            0, data, ctdb_db, &data, &res, NULL, NULL);
1729         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1730                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1731                 talloc_free(ctdb_db);
1732                 return NULL;
1733         }
1734         
1735         ctdb_db->db_id = *(uint32_t *)data.dptr;
1736         talloc_free(data.dptr);
1737
1738         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1739         if (ret != 0) {
1740                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1741                 talloc_free(ctdb_db);
1742                 return NULL;
1743         }
1744
1745         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1746         if (ctdb->valgrinding) {
1747                 tdb_flags |= TDB_NOMMAP;
1748         }
1749         tdb_flags |= TDB_DISALLOW_NESTING;
1750
1751         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1752         if (ctdb_db->ltdb == NULL) {
1753                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1754                 talloc_free(ctdb_db);
1755                 return NULL;
1756         }
1757
1758         ctdb_db->persistent = persistent;
1759
1760         DLIST_ADD(ctdb->db_list, ctdb_db);
1761
1762         /* add well known functions */
1763         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1764         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1765
1766         return ctdb_db;
1767 }
1768
1769
1770 /*
1771   setup a call for a database
1772  */
1773 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1774 {
1775         struct ctdb_registered_call *call;
1776
1777 #if 0
1778         TDB_DATA data;
1779         int32_t status;
1780         struct ctdb_control_set_call c;
1781         int ret;
1782
1783         /* this is no longer valid with the separate daemon architecture */
1784         c.db_id = ctdb_db->db_id;
1785         c.fn    = fn;
1786         c.id    = id;
1787
1788         data.dptr = (uint8_t *)&c;
1789         data.dsize = sizeof(c);
1790
1791         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1792                            data, NULL, NULL, &status, NULL, NULL);
1793         if (ret != 0 || status != 0) {
1794                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1795                 return -1;
1796         }
1797 #endif
1798
1799         /* also register locally */
1800         call = talloc(ctdb_db, struct ctdb_registered_call);
1801         call->fn = fn;
1802         call->id = id;
1803
1804         DLIST_ADD(ctdb_db->calls, call);        
1805         return 0;
1806 }
1807
1808
1809 struct traverse_state {
1810         bool done;
1811         uint32_t count;
1812         ctdb_traverse_func fn;
1813         void *private_data;
1814 };
1815
1816 /*
1817   called on each key during a ctdb_traverse
1818  */
1819 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1820 {
1821         struct traverse_state *state = (struct traverse_state *)p;
1822         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1823         TDB_DATA key;
1824
1825         if (data.dsize < sizeof(uint32_t) ||
1826             d->length != data.dsize) {
1827                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1828                 state->done = True;
1829                 return;
1830         }
1831
1832         key.dsize = d->keylen;
1833         key.dptr  = &d->data[0];
1834         data.dsize = d->datalen;
1835         data.dptr = &d->data[d->keylen];
1836
1837         if (key.dsize == 0 && data.dsize == 0) {
1838                 /* end of traverse */
1839                 state->done = True;
1840                 return;
1841         }
1842
1843         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1844                 /* empty records are deleted records in ctdb */
1845                 return;
1846         }
1847
1848         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1849                 state->done = True;
1850         }
1851
1852         state->count++;
1853 }
1854
1855
1856 /*
1857   start a cluster wide traverse, calling the supplied fn on each record
1858   return the number of records traversed, or -1 on error
1859  */
1860 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1861 {
1862         TDB_DATA data;
1863         struct ctdb_traverse_start t;
1864         int32_t status;
1865         int ret;
1866         uint64_t srvid = (getpid() | 0xFLL<<60);
1867         struct traverse_state state;
1868
1869         state.done = False;
1870         state.count = 0;
1871         state.private_data = private_data;
1872         state.fn = fn;
1873
1874         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1875         if (ret != 0) {
1876                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1877                 return -1;
1878         }
1879
1880         t.db_id = ctdb_db->db_id;
1881         t.srvid = srvid;
1882         t.reqid = 0;
1883
1884         data.dptr = (uint8_t *)&t;
1885         data.dsize = sizeof(t);
1886
1887         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1888                            data, NULL, NULL, &status, NULL, NULL);
1889         if (ret != 0 || status != 0) {
1890                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1891                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1892                 return -1;
1893         }
1894
1895         while (!state.done) {
1896                 event_loop_once(ctdb_db->ctdb->ev);
1897         }
1898
1899         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1900         if (ret != 0) {
1901                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1902                 return -1;
1903         }
1904
1905         return state.count;
1906 }
1907
1908 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1909 /*
1910   called on each key during a catdb
1911  */
1912 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1913 {
1914         int i;
1915         FILE *f = (FILE *)p;
1916         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1917
1918         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1919         for (i=0;i<key.dsize;i++) {
1920                 if (ISASCII(key.dptr[i])) {
1921                         fprintf(f, "%c", key.dptr[i]);
1922                 } else {
1923                         fprintf(f, "\\%02X", key.dptr[i]);
1924                 }
1925         }
1926         fprintf(f, "\"\n");
1927
1928         fprintf(f, "dmaster: %u\n", h->dmaster);
1929         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1930
1931         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1932         for (i=sizeof(*h);i<data.dsize;i++) {
1933                 if (ISASCII(data.dptr[i])) {
1934                         fprintf(f, "%c", data.dptr[i]);
1935                 } else {
1936                         fprintf(f, "\\%02X", data.dptr[i]);
1937                 }
1938         }
1939         fprintf(f, "\"\n");
1940
1941         fprintf(f, "\n");
1942
1943         return 0;
1944 }
1945
1946 /*
1947   convenience function to list all keys to stdout
1948  */
1949 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1950 {
1951         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1952 }
1953
1954 /*
1955   get the pid of a ctdb daemon
1956  */
1957 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1958 {
1959         int ret;
1960         int32_t res;
1961
1962         ret = ctdb_control(ctdb, destnode, 0, 
1963                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1964                            NULL, NULL, &res, &timeout, NULL);
1965         if (ret != 0) {
1966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1967                 return -1;
1968         }
1969
1970         *pid = res;
1971
1972         return 0;
1973 }
1974
1975
1976 /*
1977   async freeze send control
1978  */
1979 struct ctdb_client_control_state *
1980 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1981 {
1982         return ctdb_control_send(ctdb, destnode, priority, 
1983                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1984                            mem_ctx, &timeout, NULL);
1985 }
1986
1987 /* 
1988    async freeze recv control
1989 */
1990 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1991 {
1992         int ret;
1993         int32_t res;
1994
1995         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1996         if ( (ret != 0) || (res != 0) ){
1997                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1998                 return -1;
1999         }
2000
2001         return 0;
2002 }
2003
2004 /*
2005   freeze databases of a certain priority
2006  */
2007 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2008 {
2009         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2010         struct ctdb_client_control_state *state;
2011         int ret;
2012
2013         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2014         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2015         talloc_free(tmp_ctx);
2016
2017         return ret;
2018 }
2019
2020 /* Freeze all databases */
2021 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2022 {
2023         int i;
2024
2025         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2026                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2027                         return -1;
2028                 }
2029         }
2030         return 0;
2031 }
2032
2033 /*
2034   thaw databases of a certain priority
2035  */
2036 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2037 {
2038         int ret;
2039         int32_t res;
2040
2041         ret = ctdb_control(ctdb, destnode, priority, 
2042                            CTDB_CONTROL_THAW, 0, tdb_null, 
2043                            NULL, NULL, &res, &timeout, NULL);
2044         if (ret != 0 || res != 0) {
2045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2046                 return -1;
2047         }
2048
2049         return 0;
2050 }
2051
2052 /* thaw all databases */
2053 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2054 {
2055         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2056 }
2057
2058 /*
2059   get pnn of a node, or -1
2060  */
2061 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2062 {
2063         int ret;
2064         int32_t res;
2065
2066         ret = ctdb_control(ctdb, destnode, 0, 
2067                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2068                            NULL, NULL, &res, &timeout, NULL);
2069         if (ret != 0) {
2070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2071                 return -1;
2072         }
2073
2074         return res;
2075 }
2076
2077 /*
2078   get the monitoring mode of a remote node
2079  */
2080 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2081 {
2082         int ret;
2083         int32_t res;
2084
2085         ret = ctdb_control(ctdb, destnode, 0, 
2086                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2087                            NULL, NULL, &res, &timeout, NULL);
2088         if (ret != 0) {
2089                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2090                 return -1;
2091         }
2092
2093         *monmode = res;
2094
2095         return 0;
2096 }
2097
2098
2099 /*
2100  set the monitoring mode of a remote node to active
2101  */
2102 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2103 {
2104         int ret;
2105         
2106
2107         ret = ctdb_control(ctdb, destnode, 0, 
2108                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2109                            NULL, NULL,NULL, &timeout, NULL);
2110         if (ret != 0) {
2111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2112                 return -1;
2113         }
2114
2115         
2116
2117         return 0;
2118 }
2119
2120 /*
2121   set the monitoring mode of a remote node to disable
2122  */
2123 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2124 {
2125         int ret;
2126         
2127
2128         ret = ctdb_control(ctdb, destnode, 0, 
2129                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2130                            NULL, NULL, NULL, &timeout, NULL);
2131         if (ret != 0) {
2132                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2133                 return -1;
2134         }
2135
2136         
2137
2138         return 0;
2139 }
2140
2141
2142
2143 /* 
2144   sent to a node to make it take over an ip address
2145 */
2146 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2147                           uint32_t destnode, struct ctdb_public_ip *ip)
2148 {
2149         TDB_DATA data;
2150         struct ctdb_public_ipv4 ipv4;
2151         int ret;
2152         int32_t res;
2153
2154         if (ip->addr.sa.sa_family == AF_INET) {
2155                 ipv4.pnn = ip->pnn;
2156                 ipv4.sin = ip->addr.ip;
2157
2158                 data.dsize = sizeof(ipv4);
2159                 data.dptr  = (uint8_t *)&ipv4;
2160
2161                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2162                            NULL, &res, &timeout, NULL);
2163         } else {
2164                 data.dsize = sizeof(*ip);
2165                 data.dptr  = (uint8_t *)ip;
2166
2167                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2168                            NULL, &res, &timeout, NULL);
2169         }
2170
2171         if (ret != 0 || res != 0) {
2172                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2173                 return -1;
2174         }
2175
2176         return 0;       
2177 }
2178
2179
2180 /* 
2181   sent to a node to make it release an ip address
2182 */
2183 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2184                          uint32_t destnode, struct ctdb_public_ip *ip)
2185 {
2186         TDB_DATA data;
2187         struct ctdb_public_ipv4 ipv4;
2188         int ret;
2189         int32_t res;
2190
2191         if (ip->addr.sa.sa_family == AF_INET) {
2192                 ipv4.pnn = ip->pnn;
2193                 ipv4.sin = ip->addr.ip;
2194
2195                 data.dsize = sizeof(ipv4);
2196                 data.dptr  = (uint8_t *)&ipv4;
2197
2198                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2199                                    NULL, &res, &timeout, NULL);
2200         } else {
2201                 data.dsize = sizeof(*ip);
2202                 data.dptr  = (uint8_t *)ip;
2203
2204                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2205                                    NULL, &res, &timeout, NULL);
2206         }
2207
2208         if (ret != 0 || res != 0) {
2209                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2210                 return -1;
2211         }
2212
2213         return 0;       
2214 }
2215
2216
2217 /*
2218   get a tunable
2219  */
2220 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2221                           struct timeval timeout, 
2222                           uint32_t destnode,
2223                           const char *name, uint32_t *value)
2224 {
2225         struct ctdb_control_get_tunable *t;
2226         TDB_DATA data, outdata;
2227         int32_t res;
2228         int ret;
2229
2230         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2231         data.dptr  = talloc_size(ctdb, data.dsize);
2232         CTDB_NO_MEMORY(ctdb, data.dptr);
2233
2234         t = (struct ctdb_control_get_tunable *)data.dptr;
2235         t->length = strlen(name)+1;
2236         memcpy(t->name, name, t->length);
2237
2238         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2239                            &outdata, &res, &timeout, NULL);
2240         talloc_free(data.dptr);
2241         if (ret != 0 || res != 0) {
2242                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2243                 return -1;
2244         }
2245
2246         if (outdata.dsize != sizeof(uint32_t)) {
2247                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2248                 talloc_free(outdata.dptr);
2249                 return -1;
2250         }
2251         
2252         *value = *(uint32_t *)outdata.dptr;
2253         talloc_free(outdata.dptr);
2254
2255         return 0;
2256 }
2257
2258 /*
2259   set a tunable
2260  */
2261 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2262                           struct timeval timeout, 
2263                           uint32_t destnode,
2264                           const char *name, uint32_t value)
2265 {
2266         struct ctdb_control_set_tunable *t;
2267         TDB_DATA data;
2268         int32_t res;
2269         int ret;
2270
2271         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2272         data.dptr  = talloc_size(ctdb, data.dsize);
2273         CTDB_NO_MEMORY(ctdb, data.dptr);
2274
2275         t = (struct ctdb_control_set_tunable *)data.dptr;
2276         t->length = strlen(name)+1;
2277         memcpy(t->name, name, t->length);
2278         t->value = value;
2279
2280         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2281                            NULL, &res, &timeout, NULL);
2282         talloc_free(data.dptr);
2283         if (ret != 0 || res != 0) {
2284                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2285                 return -1;
2286         }
2287
2288         return 0;
2289 }
2290
2291 /*
2292   list tunables
2293  */
2294 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2295                             struct timeval timeout, 
2296                             uint32_t destnode,
2297                             TALLOC_CTX *mem_ctx,
2298                             const char ***list, uint32_t *count)
2299 {
2300         TDB_DATA outdata;
2301         int32_t res;
2302         int ret;
2303         struct ctdb_control_list_tunable *t;
2304         char *p, *s, *ptr;
2305
2306         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2307                            mem_ctx, &outdata, &res, &timeout, NULL);
2308         if (ret != 0 || res != 0) {
2309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2310                 return -1;
2311         }
2312
2313         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2314         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2315             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2316                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2317                 talloc_free(outdata.dptr);
2318                 return -1;              
2319         }
2320         
2321         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2322         CTDB_NO_MEMORY(ctdb, p);
2323
2324         talloc_free(outdata.dptr);
2325         
2326         (*list) = NULL;
2327         (*count) = 0;
2328
2329         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2330                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2331                 CTDB_NO_MEMORY(ctdb, *list);
2332                 (*list)[*count] = talloc_strdup(*list, s);
2333                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2334                 (*count)++;
2335         }
2336
2337         talloc_free(p);
2338
2339         return 0;
2340 }
2341
2342
2343 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2344                                    struct timeval timeout, uint32_t destnode,
2345                                    TALLOC_CTX *mem_ctx,
2346                                    uint32_t flags,
2347                                    struct ctdb_all_public_ips **ips)
2348 {
2349         int ret;
2350         TDB_DATA outdata;
2351         int32_t res;
2352
2353         ret = ctdb_control(ctdb, destnode, 0, 
2354                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2355                            mem_ctx, &outdata, &res, &timeout, NULL);
2356         if (ret == 0 && res == -1) {
2357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2358                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2359         }
2360         if (ret != 0 || res != 0) {
2361           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2362                 return -1;
2363         }
2364
2365         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2366         talloc_free(outdata.dptr);
2367                     
2368         return 0;
2369 }
2370
2371 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2372                              struct timeval timeout, uint32_t destnode,
2373                              TALLOC_CTX *mem_ctx,
2374                              struct ctdb_all_public_ips **ips)
2375 {
2376         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2377                                               destnode, mem_ctx,
2378                                               0, ips);
2379 }
2380
2381 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2382                         struct timeval timeout, uint32_t destnode, 
2383                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2384 {
2385         int ret, i, len;
2386         TDB_DATA outdata;
2387         int32_t res;
2388         struct ctdb_all_public_ipsv4 *ipsv4;
2389
2390         ret = ctdb_control(ctdb, destnode, 0, 
2391                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2392                            mem_ctx, &outdata, &res, &timeout, NULL);
2393         if (ret != 0 || res != 0) {
2394                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2395                 return -1;
2396         }
2397
2398         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2399         len = offsetof(struct ctdb_all_public_ips, ips) +
2400                 ipsv4->num*sizeof(struct ctdb_public_ip);
2401         *ips = talloc_zero_size(mem_ctx, len);
2402         CTDB_NO_MEMORY(ctdb, *ips);
2403         (*ips)->num = ipsv4->num;
2404         for (i=0; i<ipsv4->num; i++) {
2405                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2406                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2407         }
2408
2409         talloc_free(outdata.dptr);
2410                     
2411         return 0;
2412 }
2413
2414 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2415                                  struct timeval timeout, uint32_t destnode,
2416                                  TALLOC_CTX *mem_ctx,
2417                                  const ctdb_sock_addr *addr,
2418                                  struct ctdb_control_public_ip_info **_info)
2419 {
2420         int ret;
2421         TDB_DATA indata;
2422         TDB_DATA outdata;
2423         int32_t res;
2424         struct ctdb_control_public_ip_info *info;
2425         uint32_t len;
2426         uint32_t i;
2427
2428         indata.dptr = discard_const_p(uint8_t, addr);
2429         indata.dsize = sizeof(*addr);
2430
2431         ret = ctdb_control(ctdb, destnode, 0,
2432                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2433                            mem_ctx, &outdata, &res, &timeout, NULL);
2434         if (ret != 0 || res != 0) {
2435                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2436                                 "failed ret:%d res:%d\n",
2437                                 ret, res));
2438                 return -1;
2439         }
2440
2441         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2442         if (len > outdata.dsize) {
2443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2444                                 "returned invalid data with size %u > %u\n",
2445                                 (unsigned int)outdata.dsize,
2446                                 (unsigned int)len));
2447                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2448                 return -1;
2449         }
2450
2451         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2452         len += info->num*sizeof(struct ctdb_control_iface_info);
2453
2454         if (len > outdata.dsize) {
2455                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2456                                 "returned invalid data with size %u > %u\n",
2457                                 (unsigned int)outdata.dsize,
2458                                 (unsigned int)len));
2459                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2460                 return -1;
2461         }
2462
2463         /* make sure we null terminate the returned strings */
2464         for (i=0; i < info->num; i++) {
2465                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2466         }
2467
2468         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2469                                                                 outdata.dptr,
2470                                                                 outdata.dsize);
2471         talloc_free(outdata.dptr);
2472         if (*_info == NULL) {
2473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2474                                 "talloc_memdup size %u failed\n",
2475                                 (unsigned int)outdata.dsize));
2476                 return -1;
2477         }
2478
2479         return 0;
2480 }
2481
2482 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2483                          struct timeval timeout, uint32_t destnode,
2484                          TALLOC_CTX *mem_ctx,
2485                          struct ctdb_control_get_ifaces **_ifaces)
2486 {
2487         int ret;
2488         TDB_DATA outdata;
2489         int32_t res;
2490         struct ctdb_control_get_ifaces *ifaces;
2491         uint32_t len;
2492         uint32_t i;
2493
2494         ret = ctdb_control(ctdb, destnode, 0,
2495                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2496                            mem_ctx, &outdata, &res, &timeout, NULL);
2497         if (ret != 0 || res != 0) {
2498                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2499                                 "failed ret:%d res:%d\n",
2500                                 ret, res));
2501                 return -1;
2502         }
2503
2504         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2505         if (len > outdata.dsize) {
2506                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2507                                 "returned invalid data with size %u > %u\n",
2508                                 (unsigned int)outdata.dsize,
2509                                 (unsigned int)len));
2510                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2511                 return -1;
2512         }
2513
2514         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2515         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2516
2517         if (len > outdata.dsize) {
2518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2519                                 "returned invalid data with size %u > %u\n",
2520                                 (unsigned int)outdata.dsize,
2521                                 (unsigned int)len));
2522                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2523                 return -1;
2524         }
2525
2526         /* make sure we null terminate the returned strings */
2527         for (i=0; i < ifaces->num; i++) {
2528                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2529         }
2530
2531         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2532                                                                   outdata.dptr,
2533                                                                   outdata.dsize);
2534         talloc_free(outdata.dptr);
2535         if (*_ifaces == NULL) {
2536                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2537                                 "talloc_memdup size %u failed\n",
2538                                 (unsigned int)outdata.dsize));
2539                 return -1;
2540         }
2541
2542         return 0;
2543 }
2544
2545 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2546                              struct timeval timeout, uint32_t destnode,
2547                              TALLOC_CTX *mem_ctx,
2548                              const struct ctdb_control_iface_info *info)
2549 {
2550         int ret;
2551         TDB_DATA indata;
2552         int32_t res;
2553
2554         indata.dptr = discard_const_p(uint8_t, info);
2555         indata.dsize = sizeof(*info);
2556
2557         ret = ctdb_control(ctdb, destnode, 0,
2558                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2559                            mem_ctx, NULL, &res, &timeout, NULL);
2560         if (ret != 0 || res != 0) {
2561                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2562                                 "failed ret:%d res:%d\n",
2563                                 ret, res));
2564                 return -1;
2565         }
2566
2567         return 0;
2568 }
2569
2570 /*
2571   set/clear the permanent disabled bit on a remote node
2572  */
2573 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2574                        uint32_t set, uint32_t clear)
2575 {
2576         int ret;
2577         TDB_DATA data;
2578         struct ctdb_node_map *nodemap=NULL;
2579         struct ctdb_node_flag_change c;
2580         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2581         uint32_t recmaster;
2582         uint32_t *nodes;
2583
2584
2585         /* find the recovery master */
2586         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2587         if (ret != 0) {
2588                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2589                 talloc_free(tmp_ctx);
2590                 return ret;
2591         }
2592
2593
2594         /* read the node flags from the recmaster */
2595         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2596         if (ret != 0) {
2597                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2598                 talloc_free(tmp_ctx);
2599                 return -1;
2600         }
2601         if (destnode >= nodemap->num) {
2602                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2603                 talloc_free(tmp_ctx);
2604                 return -1;
2605         }
2606
2607         c.pnn       = destnode;
2608         c.old_flags = nodemap->nodes[destnode].flags;
2609         c.new_flags = c.old_flags;
2610         c.new_flags |= set;
2611         c.new_flags &= ~clear;
2612
2613         data.dsize = sizeof(c);
2614         data.dptr = (unsigned char *)&c;
2615
2616         /* send the flags update to all connected nodes */
2617         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2618
2619         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2620                                         nodes, 0,
2621                                         timeout, false, data,
2622                                         NULL, NULL,
2623                                         NULL) != 0) {
2624                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2625
2626                 talloc_free(tmp_ctx);
2627                 return -1;
2628         }
2629
2630         talloc_free(tmp_ctx);
2631         return 0;
2632 }
2633
2634
2635 /*
2636   get all tunables
2637  */
2638 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2639                                struct timeval timeout, 
2640                                uint32_t destnode,
2641                                struct ctdb_tunable *tunables)
2642 {
2643         TDB_DATA outdata;
2644         int ret;
2645         int32_t res;
2646
2647         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2648                            &outdata, &res, &timeout, NULL);
2649         if (ret != 0 || res != 0) {
2650                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2651                 return -1;
2652         }
2653
2654         if (outdata.dsize != sizeof(*tunables)) {
2655                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2656                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2657                 return -1;              
2658         }
2659
2660         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2661         talloc_free(outdata.dptr);
2662         return 0;
2663 }
2664
2665 /*
2666   add a public address to a node
2667  */
2668 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2669                       struct timeval timeout, 
2670                       uint32_t destnode,
2671                       struct ctdb_control_ip_iface *pub)
2672 {
2673         TDB_DATA data;
2674         int32_t res;
2675         int ret;
2676
2677         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2678         data.dptr  = (unsigned char *)pub;
2679
2680         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2681                            NULL, &res, &timeout, NULL);
2682         if (ret != 0 || res != 0) {
2683                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2684                 return -1;
2685         }
2686
2687         return 0;
2688 }
2689
2690 /*
2691   delete a public address from a node
2692  */
2693 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2694                       struct timeval timeout, 
2695                       uint32_t destnode,
2696                       struct ctdb_control_ip_iface *pub)
2697 {
2698         TDB_DATA data;
2699         int32_t res;
2700         int ret;
2701
2702         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2703         data.dptr  = (unsigned char *)pub;
2704
2705         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2706                            NULL, &res, &timeout, NULL);
2707         if (ret != 0 || res != 0) {
2708                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2709                 return -1;
2710         }
2711
2712         return 0;
2713 }
2714
2715 /*
2716   kill a tcp connection
2717  */
2718 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2719                       struct timeval timeout, 
2720                       uint32_t destnode,
2721                       struct ctdb_control_killtcp *killtcp)
2722 {
2723         TDB_DATA data;
2724         int32_t res;
2725         int ret;
2726
2727         data.dsize = sizeof(struct ctdb_control_killtcp);
2728         data.dptr  = (unsigned char *)killtcp;
2729
2730         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2731                            NULL, &res, &timeout, NULL);
2732         if (ret != 0 || res != 0) {
2733                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2734                 return -1;
2735         }
2736
2737         return 0;
2738 }
2739
2740 /*
2741   send a gratious arp
2742  */
2743 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2744                       struct timeval timeout, 
2745                       uint32_t destnode,
2746                       ctdb_sock_addr *addr,
2747                       const char *ifname)
2748 {
2749         TDB_DATA data;
2750         int32_t res;
2751         int ret, len;
2752         struct ctdb_control_gratious_arp *gratious_arp;
2753         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2754
2755
2756         len = strlen(ifname)+1;
2757         gratious_arp = talloc_size(tmp_ctx, 
2758                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2759         CTDB_NO_MEMORY(ctdb, gratious_arp);
2760
2761         gratious_arp->addr = *addr;
2762         gratious_arp->len = len;
2763         memcpy(&gratious_arp->iface[0], ifname, len);
2764
2765
2766         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2767         data.dptr  = (unsigned char *)gratious_arp;
2768
2769         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2770                            NULL, &res, &timeout, NULL);
2771         if (ret != 0 || res != 0) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2773                 talloc_free(tmp_ctx);
2774                 return -1;
2775         }
2776
2777         talloc_free(tmp_ctx);
2778         return 0;
2779 }
2780
2781 /*
2782   get a list of all tcp tickles that a node knows about for a particular vnn
2783  */
2784 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2785                               struct timeval timeout, uint32_t destnode, 
2786                               TALLOC_CTX *mem_ctx, 
2787                               ctdb_sock_addr *addr,
2788                               struct ctdb_control_tcp_tickle_list **list)
2789 {
2790         int ret;
2791         TDB_DATA data, outdata;
2792         int32_t status;
2793
2794         data.dptr = (uint8_t*)addr;
2795         data.dsize = sizeof(ctdb_sock_addr);
2796
2797         ret = ctdb_control(ctdb, destnode, 0, 
2798                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2799                            mem_ctx, &outdata, &status, NULL, NULL);
2800         if (ret != 0 || status != 0) {
2801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2802                 return -1;
2803         }
2804
2805         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2806
2807         return status;
2808 }
2809
2810 /*
2811   register a server id
2812  */
2813 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2814                       struct timeval timeout, 
2815                       struct ctdb_server_id *id)
2816 {
2817         TDB_DATA data;
2818         int32_t res;
2819         int ret;
2820
2821         data.dsize = sizeof(struct ctdb_server_id);
2822         data.dptr  = (unsigned char *)id;
2823
2824         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2825                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2826                         0, data, NULL,
2827                         NULL, &res, &timeout, NULL);
2828         if (ret != 0 || res != 0) {
2829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2830                 return -1;
2831         }
2832
2833         return 0;
2834 }
2835
2836 /*
2837   unregister a server id
2838  */
2839 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2840                       struct timeval timeout, 
2841                       struct ctdb_server_id *id)
2842 {
2843         TDB_DATA data;
2844         int32_t res;
2845         int ret;
2846
2847         data.dsize = sizeof(struct ctdb_server_id);
2848         data.dptr  = (unsigned char *)id;
2849
2850         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2851                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2852                         0, data, NULL,
2853                         NULL, &res, &timeout, NULL);
2854         if (ret != 0 || res != 0) {
2855                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2856                 return -1;
2857         }
2858
2859         return 0;
2860 }
2861
2862
2863 /*
2864   check if a server id exists
2865
2866   if a server id does exist, return *status == 1, otherwise *status == 0
2867  */
2868 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2869                       struct timeval timeout, 
2870                       uint32_t destnode,
2871                       struct ctdb_server_id *id,
2872                       uint32_t *status)
2873 {
2874         TDB_DATA data;
2875         int32_t res;
2876         int ret;
2877
2878         data.dsize = sizeof(struct ctdb_server_id);
2879         data.dptr  = (unsigned char *)id;
2880
2881         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2882                         0, data, NULL,
2883                         NULL, &res, &timeout, NULL);
2884         if (ret != 0) {
2885                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2886                 return -1;
2887         }
2888
2889         if (res) {
2890                 *status = 1;
2891         } else {
2892                 *status = 0;
2893         }
2894
2895         return 0;
2896 }
2897
2898 /*
2899    get the list of server ids that are registered on a node
2900 */
2901 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2902                 TALLOC_CTX *mem_ctx,
2903                 struct timeval timeout, uint32_t destnode, 
2904                 struct ctdb_server_id_list **svid_list)
2905 {
2906         int ret;
2907         TDB_DATA outdata;
2908         int32_t res;
2909
2910         ret = ctdb_control(ctdb, destnode, 0, 
2911                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2912                            mem_ctx, &outdata, &res, &timeout, NULL);
2913         if (ret != 0 || res != 0) {
2914                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2915                 return -1;
2916         }
2917
2918         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2919                     
2920         return 0;
2921 }
2922
2923 /*
2924   initialise the ctdb daemon for client applications
2925
2926   NOTE: In current code the daemon does not fork. This is for testing purposes only
2927   and to simplify the code.
2928 */
2929 struct ctdb_context *ctdb_init(struct event_context *ev)
2930 {
2931         int ret;
2932         struct ctdb_context *ctdb;
2933
2934         ctdb = talloc_zero(ev, struct ctdb_context);
2935         if (ctdb == NULL) {
2936                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2937                 return NULL;
2938         }
2939         ctdb->ev  = ev;
2940         ctdb->idr = idr_init(ctdb);
2941         /* Wrap early to exercise code. */
2942         ctdb->lastid = INT_MAX-200;
2943         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2944
2945         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2946         if (ret != 0) {
2947                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2948                 talloc_free(ctdb);
2949                 return NULL;
2950         }
2951
2952         ctdb->statistics.statistics_start_time = timeval_current();
2953
2954         return ctdb;
2955 }
2956
2957
2958 /*
2959   set some ctdb flags
2960 */
2961 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2962 {
2963         ctdb->flags |= flags;
2964 }
2965
2966 /*
2967   setup the local socket name
2968 */
2969 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2970 {
2971         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2972         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2973
2974         return 0;
2975 }
2976
2977 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2978 {
2979         return ctdb->daemon.name;
2980 }
2981
2982 /*
2983   return the pnn of this node
2984 */
2985 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2986 {
2987         return ctdb->pnn;
2988 }
2989
2990
2991 /*
2992   get the uptime of a remote node
2993  */
2994 struct ctdb_client_control_state *
2995 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2996 {
2997         return ctdb_control_send(ctdb, destnode, 0, 
2998                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2999                            mem_ctx, &timeout, NULL);
3000 }
3001
3002 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3003 {
3004         int ret;
3005         int32_t res;
3006         TDB_DATA outdata;
3007
3008         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3009         if (ret != 0 || res != 0) {
3010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3011                 return -1;
3012         }
3013
3014         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3015
3016         return 0;
3017 }
3018
3019 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3020 {
3021         struct ctdb_client_control_state *state;
3022
3023         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3024         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3025 }
3026
3027 /*
3028   send a control to execute the "recovered" event script on a node
3029  */
3030 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3031 {
3032         int ret;
3033         int32_t status;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, 
3036                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3037                            NULL, NULL, &status, &timeout, NULL);
3038         if (ret != 0 || status != 0) {
3039                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3040                 return -1;
3041         }
3042
3043         return 0;
3044 }
3045
3046 /* 
3047   callback for the async helpers used when sending the same control
3048   to multiple nodes in parallell.
3049 */
3050 static void async_callback(struct ctdb_client_control_state *state)
3051 {
3052         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3053         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3054         int ret;
3055         TDB_DATA outdata;
3056         int32_t res;
3057         uint32_t destnode = state->c->hdr.destnode;
3058
3059         /* one more node has responded with recmode data */
3060         data->count--;
3061
3062         /* if we failed to push the db, then return an error and let
3063            the main loop try again.
3064         */
3065         if (state->state != CTDB_CONTROL_DONE) {
3066                 if ( !data->dont_log_errors) {
3067                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3068                 }
3069                 data->fail_count++;
3070                 if (data->fail_callback) {
3071                         data->fail_callback(ctdb, destnode, res, outdata,
3072                                         data->callback_data);
3073                 }
3074                 return;
3075         }
3076         
3077         state->async.fn = NULL;
3078
3079         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3080         if ((ret != 0) || (res != 0)) {
3081                 if ( !data->dont_log_errors) {
3082                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3083                 }
3084                 data->fail_count++;
3085                 if (data->fail_callback) {
3086                         data->fail_callback(ctdb, destnode, res, outdata,
3087                                         data->callback_data);
3088                 }
3089         }
3090         if ((ret == 0) && (data->callback != NULL)) {
3091                 data->callback(ctdb, destnode, res, outdata,
3092                                         data->callback_data);
3093         }
3094 }
3095
3096
3097 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3098 {
3099         /* set up the callback functions */
3100         state->async.fn = async_callback;
3101         state->async.private_data = data;
3102         
3103         /* one more control to wait for to complete */
3104         data->count++;
3105 }
3106
3107
3108 /* wait for up to the maximum number of seconds allowed
3109    or until all nodes we expect a response from has replied
3110 */
3111 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3112 {
3113         while (data->count > 0) {
3114                 event_loop_once(ctdb->ev);
3115         }
3116         if (data->fail_count != 0) {
3117                 if (!data->dont_log_errors) {
3118                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3119                                  data->fail_count));
3120                 }
3121                 return -1;
3122         }
3123         return 0;
3124 }
3125
3126
3127 /* 
3128    perform a simple control on the listed nodes
3129    The control cannot return data
3130  */
3131 int ctdb_client_async_control(struct ctdb_context *ctdb,
3132                                 enum ctdb_controls opcode,
3133                                 uint32_t *nodes,
3134                                 uint64_t srvid,
3135                                 struct timeval timeout,
3136                                 bool dont_log_errors,
3137                                 TDB_DATA data,
3138                                 client_async_callback client_callback,
3139                                 client_async_callback fail_callback,
3140                                 void *callback_data)
3141 {
3142         struct client_async_data *async_data;
3143         struct ctdb_client_control_state *state;
3144         int j, num_nodes;
3145
3146         async_data = talloc_zero(ctdb, struct client_async_data);
3147         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3148         async_data->dont_log_errors = dont_log_errors;
3149         async_data->callback = client_callback;
3150         async_data->fail_callback = fail_callback;
3151         async_data->callback_data = callback_data;
3152         async_data->opcode        = opcode;
3153
3154         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3155
3156         /* loop over all nodes and send an async control to each of them */
3157         for (j=0; j<num_nodes; j++) {
3158                 uint32_t pnn = nodes[j];
3159
3160                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3161                                           0, data, async_data, &timeout, NULL);
3162                 if (state == NULL) {
3163                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3164                         talloc_free(async_data);
3165                         return -1;
3166                 }
3167                 
3168                 ctdb_client_async_add(async_data, state);
3169         }
3170
3171         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3172                 talloc_free(async_data);
3173                 return -1;
3174         }
3175
3176         talloc_free(async_data);
3177         return 0;
3178 }
3179
3180 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3181                                 struct ctdb_vnn_map *vnn_map,
3182                                 TALLOC_CTX *mem_ctx,
3183                                 bool include_self)
3184 {
3185         int i, j, num_nodes;
3186         uint32_t *nodes;
3187
3188         for (i=num_nodes=0;i<vnn_map->size;i++) {
3189                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3190                         continue;
3191                 }
3192                 num_nodes++;
3193         } 
3194
3195         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3196         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3197
3198         for (i=j=0;i<vnn_map->size;i++) {
3199                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3200                         continue;
3201                 }
3202                 nodes[j++] = vnn_map->map[i];
3203         } 
3204
3205         return nodes;
3206 }
3207
3208 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3209                                 struct ctdb_node_map *node_map,
3210                                 TALLOC_CTX *mem_ctx,
3211                                 bool include_self)
3212 {
3213         int i, j, num_nodes;
3214         uint32_t *nodes;
3215
3216         for (i=num_nodes=0;i<node_map->num;i++) {
3217                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3218                         continue;
3219                 }
3220                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3221                         continue;
3222                 }
3223                 num_nodes++;
3224         } 
3225
3226         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3227         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3228
3229         for (i=j=0;i<node_map->num;i++) {
3230                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3231                         continue;
3232                 }
3233                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3234                         continue;
3235                 }
3236                 nodes[j++] = node_map->nodes[i].pnn;
3237         } 
3238
3239         return nodes;
3240 }
3241
3242 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3243                                 struct ctdb_node_map *node_map,
3244                                 TALLOC_CTX *mem_ctx,
3245                                 uint32_t pnn)
3246 {
3247         int i, j, num_nodes;
3248         uint32_t *nodes;
3249
3250         for (i=num_nodes=0;i<node_map->num;i++) {
3251                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3252                         continue;
3253                 }
3254                 if (node_map->nodes[i].pnn == pnn) {
3255                         continue;
3256                 }
3257                 num_nodes++;
3258         } 
3259
3260         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3261         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3262
3263         for (i=j=0;i<node_map->num;i++) {
3264                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3265                         continue;
3266                 }
3267                 if (node_map->nodes[i].pnn == pnn) {
3268                         continue;
3269                 }
3270                 nodes[j++] = node_map->nodes[i].pnn;
3271         } 
3272
3273         return nodes;
3274 }
3275
3276 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3277                                 struct ctdb_node_map *node_map,
3278                                 TALLOC_CTX *mem_ctx,
3279                                 bool include_self)
3280 {
3281         int i, j, num_nodes;
3282         uint32_t *nodes;
3283
3284         for (i=num_nodes=0;i<node_map->num;i++) {
3285                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3286                         continue;
3287                 }
3288                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3289                         continue;
3290                 }
3291                 num_nodes++;
3292         } 
3293
3294         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3295         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3296
3297         for (i=j=0;i<node_map->num;i++) {
3298                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3299                         continue;
3300                 }
3301                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3302                         continue;
3303                 }
3304                 nodes[j++] = node_map->nodes[i].pnn;
3305         } 
3306
3307         return nodes;
3308 }
3309
3310 /* 
3311   this is used to test if a pnn lock exists and if it exists will return
3312   the number of connections that pnn has reported or -1 if that recovery
3313   daemon is not running.
3314 */
3315 int
3316 ctdb_read_pnn_lock(int fd, int32_t pnn)
3317 {
3318         struct flock lock;
3319         char c;
3320
3321         lock.l_type = F_WRLCK;
3322         lock.l_whence = SEEK_SET;
3323         lock.l_start = pnn;
3324         lock.l_len = 1;
3325         lock.l_pid = 0;
3326
3327         if (fcntl(fd, F_GETLK, &lock) != 0) {
3328                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3329                 return -1;
3330         }
3331
3332         if (lock.l_type == F_UNLCK) {
3333                 return -1;
3334         }
3335
3336         if (pread(fd, &c, 1, pnn) == -1) {
3337                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3338                 return -1;
3339         }
3340
3341         return c;
3342 }
3343
3344 /*
3345   get capabilities of a remote node
3346  */
3347 struct ctdb_client_control_state *
3348 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3349 {
3350         return ctdb_control_send(ctdb, destnode, 0, 
3351                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3352                            mem_ctx, &timeout, NULL);
3353 }
3354
3355 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3356 {
3357         int ret;
3358         int32_t res;
3359         TDB_DATA outdata;
3360
3361         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3362         if ( (ret != 0) || (res != 0) ) {
3363                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3364                 return -1;
3365         }
3366
3367         if (capabilities) {
3368                 *capabilities = *((uint32_t *)outdata.dptr);
3369         }
3370
3371         return 0;
3372 }
3373
3374 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3375 {
3376         struct ctdb_client_control_state *state;
3377         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3378         int ret;
3379
3380         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3381         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3382         talloc_free(tmp_ctx);
3383         return ret;
3384 }
3385
3386 /**
3387  * check whether a transaction is active on a given db on a given node
3388  */
3389 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3390                                      uint32_t destnode,
3391                                      uint32_t db_id)
3392 {
3393         int32_t status;
3394         int ret;
3395         TDB_DATA indata;
3396
3397         indata.dptr = (uint8_t *)&db_id;
3398         indata.dsize = sizeof(db_id);
3399
3400         ret = ctdb_control(ctdb, destnode, 0,
3401                            CTDB_CONTROL_TRANS2_ACTIVE,
3402                            0, indata, NULL, NULL, &status,
3403                            NULL, NULL);
3404
3405         if (ret != 0) {
3406                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3407                 return -1;
3408         }
3409
3410         return status;
3411 }
3412
3413
3414 struct ctdb_transaction_handle {
3415         struct ctdb_db_context *ctdb_db;
3416         bool in_replay;
3417         /*
3418          * we store the reads and writes done under a transaction:
3419          * - one list stores both reads and writes (m_all),
3420          * - the other just writes (m_write)
3421          */
3422         struct ctdb_marshall_buffer *m_all;
3423         struct ctdb_marshall_buffer *m_write;
3424 };
3425
3426 /* start a transaction on a database */
3427 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3428 {
3429         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3430         return 0;
3431 }
3432
3433 /* start a transaction on a database */
3434 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3435 {
3436         struct ctdb_record_handle *rh;
3437         TDB_DATA key;
3438         TDB_DATA data;
3439         struct ctdb_ltdb_header header;
3440         TALLOC_CTX *tmp_ctx;
3441         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3442         int ret;
3443         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3444         pid_t pid;
3445         int32_t status;
3446
3447         key.dptr = discard_const(keyname);
3448         key.dsize = strlen(keyname);
3449
3450         if (!ctdb_db->persistent) {
3451                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3452                 return -1;
3453         }
3454
3455 again:
3456         tmp_ctx = talloc_new(h);
3457
3458         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3459         if (rh == NULL) {
3460                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3461                 talloc_free(tmp_ctx);
3462                 return -1;
3463         }
3464
3465         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3466                                               CTDB_CURRENT_NODE,
3467                                               ctdb_db->db_id);
3468         if (status == 1) {
3469                 unsigned long int usec = (1000 + random()) % 100000;
3470                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3471                                     "on db_id[0x%08x]. waiting for %lu "
3472                                     "microseconds\n",
3473                                     ctdb_db->db_id, usec));
3474                 talloc_free(tmp_ctx);
3475                 usleep(usec);
3476                 goto again;
3477         }
3478
3479         /*
3480          * store the pid in the database:
3481          * it is not enough that the node is dmaster...
3482          */
3483         pid = getpid();
3484         data.dptr = (unsigned char *)&pid;
3485         data.dsize = sizeof(pid_t);
3486         rh->header.rsn++;
3487         rh->header.dmaster = ctdb_db->ctdb->pnn;
3488         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3489         if (ret != 0) {
3490                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3491                                   "transaction record\n"));
3492                 talloc_free(tmp_ctx);
3493                 return -1;
3494         }
3495
3496         talloc_free(rh);
3497
3498         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3499         if (ret != 0) {
3500                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3501                 talloc_free(tmp_ctx);
3502                 return -1;
3503         }
3504
3505         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3506         if (ret != 0) {
3507                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3508                                  "lock record inside transaction\n"));
3509                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3510                 talloc_free(tmp_ctx);
3511                 goto again;
3512         }
3513
3514         if (header.dmaster != ctdb_db->ctdb->pnn) {
3515                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3516                                    "transaction lock record\n"));
3517                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3518                 talloc_free(tmp_ctx);
3519                 goto again;
3520         }
3521
3522         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3523                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3524                                     "the transaction lock record\n"));
3525                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3526                 talloc_free(tmp_ctx);
3527                 goto again;
3528         }
3529
3530         talloc_free(tmp_ctx);
3531
3532         return 0;
3533 }
3534
3535
3536 /* start a transaction on a database */
3537 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3538                                                        TALLOC_CTX *mem_ctx)
3539 {
3540         struct ctdb_transaction_handle *h;
3541         int ret;
3542
3543         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3544         if (h == NULL) {
3545                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3546                 return NULL;
3547         }
3548
3549         h->ctdb_db = ctdb_db;
3550
3551         ret = ctdb_transaction_fetch_start(h);
3552         if (ret != 0) {
3553                 talloc_free(h);
3554                 return NULL;
3555         }
3556
3557         talloc_set_destructor(h, ctdb_transaction_destructor);
3558
3559         return h;
3560 }
3561
3562
3563
3564 /*
3565   fetch a record inside a transaction
3566  */
3567 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3568                            TALLOC_CTX *mem_ctx, 
3569                            TDB_DATA key, TDB_DATA *data)
3570 {
3571         struct ctdb_ltdb_header header;
3572         int ret;
3573
3574         ZERO_STRUCT(header);
3575
3576         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3577         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3578                 /* record doesn't exist yet */
3579                 *data = tdb_null;
3580                 ret = 0;
3581         }
3582         
3583         if (ret != 0) {
3584                 return ret;
3585         }
3586
3587         if (!h->in_replay) {
3588                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3589                 if (h->m_all == NULL) {
3590                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3591                         return -1;
3592                 }
3593         }
3594
3595         return 0;
3596 }
3597
3598 /*
3599   stores a record inside a transaction
3600  */
3601 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3602                            TDB_DATA key, TDB_DATA data)
3603 {
3604         TALLOC_CTX *tmp_ctx = talloc_new(h);
3605         struct ctdb_ltdb_header header;
3606         TDB_DATA olddata;
3607         int ret;
3608
3609         ZERO_STRUCT(header);
3610
3611         /* we need the header so we can update the RSN */
3612         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3613         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3614                 /* the record doesn't exist - create one with us as dmaster.
3615                    This is only safe because we are in a transaction and this
3616                    is a persistent database */
3617                 ZERO_STRUCT(header);
3618         } else if (ret != 0) {
3619                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3620                 talloc_free(tmp_ctx);
3621                 return ret;
3622         }
3623
3624         if (data.dsize == olddata.dsize &&
3625             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3626                 /* save writing the same data */
3627                 talloc_free(tmp_ctx);
3628                 return 0;
3629         }
3630
3631         header.dmaster = h->ctdb_db->ctdb->pnn;
3632         header.rsn++;
3633
3634         if (!h->in_replay) {
3635                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3636                 if (h->m_all == NULL) {
3637                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3638                         talloc_free(tmp_ctx);
3639                         return -1;
3640                 }
3641         }               
3642
3643         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3644         if (h->m_write == NULL) {
3645                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3646                 talloc_free(tmp_ctx);
3647                 return -1;
3648         }
3649         
3650         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3651
3652         talloc_free(tmp_ctx);
3653         
3654         return ret;
3655 }
3656
3657 /*
3658   replay a transaction
3659  */
3660 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3661 {
3662         int ret, i;
3663         struct ctdb_rec_data *rec = NULL;
3664
3665         h->in_replay = true;
3666         talloc_free(h->m_write);
3667         h->m_write = NULL;
3668
3669         ret = ctdb_transaction_fetch_start(h);
3670         if (ret != 0) {
3671                 return ret;
3672         }
3673
3674         for (i=0;i<h->m_all->count;i++) {
3675                 TDB_DATA key, data;
3676
3677                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3678                 if (rec == NULL) {
3679                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3680                         goto failed;
3681                 }
3682
3683                 if (rec->reqid == 0) {
3684                         /* its a store */
3685                         if (ctdb_transaction_store(h, key, data) != 0) {
3686                                 goto failed;
3687                         }
3688                 } else {
3689                         TDB_DATA data2;
3690                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3691
3692                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3693                                 talloc_free(tmp_ctx);
3694                                 goto failed;
3695                         }
3696                         if (data2.dsize != data.dsize ||
3697                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3698                                 /* the record has changed on us - we have to give up */
3699                                 talloc_free(tmp_ctx);
3700                                 goto failed;
3701                         }
3702                         talloc_free(tmp_ctx);
3703                 }
3704         }
3705         
3706         return 0;
3707
3708 failed:
3709         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3710         return -1;
3711 }
3712
3713
3714 /*
3715   commit a transaction
3716  */
3717 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3718 {
3719         int ret, retries=0;
3720         int32_t status;
3721         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3722         struct timeval timeout;
3723         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3724
3725         talloc_set_destructor(h, NULL);
3726
3727         /* our commit strategy is quite complex.
3728
3729            - we first try to commit the changes to all other nodes
3730
3731            - if that works, then we commit locally and we are done
3732
3733            - if a commit on another node fails, then we need to cancel
3734              the transaction, then restart the transaction (thus
3735              opening a window of time for a pending recovery to
3736              complete), then replay the transaction, checking all the
3737              reads and writes (checking that reads give the same data,
3738              and writes succeed). Then we retry the transaction to the
3739              other nodes
3740         */
3741
3742 again:
3743         if (h->m_write == NULL) {
3744                 /* no changes were made */
3745                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3746                 talloc_free(h);
3747                 return 0;
3748         }
3749
3750         /* tell ctdbd to commit to the other nodes */
3751         timeout = timeval_current_ofs(1, 0);
3752         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3753                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3754                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3755                            &timeout, NULL);
3756         if (ret != 0 || status != 0) {
3757                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3758                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3759                                      ", retrying after 1 second...\n",
3760                                      (retries==0)?"":"retry "));
3761                 sleep(1);
3762
3763                 if (ret != 0) {
3764                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3765                 } else {
3766                         /* work out what error code we will give if we 
3767                            have to fail the operation */
3768                         switch ((enum ctdb_trans2_commit_error)status) {
3769                         case CTDB_TRANS2_COMMIT_SUCCESS:
3770                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3771                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3772                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3773                                 break;
3774                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3775                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3776                                 break;
3777                         }
3778                 }
3779
3780                 if (++retries == 100) {
3781                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3782                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3783                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3784                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3785                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3786                         talloc_free(h);
3787                         return -1;
3788                 }               
3789
3790                 if (ctdb_replay_transaction(h) != 0) {
3791                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3792                                           "transaction on db 0x%08x, "
3793                                           "failure control =%u\n",
3794                                           h->ctdb_db->db_id,
3795                                           (unsigned)failure_control));
3796                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3797                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3798                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3799                         talloc_free(h);
3800                         return -1;
3801                 }
3802                 goto again;
3803         } else {
3804                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3805         }
3806
3807         /* do the real commit locally */
3808         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3809         if (ret != 0) {
3810                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3811                                   "on db id 0x%08x locally, "
3812                                   "failure_control=%u\n",
3813                                   h->ctdb_db->db_id,
3814                                   (unsigned)failure_control));
3815                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3816                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3817                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3818                 talloc_free(h);
3819                 return ret;
3820         }
3821
3822         /* tell ctdbd that we are finished with our local commit */
3823         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3824                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3825                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3826         talloc_free(h);
3827         return 0;
3828 }
3829
3830 /*
3831   recovery daemon ping to main daemon
3832  */
3833 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3834 {
3835         int ret;
3836         int32_t res;
3837
3838         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3839                            ctdb, NULL, &res, NULL, NULL);
3840         if (ret != 0 || res != 0) {
3841                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3842                 return -1;
3843         }
3844
3845         return 0;
3846 }
3847
3848 /* when forking the main daemon and the child process needs to connect back
3849  * to the daemon as a client process, this function can be used to change
3850  * the ctdb context from daemon into client mode
3851  */
3852 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3853 {
3854         int ret;
3855         va_list ap;
3856
3857         /* Add extra information so we can identify this in the logs */
3858         va_start(ap, fmt);
3859         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3860         va_end(ap);
3861
3862         /* shutdown the transport */
3863         if (ctdb->methods) {
3864                 ctdb->methods->shutdown(ctdb);
3865         }
3866
3867         /* get a new event context */
3868         talloc_free(ctdb->ev);
3869         ctdb->ev = event_context_init(ctdb);
3870         tevent_loop_allow_nesting(ctdb->ev);
3871
3872         close(ctdb->daemon.sd);
3873         ctdb->daemon.sd = -1;
3874
3875         /* the client does not need to be realtime */
3876         if (ctdb->do_setsched) {
3877                 ctdb_restore_scheduler(ctdb);
3878         }
3879
3880         /* initialise ctdb */
3881         ret = ctdb_socket_connect(ctdb);
3882         if (ret != 0) {
3883                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3884                 return -1;
3885         }
3886
3887          return 0;
3888 }
3889
3890 /*
3891   get the status of running the monitor eventscripts: NULL means never run.
3892  */
3893 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3894                 struct timeval timeout, uint32_t destnode, 
3895                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3896                 struct ctdb_scripts_wire **script_status)
3897 {
3898         int ret;
3899         TDB_DATA outdata, indata;
3900         int32_t res;
3901         uint32_t uinttype = type;
3902
3903         indata.dptr = (uint8_t *)&uinttype;
3904         indata.dsize = sizeof(uinttype);
3905
3906         ret = ctdb_control(ctdb, destnode, 0, 
3907                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3908                            mem_ctx, &outdata, &res, &timeout, NULL);
3909         if (ret != 0 || res != 0) {
3910                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3911                 return -1;
3912         }
3913
3914         if (outdata.dsize == 0) {
3915                 *script_status = NULL;
3916         } else {
3917                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3918                 talloc_free(outdata.dptr);
3919         }
3920                     
3921         return 0;
3922 }
3923
3924 /*
3925   tell the main daemon how long it took to lock the reclock file
3926  */
3927 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3928 {
3929         int ret;
3930         int32_t res;
3931         TDB_DATA data;
3932
3933         data.dptr = (uint8_t *)&latency;
3934         data.dsize = sizeof(latency);
3935
3936         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3937                            ctdb, NULL, &res, NULL, NULL);
3938         if (ret != 0 || res != 0) {
3939                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3940                 return -1;
3941         }
3942
3943         return 0;
3944 }
3945
3946 /*
3947   get the name of the reclock file
3948  */
3949 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3950                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3951                          const char **name)
3952 {
3953         int ret;
3954         int32_t res;
3955         TDB_DATA data;
3956
3957         ret = ctdb_control(ctdb, destnode, 0, 
3958                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3959                            mem_ctx, &data, &res, &timeout, NULL);
3960         if (ret != 0 || res != 0) {
3961                 return -1;
3962         }
3963
3964         if (data.dsize == 0) {
3965                 *name = NULL;
3966         } else {
3967                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3968         }
3969         talloc_free(data.dptr);
3970
3971         return 0;
3972 }
3973
3974 /*
3975   set the reclock filename for a node
3976  */
3977 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3978 {
3979         int ret;
3980         TDB_DATA data;
3981         int32_t res;
3982
3983         if (reclock == NULL) {
3984                 data.dsize = 0;
3985                 data.dptr  = NULL;
3986         } else {
3987                 data.dsize = strlen(reclock) + 1;
3988                 data.dptr  = discard_const(reclock);
3989         }
3990
3991         ret = ctdb_control(ctdb, destnode, 0, 
3992                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3993                            NULL, NULL, &res, &timeout, NULL);
3994         if (ret != 0 || res != 0) {
3995                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3996                 return -1;
3997         }
3998
3999         return 0;
4000 }
4001
4002 /*
4003   stop a node
4004  */
4005 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4006 {
4007         int ret;
4008         int32_t res;
4009
4010         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4011                            ctdb, NULL, &res, &timeout, NULL);
4012         if (ret != 0 || res != 0) {
4013                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4014                 return -1;
4015         }
4016
4017         return 0;
4018 }
4019
4020 /*
4021   continue a node
4022  */
4023 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4024 {
4025         int ret;
4026
4027         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4028                            ctdb, NULL, NULL, &timeout, NULL);
4029         if (ret != 0) {
4030                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4031                 return -1;
4032         }
4033
4034         return 0;
4035 }
4036
4037 /*
4038   set the natgw state for a node
4039  */
4040 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4041 {
4042         int ret;
4043         TDB_DATA data;
4044         int32_t res;
4045
4046         data.dsize = sizeof(natgwstate);
4047         data.dptr  = (uint8_t *)&natgwstate;
4048
4049         ret = ctdb_control(ctdb, destnode, 0, 
4050                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4051                            NULL, NULL, &res, &timeout, NULL);
4052         if (ret != 0 || res != 0) {
4053                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4054                 return -1;
4055         }
4056
4057         return 0;
4058 }
4059
4060 /*
4061   set the lmaster role for a node
4062  */
4063 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4064 {
4065         int ret;
4066         TDB_DATA data;
4067         int32_t res;
4068
4069         data.dsize = sizeof(lmasterrole);
4070         data.dptr  = (uint8_t *)&lmasterrole;
4071
4072         ret = ctdb_control(ctdb, destnode, 0, 
4073                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4074                            NULL, NULL, &res, &timeout, NULL);
4075         if (ret != 0 || res != 0) {
4076                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4077                 return -1;
4078         }
4079
4080         return 0;
4081 }
4082
4083 /*
4084   set the recmaster role for a node
4085  */
4086 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4087 {
4088         int ret;
4089         TDB_DATA data;
4090         int32_t res;
4091
4092         data.dsize = sizeof(recmasterrole);
4093         data.dptr  = (uint8_t *)&recmasterrole;
4094
4095         ret = ctdb_control(ctdb, destnode, 0, 
4096                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4097                            NULL, NULL, &res, &timeout, NULL);
4098         if (ret != 0 || res != 0) {
4099                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4100                 return -1;
4101         }
4102
4103         return 0;
4104 }
4105
4106 /* enable an eventscript
4107  */
4108 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4109 {
4110         int ret;
4111         TDB_DATA data;
4112         int32_t res;
4113
4114         data.dsize = strlen(script) + 1;
4115         data.dptr  = discard_const(script);
4116
4117         ret = ctdb_control(ctdb, destnode, 0, 
4118                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4119                            NULL, NULL, &res, &timeout, NULL);
4120         if (ret != 0 || res != 0) {
4121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4122                 return -1;
4123         }
4124
4125         return 0;
4126 }
4127
4128 /* disable an eventscript
4129  */
4130 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4131 {
4132         int ret;
4133         TDB_DATA data;
4134         int32_t res;
4135
4136         data.dsize = strlen(script) + 1;
4137         data.dptr  = discard_const(script);
4138
4139         ret = ctdb_control(ctdb, destnode, 0, 
4140                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4141                            NULL, NULL, &res, &timeout, NULL);
4142         if (ret != 0 || res != 0) {
4143                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4144                 return -1;
4145         }
4146
4147         return 0;
4148 }
4149
4150
4151 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4152 {
4153         int ret;
4154         TDB_DATA data;
4155         int32_t res;
4156
4157         data.dsize = sizeof(*bantime);
4158         data.dptr  = (uint8_t *)bantime;
4159
4160         ret = ctdb_control(ctdb, destnode, 0, 
4161                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4162                            NULL, NULL, &res, &timeout, NULL);
4163         if (ret != 0 || res != 0) {
4164                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4165                 return -1;
4166         }
4167
4168         return 0;
4169 }
4170
4171
4172 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4173 {
4174         int ret;
4175         TDB_DATA outdata;
4176         int32_t res;
4177         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4178
4179         ret = ctdb_control(ctdb, destnode, 0, 
4180                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4181                            tmp_ctx, &outdata, &res, &timeout, NULL);
4182         if (ret != 0 || res != 0) {
4183                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4184                 talloc_free(tmp_ctx);
4185                 return -1;
4186         }
4187
4188         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4189         talloc_free(tmp_ctx);
4190
4191         return 0;
4192 }
4193
4194
4195 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4196 {
4197         int ret;
4198         int32_t res;
4199         TDB_DATA data;
4200         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4201
4202         data.dptr = (uint8_t*)db_prio;
4203         data.dsize = sizeof(*db_prio);
4204
4205         ret = ctdb_control(ctdb, destnode, 0, 
4206                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4207                            tmp_ctx, NULL, &res, &timeout, NULL);
4208         if (ret != 0 || res != 0) {
4209                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4210                 talloc_free(tmp_ctx);
4211                 return -1;
4212         }
4213
4214         talloc_free(tmp_ctx);
4215
4216         return 0;
4217 }
4218
4219 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4220 {
4221         int ret;
4222         int32_t res;
4223         TDB_DATA data;
4224         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4225
4226         data.dptr = (uint8_t*)&db_id;
4227         data.dsize = sizeof(db_id);
4228
4229         ret = ctdb_control(ctdb, destnode, 0, 
4230                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4231                            tmp_ctx, NULL, &res, &timeout, NULL);
4232         if (ret != 0 || res < 0) {
4233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4234                 talloc_free(tmp_ctx);
4235                 return -1;
4236         }
4237
4238         if (priority) {
4239                 *priority = res;
4240         }
4241
4242         talloc_free(tmp_ctx);
4243
4244         return 0;
4245 }
4246
4247 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4248 {
4249         int ret;
4250         TDB_DATA outdata;
4251         int32_t res;
4252
4253         ret = ctdb_control(ctdb, destnode, 0, 
4254                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4255                            mem_ctx, &outdata, &res, &timeout, NULL);
4256         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4257                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4258                 return -1;
4259         }
4260
4261         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4262         talloc_free(outdata.dptr);
4263                     
4264         return 0;
4265 }
4266
4267 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4268 {
4269         if (h == NULL) {
4270                 return NULL;
4271         }
4272
4273         return &h->header;
4274 }