Revert scheduling back to use real-time processes
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 pid_t ctdbd_pid;
34
35 /*
36   allocate a packet for use in client<->daemon communication
37  */
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
39                                             TALLOC_CTX *mem_ctx, 
40                                             enum ctdb_operation operation, 
41                                             size_t length, size_t slength,
42                                             const char *type)
43 {
44         int size;
45         struct ctdb_req_header *hdr;
46
47         length = MAX(length, slength);
48         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
49
50         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
51         if (hdr == NULL) {
52                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53                          operation, (unsigned)length));
54                 return NULL;
55         }
56         talloc_set_name_const(hdr, type);
57         memset(hdr, 0, slength);
58         hdr->length       = length;
59         hdr->operation    = operation;
60         hdr->ctdb_magic   = CTDB_MAGIC;
61         hdr->ctdb_version = CTDB_VERSION;
62         hdr->srcnode      = ctdb->pnn;
63         if (ctdb->vnn_map) {
64                 hdr->generation = ctdb->vnn_map->generation;
65         }
66
67         return hdr;
68 }
69
70 /*
71   local version of ctdb_call
72 */
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75                     TDB_DATA *data)
76 {
77         struct ctdb_call_info *c;
78         struct ctdb_registered_call *fn;
79         struct ctdb_context *ctdb = ctdb_db->ctdb;
80         
81         c = talloc(ctdb, struct ctdb_call_info);
82         CTDB_NO_MEMORY(ctdb, c);
83
84         c->key = call->key;
85         c->call_data = &call->call_data;
86         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87         c->record_data.dsize = data->dsize;
88         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
89         c->new_data = NULL;
90         c->reply_data = NULL;
91         c->status = 0;
92
93         for (fn=ctdb_db->calls;fn;fn=fn->next) {
94                 if (fn->id == call->call_id) break;
95         }
96         if (fn == NULL) {
97                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
98                 talloc_free(c);
99                 return -1;
100         }
101
102         if (fn->fn(c) != 0) {
103                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
104                 talloc_free(c);
105                 return -1;
106         }
107
108         /* we need to force the record to be written out if this was a remote access */
109         if (c->new_data == NULL) {
110                 c->new_data = &c->record_data;
111         }
112
113         if (c->new_data) {
114                 /* XXX check that we always have the lock here? */
115                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
116                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
117                         talloc_free(c);
118                         return -1;
119                 }
120         }
121
122         if (c->reply_data) {
123                 call->reply_data = *c->reply_data;
124
125                 talloc_steal(call, call->reply_data.dptr);
126                 talloc_set_name_const(call->reply_data.dptr, __location__);
127         } else {
128                 call->reply_data.dptr = NULL;
129                 call->reply_data.dsize = 0;
130         }
131         call->status = c->status;
132
133         talloc_free(c);
134
135         return 0;
136 }
137
138
139 /*
140   queue a packet for sending from client to daemon
141 */
142 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
143 {
144         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
145 }
146
147
148 /*
149   called when a CTDB_REPLY_CALL packet comes in in the client
150
151   This packet comes in response to a CTDB_REQ_CALL request packet. It
152   contains any reply data from the call
153 */
154 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
155 {
156         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
157         struct ctdb_client_call_state *state;
158
159         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
160         if (state == NULL) {
161                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
162                 return;
163         }
164
165         if (hdr->reqid != state->reqid) {
166                 /* we found a record  but it was the wrong one */
167                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
168                 return;
169         }
170
171         state->call->reply_data.dptr = c->data;
172         state->call->reply_data.dsize = c->datalen;
173         state->call->status = c->status;
174
175         talloc_steal(state, c);
176
177         state->state = CTDB_CALL_DONE;
178
179         if (state->async.fn) {
180                 state->async.fn(state);
181         }
182 }
183
184 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
185
186 /*
187   this is called in the client, when data comes in from the daemon
188  */
189 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
190 {
191         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
192         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
193         TALLOC_CTX *tmp_ctx;
194
195         /* place the packet as a child of a tmp_ctx. We then use
196            talloc_free() below to free it. If any of the calls want
197            to keep it, then they will steal it somewhere else, and the
198            talloc_free() will be a no-op */
199         tmp_ctx = talloc_new(ctdb);
200         talloc_steal(tmp_ctx, hdr);
201
202         if (cnt == 0) {
203                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
204                 exit(0);
205         }
206
207         if (cnt < sizeof(*hdr)) {
208                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
209                 goto done;
210         }
211         if (cnt != hdr->length) {
212                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
213                                (unsigned)hdr->length, (unsigned)cnt);
214                 goto done;
215         }
216
217         if (hdr->ctdb_magic != CTDB_MAGIC) {
218                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
219                 goto done;
220         }
221
222         if (hdr->ctdb_version != CTDB_VERSION) {
223                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
224                 goto done;
225         }
226
227         switch (hdr->operation) {
228         case CTDB_REPLY_CALL:
229                 ctdb_client_reply_call(ctdb, hdr);
230                 break;
231
232         case CTDB_REQ_MESSAGE:
233                 ctdb_request_message(ctdb, hdr);
234                 break;
235
236         case CTDB_REPLY_CONTROL:
237                 ctdb_client_reply_control(ctdb, hdr);
238                 break;
239
240         default:
241                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
242         }
243
244 done:
245         talloc_free(tmp_ctx);
246 }
247
248 /*
249   connect to a unix domain socket
250 */
251 int ctdb_socket_connect(struct ctdb_context *ctdb)
252 {
253         struct sockaddr_un addr;
254
255         memset(&addr, 0, sizeof(addr));
256         addr.sun_family = AF_UNIX;
257         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
258
259         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
260         if (ctdb->daemon.sd == -1) {
261                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
262                 return -1;
263         }
264
265         set_nonblocking(ctdb->daemon.sd);
266         set_close_on_exec(ctdb->daemon.sd);
267         
268         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
269                 close(ctdb->daemon.sd);
270                 ctdb->daemon.sd = -1;
271                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
272                 return -1;
273         }
274
275         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
276                                               CTDB_DS_ALIGNMENT, 
277                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
278         return 0;
279 }
280
281
282 struct ctdb_record_handle {
283         struct ctdb_db_context *ctdb_db;
284         TDB_DATA key;
285         TDB_DATA *data;
286         struct ctdb_ltdb_header header;
287 };
288
289
290 /*
291   make a recv call to the local ctdb daemon - called from client context
292
293   This is called when the program wants to wait for a ctdb_call to complete and get the 
294   results. This call will block unless the call has already completed.
295 */
296 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
297 {
298         if (state == NULL) {
299                 return -1;
300         }
301
302         while (state->state < CTDB_CALL_DONE) {
303                 event_loop_once(state->ctdb_db->ctdb->ev);
304         }
305         if (state->state != CTDB_CALL_DONE) {
306                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
307                 talloc_free(state);
308                 return -1;
309         }
310
311         if (state->call->reply_data.dsize) {
312                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
313                                                       state->call->reply_data.dptr,
314                                                       state->call->reply_data.dsize);
315                 call->reply_data.dsize = state->call->reply_data.dsize;
316         } else {
317                 call->reply_data.dptr = NULL;
318                 call->reply_data.dsize = 0;
319         }
320         call->status = state->call->status;
321         talloc_free(state);
322
323         return 0;
324 }
325
326
327
328
329 /*
330   destroy a ctdb_call in client
331 */
332 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
333 {
334         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
335         return 0;
336 }
337
338 /*
339   construct an event driven local ctdb_call
340
341   this is used so that locally processed ctdb_call requests are processed
342   in an event driven manner
343 */
344 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
345                                                                   struct ctdb_call *call,
346                                                                   struct ctdb_ltdb_header *header,
347                                                                   TDB_DATA *data)
348 {
349         struct ctdb_client_call_state *state;
350         struct ctdb_context *ctdb = ctdb_db->ctdb;
351         int ret;
352
353         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
354         CTDB_NO_MEMORY_NULL(ctdb, state);
355         state->call = talloc_zero(state, struct ctdb_call);
356         CTDB_NO_MEMORY_NULL(ctdb, state->call);
357
358         talloc_steal(state, data->dptr);
359
360         state->state   = CTDB_CALL_DONE;
361         *(state->call) = *call;
362         state->ctdb_db = ctdb_db;
363
364         ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if (ret == 0 && header.dmaster == ctdb->pnn) {
400                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
401                 talloc_free(data.dptr);
402                 ctdb_ltdb_unlock(ctdb_db, call->key);
403                 return state;
404         }
405
406         ctdb_ltdb_unlock(ctdb_db, call->key);
407         talloc_free(data.dptr);
408
409         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
410         if (state == NULL) {
411                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
412                 return NULL;
413         }
414         state->call = talloc_zero(state, struct ctdb_call);
415         if (state->call == NULL) {
416                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
417                 return NULL;
418         }
419
420         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
421         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
422         if (c == NULL) {
423                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
424                 return NULL;
425         }
426
427         state->reqid     = ctdb_reqid_new(ctdb, state);
428         state->ctdb_db = ctdb_db;
429         talloc_set_destructor(state, ctdb_client_call_destructor);
430
431         c->hdr.reqid     = state->reqid;
432         c->flags         = call->flags;
433         c->db_id         = ctdb_db->db_id;
434         c->callid        = call->call_id;
435         c->hopcount      = 0;
436         c->keylen        = call->key.dsize;
437         c->calldatalen   = call->call_data.dsize;
438         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
439         memcpy(&c->data[call->key.dsize], 
440                call->call_data.dptr, call->call_data.dsize);
441         *(state->call)              = *call;
442         state->call->call_data.dptr = &c->data[call->key.dsize];
443         state->call->key.dptr       = &c->data[0];
444
445         state->state  = CTDB_CALL_WAIT;
446
447
448         ctdb_client_queue_pkt(ctdb, &c->hdr);
449
450         return state;
451 }
452
453
454 /*
455   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
456 */
457 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
458 {
459         struct ctdb_client_call_state *state;
460
461         state = ctdb_call_send(ctdb_db, call);
462         return ctdb_call_recv(state, call);
463 }
464
465
466 /*
467   tell the daemon what messaging srvid we will use, and register the message
468   handler function in the client
469 */
470 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
471                              ctdb_msg_fn_t handler,
472                              void *private_data)
473                                     
474 {
475         int res;
476         int32_t status;
477         
478         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
479                            tdb_null, NULL, NULL, &status, NULL, NULL);
480         if (res != 0 || status != 0) {
481                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
482                 return -1;
483         }
484
485         /* also need to register the handler with our own ctdb structure */
486         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
487 }
488
489 /*
490   tell the daemon we no longer want a srvid
491 */
492 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
493 {
494         int res;
495         int32_t status;
496         
497         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
498                            tdb_null, NULL, NULL, &status, NULL, NULL);
499         if (res != 0 || status != 0) {
500                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
501                 return -1;
502         }
503
504         /* also need to register the handler with our own ctdb structure */
505         ctdb_deregister_message_handler(ctdb, srvid, private_data);
506         return 0;
507 }
508
509
510 /*
511   send a message - from client context
512  */
513 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
514                       uint64_t srvid, TDB_DATA data)
515 {
516         struct ctdb_req_message *r;
517         int len, res;
518
519         len = offsetof(struct ctdb_req_message, data) + data.dsize;
520         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
521                                len, struct ctdb_req_message);
522         CTDB_NO_MEMORY(ctdb, r);
523
524         r->hdr.destnode  = pnn;
525         r->srvid         = srvid;
526         r->datalen       = data.dsize;
527         memcpy(&r->data[0], data.dptr, data.dsize);
528         
529         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
530         if (res != 0) {
531                 return res;
532         }
533
534         talloc_free(r);
535         return 0;
536 }
537
538
539 /*
540   cancel a ctdb_fetch_lock operation, releasing the lock
541  */
542 static int fetch_lock_destructor(struct ctdb_record_handle *h)
543 {
544         ctdb_ltdb_unlock(h->ctdb_db, h->key);
545         return 0;
546 }
547
548 /*
549   force the migration of a record to this node
550  */
551 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
552 {
553         struct ctdb_call call;
554         ZERO_STRUCT(call);
555         call.call_id = CTDB_NULL_FUNC;
556         call.key = key;
557         call.flags = CTDB_IMMEDIATE_MIGRATION;
558         return ctdb_call(ctdb_db, &call);
559 }
560
561 /*
562   get a lock on a record, and return the records data. Blocks until it gets the lock
563  */
564 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
565                                            TDB_DATA key, TDB_DATA *data)
566 {
567         int ret;
568         struct ctdb_record_handle *h;
569
570         /*
571           procedure is as follows:
572
573           1) get the chain lock. 
574           2) check if we are dmaster
575           3) if we are the dmaster then return handle 
576           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
577              reply from ctdbd
578           5) when we get the reply, goto (1)
579          */
580
581         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
582         if (h == NULL) {
583                 return NULL;
584         }
585
586         h->ctdb_db = ctdb_db;
587         h->key     = key;
588         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
589         if (h->key.dptr == NULL) {
590                 talloc_free(h);
591                 return NULL;
592         }
593         h->data    = data;
594
595         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
596                  (const char *)key.dptr));
597
598 again:
599         /* step 1 - get the chain lock */
600         ret = ctdb_ltdb_lock(ctdb_db, key);
601         if (ret != 0) {
602                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
603                 talloc_free(h);
604                 return NULL;
605         }
606
607         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
608
609         talloc_set_destructor(h, fetch_lock_destructor);
610
611         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
612
613         /* when torturing, ensure we test the remote path */
614         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
615             random() % 5 == 0) {
616                 h->header.dmaster = (uint32_t)-1;
617         }
618
619
620         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
621
622         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
623                 ctdb_ltdb_unlock(ctdb_db, key);
624                 ret = ctdb_client_force_migration(ctdb_db, key);
625                 if (ret != 0) {
626                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
627                         talloc_free(h);
628                         return NULL;
629                 }
630                 goto again;
631         }
632
633         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
634         return h;
635 }
636
637 /*
638   store some data to the record that was locked with ctdb_fetch_lock()
639 */
640 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
641 {
642         if (h->ctdb_db->persistent) {
643                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
644                 return -1;
645         }
646
647         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
648 }
649
650 /*
651   non-locking fetch of a record
652  */
653 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
654                TDB_DATA key, TDB_DATA *data)
655 {
656         struct ctdb_call call;
657         int ret;
658
659         call.call_id = CTDB_FETCH_FUNC;
660         call.call_data.dptr = NULL;
661         call.call_data.dsize = 0;
662
663         ret = ctdb_call(ctdb_db, &call);
664
665         if (ret == 0) {
666                 *data = call.reply_data;
667                 talloc_steal(mem_ctx, data->dptr);
668         }
669
670         return ret;
671 }
672
673
674
675 /*
676    called when a control completes or timesout to invoke the callback
677    function the user provided
678 */
679 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
680         struct timeval t, void *private_data)
681 {
682         struct ctdb_client_control_state *state;
683         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
684         int ret;
685
686         state = talloc_get_type(private_data, struct ctdb_client_control_state);
687         talloc_steal(tmp_ctx, state);
688
689         ret = ctdb_control_recv(state->ctdb, state, state,
690                         NULL, 
691                         NULL, 
692                         NULL);
693
694         talloc_free(tmp_ctx);
695 }
696
697 /*
698   called when a CTDB_REPLY_CONTROL packet comes in in the client
699
700   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
701   contains any reply data from the control
702 */
703 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
704                                       struct ctdb_req_header *hdr)
705 {
706         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
707         struct ctdb_client_control_state *state;
708
709         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
710         if (state == NULL) {
711                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
712                 return;
713         }
714
715         if (hdr->reqid != state->reqid) {
716                 /* we found a record  but it was the wrong one */
717                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
718                 return;
719         }
720
721         state->outdata.dptr = c->data;
722         state->outdata.dsize = c->datalen;
723         state->status = c->status;
724         if (c->errorlen) {
725                 state->errormsg = talloc_strndup(state, 
726                                                  (char *)&c->data[c->datalen], 
727                                                  c->errorlen);
728         }
729
730         /* state->outdata now uses resources from c so we dont want c
731            to just dissappear from under us while state is still alive
732         */
733         talloc_steal(state, c);
734
735         state->state = CTDB_CONTROL_DONE;
736
737         /* if we had a callback registered for this control, pull the response
738            and call the callback.
739         */
740         if (state->async.fn) {
741                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
742         }
743 }
744
745
746 /*
747   destroy a ctdb_control in client
748 */
749 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
750 {
751         ctdb_reqid_remove(state->ctdb, state->reqid);
752         return 0;
753 }
754
755
756 /* time out handler for ctdb_control */
757 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
758         struct timeval t, void *private_data)
759 {
760         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
761
762         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
763                          "dstnode:%u\n", state->reqid, state->c->opcode,
764                          state->c->hdr.destnode));
765
766         state->state = CTDB_CONTROL_TIMEOUT;
767
768         /* if we had a callback registered for this control, pull the response
769            and call the callback.
770         */
771         if (state->async.fn) {
772                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
773         }
774 }
775
776 /* async version of send control request */
777 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
778                 uint32_t destnode, uint64_t srvid, 
779                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
780                 TALLOC_CTX *mem_ctx,
781                 struct timeval *timeout,
782                 char **errormsg)
783 {
784         struct ctdb_client_control_state *state;
785         size_t len;
786         struct ctdb_req_control *c;
787         int ret;
788
789         if (errormsg) {
790                 *errormsg = NULL;
791         }
792
793         /* if the domain socket is not yet open, open it */
794         if (ctdb->daemon.sd==-1) {
795                 ctdb_socket_connect(ctdb);
796         }
797
798         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
799         CTDB_NO_MEMORY_NULL(ctdb, state);
800
801         state->ctdb       = ctdb;
802         state->reqid      = ctdb_reqid_new(ctdb, state);
803         state->state      = CTDB_CONTROL_WAIT;
804         state->errormsg   = NULL;
805
806         talloc_set_destructor(state, ctdb_control_destructor);
807
808         len = offsetof(struct ctdb_req_control, data) + data.dsize;
809         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
810                                len, struct ctdb_req_control);
811         state->c            = c;        
812         CTDB_NO_MEMORY_NULL(ctdb, c);
813         c->hdr.reqid        = state->reqid;
814         c->hdr.destnode     = destnode;
815         c->opcode           = opcode;
816         c->client_id        = 0;
817         c->flags            = flags;
818         c->srvid            = srvid;
819         c->datalen          = data.dsize;
820         if (data.dsize) {
821                 memcpy(&c->data[0], data.dptr, data.dsize);
822         }
823
824         /* timeout */
825         if (timeout && !timeval_is_zero(timeout)) {
826                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
827         }
828
829         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
830         if (ret != 0) {
831                 talloc_free(state);
832                 return NULL;
833         }
834
835         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
836                 talloc_free(state);
837                 return NULL;
838         }
839
840         return state;
841 }
842
843
844 /* async version of receive control reply */
845 int ctdb_control_recv(struct ctdb_context *ctdb, 
846                 struct ctdb_client_control_state *state, 
847                 TALLOC_CTX *mem_ctx,
848                 TDB_DATA *outdata, int32_t *status, char **errormsg)
849 {
850         TALLOC_CTX *tmp_ctx;
851
852         if (status != NULL) {
853                 *status = -1;
854         }
855         if (errormsg != NULL) {
856                 *errormsg = NULL;
857         }
858
859         if (state == NULL) {
860                 return -1;
861         }
862
863         /* prevent double free of state */
864         tmp_ctx = talloc_new(ctdb);
865         talloc_steal(tmp_ctx, state);
866
867         /* loop one event at a time until we either timeout or the control
868            completes.
869         */
870         while (state->state == CTDB_CONTROL_WAIT) {
871                 event_loop_once(ctdb->ev);
872         }
873
874         if (state->state != CTDB_CONTROL_DONE) {
875                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
876                 if (state->async.fn) {
877                         state->async.fn(state);
878                 }
879                 talloc_free(tmp_ctx);
880                 return -1;
881         }
882
883         if (state->errormsg) {
884                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
885                 if (errormsg) {
886                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
887                 }
888                 if (state->async.fn) {
889                         state->async.fn(state);
890                 }
891                 talloc_free(tmp_ctx);
892                 return -1;
893         }
894
895         if (outdata) {
896                 *outdata = state->outdata;
897                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
898         }
899
900         if (status) {
901                 *status = state->status;
902         }
903
904         if (state->async.fn) {
905                 state->async.fn(state);
906         }
907
908         talloc_free(tmp_ctx);
909         return 0;
910 }
911
912
913
914 /*
915   send a ctdb control message
916   timeout specifies how long we should wait for a reply.
917   if timeout is NULL we wait indefinitely
918  */
919 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
920                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
921                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
922                  struct timeval *timeout,
923                  char **errormsg)
924 {
925         struct ctdb_client_control_state *state;
926
927         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
928                         flags, data, mem_ctx,
929                         timeout, errormsg);
930         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
931                         errormsg);
932 }
933
934
935
936
937 /*
938   a process exists call. Returns 0 if process exists, -1 otherwise
939  */
940 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
941 {
942         int ret;
943         TDB_DATA data;
944         int32_t status;
945
946         data.dptr = (uint8_t*)&pid;
947         data.dsize = sizeof(pid);
948
949         ret = ctdb_control(ctdb, destnode, 0, 
950                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
951                            NULL, NULL, &status, NULL, NULL);
952         if (ret != 0) {
953                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
954                 return -1;
955         }
956
957         return status;
958 }
959
960 /*
961   get remote statistics
962  */
963 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
964 {
965         int ret;
966         TDB_DATA data;
967         int32_t res;
968
969         ret = ctdb_control(ctdb, destnode, 0, 
970                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
971                            ctdb, &data, &res, NULL, NULL);
972         if (ret != 0 || res != 0) {
973                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
974                 return -1;
975         }
976
977         if (data.dsize != sizeof(struct ctdb_statistics)) {
978                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
979                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
980                       return -1;
981         }
982
983         *status = *(struct ctdb_statistics *)data.dptr;
984         talloc_free(data.dptr);
985                         
986         return 0;
987 }
988
989 /*
990   shutdown a remote ctdb node
991  */
992 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
993 {
994         struct ctdb_client_control_state *state;
995
996         state = ctdb_control_send(ctdb, destnode, 0, 
997                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
998                            NULL, &timeout, NULL);
999         if (state == NULL) {
1000                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1001                 return -1;
1002         }
1003
1004         return 0;
1005 }
1006
1007 /*
1008   get vnn map from a remote node
1009  */
1010 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1011 {
1012         int ret;
1013         TDB_DATA outdata;
1014         int32_t res;
1015         struct ctdb_vnn_map_wire *map;
1016
1017         ret = ctdb_control(ctdb, destnode, 0, 
1018                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1019                            mem_ctx, &outdata, &res, &timeout, NULL);
1020         if (ret != 0 || res != 0) {
1021                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1022                 return -1;
1023         }
1024         
1025         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1026         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1027             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1028                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1029                 return -1;
1030         }
1031
1032         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1033         CTDB_NO_MEMORY(ctdb, *vnnmap);
1034         (*vnnmap)->generation = map->generation;
1035         (*vnnmap)->size       = map->size;
1036         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1037
1038         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1039         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1040         talloc_free(outdata.dptr);
1041                     
1042         return 0;
1043 }
1044
1045
1046 /*
1047   get the recovery mode of a remote node
1048  */
1049 struct ctdb_client_control_state *
1050 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1051 {
1052         return ctdb_control_send(ctdb, destnode, 0, 
1053                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1054                            mem_ctx, &timeout, NULL);
1055 }
1056
1057 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1058 {
1059         int ret;
1060         int32_t res;
1061
1062         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1063         if (ret != 0) {
1064                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1065                 return -1;
1066         }
1067
1068         if (recmode) {
1069                 *recmode = (uint32_t)res;
1070         }
1071
1072         return 0;
1073 }
1074
1075 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1076 {
1077         struct ctdb_client_control_state *state;
1078
1079         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1080         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1081 }
1082
1083
1084
1085
1086 /*
1087   set the recovery mode of a remote node
1088  */
1089 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1090 {
1091         int ret;
1092         TDB_DATA data;
1093         int32_t res;
1094
1095         data.dsize = sizeof(uint32_t);
1096         data.dptr = (unsigned char *)&recmode;
1097
1098         ret = ctdb_control(ctdb, destnode, 0, 
1099                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1100                            NULL, NULL, &res, &timeout, NULL);
1101         if (ret != 0 || res != 0) {
1102                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1103                 return -1;
1104         }
1105
1106         return 0;
1107 }
1108
1109
1110
1111 /*
1112   get the recovery master of a remote node
1113  */
1114 struct ctdb_client_control_state *
1115 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1116                         struct timeval timeout, uint32_t destnode)
1117 {
1118         return ctdb_control_send(ctdb, destnode, 0, 
1119                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1120                            mem_ctx, &timeout, NULL);
1121 }
1122
1123 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1124 {
1125         int ret;
1126         int32_t res;
1127
1128         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1129         if (ret != 0) {
1130                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1131                 return -1;
1132         }
1133
1134         if (recmaster) {
1135                 *recmaster = (uint32_t)res;
1136         }
1137
1138         return 0;
1139 }
1140
1141 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1142 {
1143         struct ctdb_client_control_state *state;
1144
1145         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1146         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1147 }
1148
1149
1150 /*
1151   set the recovery master of a remote node
1152  */
1153 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1154 {
1155         int ret;
1156         TDB_DATA data;
1157         int32_t res;
1158
1159         ZERO_STRUCT(data);
1160         data.dsize = sizeof(uint32_t);
1161         data.dptr = (unsigned char *)&recmaster;
1162
1163         ret = ctdb_control(ctdb, destnode, 0, 
1164                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1165                            NULL, NULL, &res, &timeout, NULL);
1166         if (ret != 0 || res != 0) {
1167                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1168                 return -1;
1169         }
1170
1171         return 0;
1172 }
1173
1174
1175 /*
1176   get a list of databases off a remote node
1177  */
1178 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1179                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1180 {
1181         int ret;
1182         TDB_DATA outdata;
1183         int32_t res;
1184
1185         ret = ctdb_control(ctdb, destnode, 0, 
1186                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1187                            mem_ctx, &outdata, &res, &timeout, NULL);
1188         if (ret != 0 || res != 0) {
1189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1190                 return -1;
1191         }
1192
1193         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1194         talloc_free(outdata.dptr);
1195                     
1196         return 0;
1197 }
1198
1199 /*
1200   get a list of nodes (vnn and flags ) from a remote node
1201  */
1202 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1203                 struct timeval timeout, uint32_t destnode, 
1204                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1205 {
1206         int ret;
1207         TDB_DATA outdata;
1208         int32_t res;
1209
1210         ret = ctdb_control(ctdb, destnode, 0, 
1211                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1212                            mem_ctx, &outdata, &res, &timeout, NULL);
1213         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1214                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1215                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1216         }
1217         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1218                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1219                 return -1;
1220         }
1221
1222         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1223         talloc_free(outdata.dptr);
1224                     
1225         return 0;
1226 }
1227
1228 /*
1229   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1230  */
1231 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1232                 struct timeval timeout, uint32_t destnode, 
1233                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1234 {
1235         int ret, i, len;
1236         TDB_DATA outdata;
1237         struct ctdb_node_mapv4 *nodemapv4;
1238         int32_t res;
1239
1240         ret = ctdb_control(ctdb, destnode, 0, 
1241                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1242                            mem_ctx, &outdata, &res, &timeout, NULL);
1243         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1245                 return -1;
1246         }
1247
1248         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1249
1250         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1251         (*nodemap) = talloc_zero_size(mem_ctx, len);
1252         CTDB_NO_MEMORY(ctdb, (*nodemap));
1253
1254         (*nodemap)->num = nodemapv4->num;
1255         for (i=0; i<nodemapv4->num; i++) {
1256                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1257                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1258                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1259                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1260         }
1261                 
1262         talloc_free(outdata.dptr);
1263                     
1264         return 0;
1265 }
1266
1267 /*
1268   drop the transport, reload the nodes file and restart the transport
1269  */
1270 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1271                     struct timeval timeout, uint32_t destnode)
1272 {
1273         int ret;
1274         int32_t res;
1275
1276         ret = ctdb_control(ctdb, destnode, 0, 
1277                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1278                            NULL, NULL, &res, &timeout, NULL);
1279         if (ret != 0 || res != 0) {
1280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1281                 return -1;
1282         }
1283
1284         return 0;
1285 }
1286
1287
1288 /*
1289   set vnn map on a node
1290  */
1291 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1292                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1293 {
1294         int ret;
1295         TDB_DATA data;
1296         int32_t res;
1297         struct ctdb_vnn_map_wire *map;
1298         size_t len;
1299
1300         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1301         map = talloc_size(mem_ctx, len);
1302         CTDB_NO_MEMORY(ctdb, map);
1303
1304         map->generation = vnnmap->generation;
1305         map->size = vnnmap->size;
1306         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1307         
1308         data.dsize = len;
1309         data.dptr  = (uint8_t *)map;
1310
1311         ret = ctdb_control(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1313                            NULL, NULL, &res, &timeout, NULL);
1314         if (ret != 0 || res != 0) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1316                 return -1;
1317         }
1318
1319         talloc_free(map);
1320
1321         return 0;
1322 }
1323
1324
1325 /*
1326   async send for pull database
1327  */
1328 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1329         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1330         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1331 {
1332         TDB_DATA indata;
1333         struct ctdb_control_pulldb *pull;
1334         struct ctdb_client_control_state *state;
1335
1336         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1337         CTDB_NO_MEMORY_NULL(ctdb, pull);
1338
1339         pull->db_id   = dbid;
1340         pull->lmaster = lmaster;
1341
1342         indata.dsize = sizeof(struct ctdb_control_pulldb);
1343         indata.dptr  = (unsigned char *)pull;
1344
1345         state = ctdb_control_send(ctdb, destnode, 0, 
1346                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1347                                   mem_ctx, &timeout, NULL);
1348         talloc_free(pull);
1349
1350         return state;
1351 }
1352
1353 /*
1354   async recv for pull database
1355  */
1356 int ctdb_ctrl_pulldb_recv(
1357         struct ctdb_context *ctdb, 
1358         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1359         TDB_DATA *outdata)
1360 {
1361         int ret;
1362         int32_t res;
1363
1364         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1365         if ( (ret != 0) || (res != 0) ){
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1367                 return -1;
1368         }
1369
1370         return 0;
1371 }
1372
1373 /*
1374   pull all keys and records for a specific database on a node
1375  */
1376 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1377                 uint32_t dbid, uint32_t lmaster, 
1378                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1379                 TDB_DATA *outdata)
1380 {
1381         struct ctdb_client_control_state *state;
1382
1383         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1384                                       timeout);
1385         
1386         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1387 }
1388
1389
1390 /*
1391   change dmaster for all keys in the database to the new value
1392  */
1393 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1394                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1395 {
1396         int ret;
1397         TDB_DATA indata;
1398         int32_t res;
1399
1400         indata.dsize = 2*sizeof(uint32_t);
1401         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1402
1403         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1404         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1405
1406         ret = ctdb_control(ctdb, destnode, 0, 
1407                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1408                            NULL, NULL, &res, &timeout, NULL);
1409         if (ret != 0 || res != 0) {
1410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1411                 return -1;
1412         }
1413
1414         return 0;
1415 }
1416
1417 /*
1418   ping a node, return number of clients connected
1419  */
1420 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1421 {
1422         int ret;
1423         int32_t res;
1424
1425         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1426                            tdb_null, NULL, NULL, &res, NULL, NULL);
1427         if (ret != 0) {
1428                 return -1;
1429         }
1430         return res;
1431 }
1432
1433 /*
1434   find the real path to a ltdb 
1435  */
1436 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1437                    const char **path)
1438 {
1439         int ret;
1440         int32_t res;
1441         TDB_DATA data;
1442
1443         data.dptr = (uint8_t *)&dbid;
1444         data.dsize = sizeof(dbid);
1445
1446         ret = ctdb_control(ctdb, destnode, 0, 
1447                            CTDB_CONTROL_GETDBPATH, 0, data, 
1448                            mem_ctx, &data, &res, &timeout, NULL);
1449         if (ret != 0 || res != 0) {
1450                 return -1;
1451         }
1452
1453         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1454         if ((*path) == NULL) {
1455                 return -1;
1456         }
1457
1458         talloc_free(data.dptr);
1459
1460         return 0;
1461 }
1462
1463 /*
1464   find the name of a db 
1465  */
1466 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1467                    const char **name)
1468 {
1469         int ret;
1470         int32_t res;
1471         TDB_DATA data;
1472
1473         data.dptr = (uint8_t *)&dbid;
1474         data.dsize = sizeof(dbid);
1475
1476         ret = ctdb_control(ctdb, destnode, 0, 
1477                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1478                            mem_ctx, &data, &res, &timeout, NULL);
1479         if (ret != 0 || res != 0) {
1480                 return -1;
1481         }
1482
1483         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1484         if ((*name) == NULL) {
1485                 return -1;
1486         }
1487
1488         talloc_free(data.dptr);
1489
1490         return 0;
1491 }
1492
1493 /*
1494   get the health status of a db
1495  */
1496 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1497                           struct timeval timeout,
1498                           uint32_t destnode,
1499                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1500                           const char **reason)
1501 {
1502         int ret;
1503         int32_t res;
1504         TDB_DATA data;
1505
1506         data.dptr = (uint8_t *)&dbid;
1507         data.dsize = sizeof(dbid);
1508
1509         ret = ctdb_control(ctdb, destnode, 0,
1510                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1511                            mem_ctx, &data, &res, &timeout, NULL);
1512         if (ret != 0 || res != 0) {
1513                 return -1;
1514         }
1515
1516         if (data.dsize == 0) {
1517                 (*reason) = NULL;
1518                 return 0;
1519         }
1520
1521         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1522         if ((*reason) == NULL) {
1523                 return -1;
1524         }
1525
1526         talloc_free(data.dptr);
1527
1528         return 0;
1529 }
1530
1531 /*
1532   create a database
1533  */
1534 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1535                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1536 {
1537         int ret;
1538         int32_t res;
1539         TDB_DATA data;
1540
1541         data.dptr = discard_const(name);
1542         data.dsize = strlen(name)+1;
1543
1544         ret = ctdb_control(ctdb, destnode, 0, 
1545                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1546                            0, data, 
1547                            mem_ctx, &data, &res, &timeout, NULL);
1548
1549         if (ret != 0 || res != 0) {
1550                 return -1;
1551         }
1552
1553         return 0;
1554 }
1555
1556 /*
1557   get debug level on a node
1558  */
1559 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1560 {
1561         int ret;
1562         int32_t res;
1563         TDB_DATA data;
1564
1565         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1566                            ctdb, &data, &res, NULL, NULL);
1567         if (ret != 0 || res != 0) {
1568                 return -1;
1569         }
1570         if (data.dsize != sizeof(int32_t)) {
1571                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1572                          (unsigned)data.dsize));
1573                 return -1;
1574         }
1575         *level = *(int32_t *)data.dptr;
1576         talloc_free(data.dptr);
1577         return 0;
1578 }
1579
1580 /*
1581   set debug level on a node
1582  */
1583 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1584 {
1585         int ret;
1586         int32_t res;
1587         TDB_DATA data;
1588
1589         data.dptr = (uint8_t *)&level;
1590         data.dsize = sizeof(level);
1591
1592         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1593                            NULL, NULL, &res, NULL, NULL);
1594         if (ret != 0 || res != 0) {
1595                 return -1;
1596         }
1597         return 0;
1598 }
1599
1600
1601 /*
1602   get a list of connected nodes
1603  */
1604 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1605                                 struct timeval timeout,
1606                                 TALLOC_CTX *mem_ctx,
1607                                 uint32_t *num_nodes)
1608 {
1609         struct ctdb_node_map *map=NULL;
1610         int ret, i;
1611         uint32_t *nodes;
1612
1613         *num_nodes = 0;
1614
1615         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1616         if (ret != 0) {
1617                 return NULL;
1618         }
1619
1620         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1621         if (nodes == NULL) {
1622                 return NULL;
1623         }
1624
1625         for (i=0;i<map->num;i++) {
1626                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1627                         nodes[*num_nodes] = map->nodes[i].pnn;
1628                         (*num_nodes)++;
1629                 }
1630         }
1631
1632         return nodes;
1633 }
1634
1635
1636 /*
1637   reset remote status
1638  */
1639 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1640 {
1641         int ret;
1642         int32_t res;
1643
1644         ret = ctdb_control(ctdb, destnode, 0, 
1645                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1646                            NULL, NULL, &res, NULL, NULL);
1647         if (ret != 0 || res != 0) {
1648                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1649                 return -1;
1650         }
1651         return 0;
1652 }
1653
1654 /*
1655   this is the dummy null procedure that all databases support
1656 */
1657 static int ctdb_null_func(struct ctdb_call_info *call)
1658 {
1659         return 0;
1660 }
1661
1662 /*
1663   this is a plain fetch procedure that all databases support
1664 */
1665 static int ctdb_fetch_func(struct ctdb_call_info *call)
1666 {
1667         call->reply_data = &call->record_data;
1668         return 0;
1669 }
1670
1671 /*
1672   attach to a specific database - client call
1673 */
1674 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1675 {
1676         struct ctdb_db_context *ctdb_db;
1677         TDB_DATA data;
1678         int ret;
1679         int32_t res;
1680
1681         ctdb_db = ctdb_db_handle(ctdb, name);
1682         if (ctdb_db) {
1683                 return ctdb_db;
1684         }
1685
1686         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1687         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1688
1689         ctdb_db->ctdb = ctdb;
1690         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1691         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1692
1693         data.dptr = discard_const(name);
1694         data.dsize = strlen(name)+1;
1695
1696         /* tell ctdb daemon to attach */
1697         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1698                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1699                            0, data, ctdb_db, &data, &res, NULL, NULL);
1700         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1701                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1702                 talloc_free(ctdb_db);
1703                 return NULL;
1704         }
1705         
1706         ctdb_db->db_id = *(uint32_t *)data.dptr;
1707         talloc_free(data.dptr);
1708
1709         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1710         if (ret != 0) {
1711                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1712                 talloc_free(ctdb_db);
1713                 return NULL;
1714         }
1715
1716         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1717         if (ctdb->valgrinding) {
1718                 tdb_flags |= TDB_NOMMAP;
1719         }
1720         tdb_flags |= TDB_DISALLOW_NESTING;
1721
1722         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1723         if (ctdb_db->ltdb == NULL) {
1724                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1725                 talloc_free(ctdb_db);
1726                 return NULL;
1727         }
1728
1729         ctdb_db->persistent = persistent;
1730
1731         DLIST_ADD(ctdb->db_list, ctdb_db);
1732
1733         /* add well known functions */
1734         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1735         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1736
1737         return ctdb_db;
1738 }
1739
1740
1741 /*
1742   setup a call for a database
1743  */
1744 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1745 {
1746         struct ctdb_registered_call *call;
1747
1748 #if 0
1749         TDB_DATA data;
1750         int32_t status;
1751         struct ctdb_control_set_call c;
1752         int ret;
1753
1754         /* this is no longer valid with the separate daemon architecture */
1755         c.db_id = ctdb_db->db_id;
1756         c.fn    = fn;
1757         c.id    = id;
1758
1759         data.dptr = (uint8_t *)&c;
1760         data.dsize = sizeof(c);
1761
1762         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1763                            data, NULL, NULL, &status, NULL, NULL);
1764         if (ret != 0 || status != 0) {
1765                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1766                 return -1;
1767         }
1768 #endif
1769
1770         /* also register locally */
1771         call = talloc(ctdb_db, struct ctdb_registered_call);
1772         call->fn = fn;
1773         call->id = id;
1774
1775         DLIST_ADD(ctdb_db->calls, call);        
1776         return 0;
1777 }
1778
1779
1780 struct traverse_state {
1781         bool done;
1782         uint32_t count;
1783         ctdb_traverse_func fn;
1784         void *private_data;
1785 };
1786
1787 /*
1788   called on each key during a ctdb_traverse
1789  */
1790 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1791 {
1792         struct traverse_state *state = (struct traverse_state *)p;
1793         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1794         TDB_DATA key;
1795
1796         if (data.dsize < sizeof(uint32_t) ||
1797             d->length != data.dsize) {
1798                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1799                 state->done = True;
1800                 return;
1801         }
1802
1803         key.dsize = d->keylen;
1804         key.dptr  = &d->data[0];
1805         data.dsize = d->datalen;
1806         data.dptr = &d->data[d->keylen];
1807
1808         if (key.dsize == 0 && data.dsize == 0) {
1809                 /* end of traverse */
1810                 state->done = True;
1811                 return;
1812         }
1813
1814         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1815                 /* empty records are deleted records in ctdb */
1816                 return;
1817         }
1818
1819         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1820                 state->done = True;
1821         }
1822
1823         state->count++;
1824 }
1825
1826
1827 /*
1828   start a cluster wide traverse, calling the supplied fn on each record
1829   return the number of records traversed, or -1 on error
1830  */
1831 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1832 {
1833         TDB_DATA data;
1834         struct ctdb_traverse_start t;
1835         int32_t status;
1836         int ret;
1837         uint64_t srvid = (getpid() | 0xFLL<<60);
1838         struct traverse_state state;
1839
1840         state.done = False;
1841         state.count = 0;
1842         state.private_data = private_data;
1843         state.fn = fn;
1844
1845         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1846         if (ret != 0) {
1847                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1848                 return -1;
1849         }
1850
1851         t.db_id = ctdb_db->db_id;
1852         t.srvid = srvid;
1853         t.reqid = 0;
1854
1855         data.dptr = (uint8_t *)&t;
1856         data.dsize = sizeof(t);
1857
1858         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1859                            data, NULL, NULL, &status, NULL, NULL);
1860         if (ret != 0 || status != 0) {
1861                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1862                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1863                 return -1;
1864         }
1865
1866         while (!state.done) {
1867                 event_loop_once(ctdb_db->ctdb->ev);
1868         }
1869
1870         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1873                 return -1;
1874         }
1875
1876         return state.count;
1877 }
1878
1879 #define ISASCII(x) ((x>31)&&(x<128))
1880 /*
1881   called on each key during a catdb
1882  */
1883 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1884 {
1885         int i;
1886         FILE *f = (FILE *)p;
1887         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1888
1889         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1890         for (i=0;i<key.dsize;i++) {
1891                 if (ISASCII(key.dptr[i])) {
1892                         fprintf(f, "%c", key.dptr[i]);
1893                 } else {
1894                         fprintf(f, "\\%02X", key.dptr[i]);
1895                 }
1896         }
1897         fprintf(f, "\"\n");
1898
1899         fprintf(f, "dmaster: %u\n", h->dmaster);
1900         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1901
1902         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1903         for (i=sizeof(*h);i<data.dsize;i++) {
1904                 if (ISASCII(data.dptr[i])) {
1905                         fprintf(f, "%c", data.dptr[i]);
1906                 } else {
1907                         fprintf(f, "\\%02X", data.dptr[i]);
1908                 }
1909         }
1910         fprintf(f, "\"\n");
1911
1912         fprintf(f, "\n");
1913
1914         return 0;
1915 }
1916
1917 /*
1918   convenience function to list all keys to stdout
1919  */
1920 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1921 {
1922         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1923 }
1924
1925 /*
1926   get the pid of a ctdb daemon
1927  */
1928 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1929 {
1930         int ret;
1931         int32_t res;
1932
1933         ret = ctdb_control(ctdb, destnode, 0, 
1934                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1935                            NULL, NULL, &res, &timeout, NULL);
1936         if (ret != 0) {
1937                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1938                 return -1;
1939         }
1940
1941         *pid = res;
1942
1943         return 0;
1944 }
1945
1946
1947 /*
1948   async freeze send control
1949  */
1950 struct ctdb_client_control_state *
1951 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1952 {
1953         return ctdb_control_send(ctdb, destnode, priority, 
1954                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1955                            mem_ctx, &timeout, NULL);
1956 }
1957
1958 /* 
1959    async freeze recv control
1960 */
1961 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1962 {
1963         int ret;
1964         int32_t res;
1965
1966         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1967         if ( (ret != 0) || (res != 0) ){
1968                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1969                 return -1;
1970         }
1971
1972         return 0;
1973 }
1974
1975 /*
1976   freeze databases of a certain priority
1977  */
1978 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1979 {
1980         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1981         struct ctdb_client_control_state *state;
1982         int ret;
1983
1984         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1985         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1986         talloc_free(tmp_ctx);
1987
1988         return ret;
1989 }
1990
1991 /* Freeze all databases */
1992 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1993 {
1994         int i;
1995
1996         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1997                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1998                         return -1;
1999                 }
2000         }
2001         return 0;
2002 }
2003
2004 /*
2005   thaw databases of a certain priority
2006  */
2007 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2008 {
2009         int ret;
2010         int32_t res;
2011
2012         ret = ctdb_control(ctdb, destnode, priority, 
2013                            CTDB_CONTROL_THAW, 0, tdb_null, 
2014                            NULL, NULL, &res, &timeout, NULL);
2015         if (ret != 0 || res != 0) {
2016                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2017                 return -1;
2018         }
2019
2020         return 0;
2021 }
2022
2023 /* thaw all databases */
2024 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2025 {
2026         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2027 }
2028
2029 /*
2030   get pnn of a node, or -1
2031  */
2032 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2033 {
2034         int ret;
2035         int32_t res;
2036
2037         ret = ctdb_control(ctdb, destnode, 0, 
2038                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2039                            NULL, NULL, &res, &timeout, NULL);
2040         if (ret != 0) {
2041                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2042                 return -1;
2043         }
2044
2045         return res;
2046 }
2047
2048 /*
2049   get the monitoring mode of a remote node
2050  */
2051 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2052 {
2053         int ret;
2054         int32_t res;
2055
2056         ret = ctdb_control(ctdb, destnode, 0, 
2057                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2058                            NULL, NULL, &res, &timeout, NULL);
2059         if (ret != 0) {
2060                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2061                 return -1;
2062         }
2063
2064         *monmode = res;
2065
2066         return 0;
2067 }
2068
2069
2070 /*
2071  set the monitoring mode of a remote node to active
2072  */
2073 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2074 {
2075         int ret;
2076         
2077
2078         ret = ctdb_control(ctdb, destnode, 0, 
2079                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2080                            NULL, NULL,NULL, &timeout, NULL);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2083                 return -1;
2084         }
2085
2086         
2087
2088         return 0;
2089 }
2090
2091 /*
2092   set the monitoring mode of a remote node to disable
2093  */
2094 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2095 {
2096         int ret;
2097         
2098
2099         ret = ctdb_control(ctdb, destnode, 0, 
2100                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2101                            NULL, NULL, NULL, &timeout, NULL);
2102         if (ret != 0) {
2103                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2104                 return -1;
2105         }
2106
2107         
2108
2109         return 0;
2110 }
2111
2112
2113
2114 /* 
2115   sent to a node to make it take over an ip address
2116 */
2117 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2118                           uint32_t destnode, struct ctdb_public_ip *ip)
2119 {
2120         TDB_DATA data;
2121         struct ctdb_public_ipv4 ipv4;
2122         int ret;
2123         int32_t res;
2124
2125         if (ip->addr.sa.sa_family == AF_INET) {
2126                 ipv4.pnn = ip->pnn;
2127                 ipv4.sin = ip->addr.ip;
2128
2129                 data.dsize = sizeof(ipv4);
2130                 data.dptr  = (uint8_t *)&ipv4;
2131
2132                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2133                            NULL, &res, &timeout, NULL);
2134         } else {
2135                 data.dsize = sizeof(*ip);
2136                 data.dptr  = (uint8_t *)ip;
2137
2138                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2139                            NULL, &res, &timeout, NULL);
2140         }
2141
2142         if (ret != 0 || res != 0) {
2143                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2144                 return -1;
2145         }
2146
2147         return 0;       
2148 }
2149
2150
2151 /* 
2152   sent to a node to make it release an ip address
2153 */
2154 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2155                          uint32_t destnode, struct ctdb_public_ip *ip)
2156 {
2157         TDB_DATA data;
2158         struct ctdb_public_ipv4 ipv4;
2159         int ret;
2160         int32_t res;
2161
2162         if (ip->addr.sa.sa_family == AF_INET) {
2163                 ipv4.pnn = ip->pnn;
2164                 ipv4.sin = ip->addr.ip;
2165
2166                 data.dsize = sizeof(ipv4);
2167                 data.dptr  = (uint8_t *)&ipv4;
2168
2169                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2170                                    NULL, &res, &timeout, NULL);
2171         } else {
2172                 data.dsize = sizeof(*ip);
2173                 data.dptr  = (uint8_t *)ip;
2174
2175                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2176                                    NULL, &res, &timeout, NULL);
2177         }
2178
2179         if (ret != 0 || res != 0) {
2180                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2181                 return -1;
2182         }
2183
2184         return 0;       
2185 }
2186
2187
2188 /*
2189   get a tunable
2190  */
2191 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2192                           struct timeval timeout, 
2193                           uint32_t destnode,
2194                           const char *name, uint32_t *value)
2195 {
2196         struct ctdb_control_get_tunable *t;
2197         TDB_DATA data, outdata;
2198         int32_t res;
2199         int ret;
2200
2201         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2202         data.dptr  = talloc_size(ctdb, data.dsize);
2203         CTDB_NO_MEMORY(ctdb, data.dptr);
2204
2205         t = (struct ctdb_control_get_tunable *)data.dptr;
2206         t->length = strlen(name)+1;
2207         memcpy(t->name, name, t->length);
2208
2209         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2210                            &outdata, &res, &timeout, NULL);
2211         talloc_free(data.dptr);
2212         if (ret != 0 || res != 0) {
2213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2214                 return -1;
2215         }
2216
2217         if (outdata.dsize != sizeof(uint32_t)) {
2218                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2219                 talloc_free(outdata.dptr);
2220                 return -1;
2221         }
2222         
2223         *value = *(uint32_t *)outdata.dptr;
2224         talloc_free(outdata.dptr);
2225
2226         return 0;
2227 }
2228
2229 /*
2230   set a tunable
2231  */
2232 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2233                           struct timeval timeout, 
2234                           uint32_t destnode,
2235                           const char *name, uint32_t value)
2236 {
2237         struct ctdb_control_set_tunable *t;
2238         TDB_DATA data;
2239         int32_t res;
2240         int ret;
2241
2242         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2243         data.dptr  = talloc_size(ctdb, data.dsize);
2244         CTDB_NO_MEMORY(ctdb, data.dptr);
2245
2246         t = (struct ctdb_control_set_tunable *)data.dptr;
2247         t->length = strlen(name)+1;
2248         memcpy(t->name, name, t->length);
2249         t->value = value;
2250
2251         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2252                            NULL, &res, &timeout, NULL);
2253         talloc_free(data.dptr);
2254         if (ret != 0 || res != 0) {
2255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2256                 return -1;
2257         }
2258
2259         return 0;
2260 }
2261
2262 /*
2263   list tunables
2264  */
2265 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2266                             struct timeval timeout, 
2267                             uint32_t destnode,
2268                             TALLOC_CTX *mem_ctx,
2269                             const char ***list, uint32_t *count)
2270 {
2271         TDB_DATA outdata;
2272         int32_t res;
2273         int ret;
2274         struct ctdb_control_list_tunable *t;
2275         char *p, *s, *ptr;
2276
2277         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2278                            mem_ctx, &outdata, &res, &timeout, NULL);
2279         if (ret != 0 || res != 0) {
2280                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2281                 return -1;
2282         }
2283
2284         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2285         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2286             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2287                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2288                 talloc_free(outdata.dptr);
2289                 return -1;              
2290         }
2291         
2292         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2293         CTDB_NO_MEMORY(ctdb, p);
2294
2295         talloc_free(outdata.dptr);
2296         
2297         (*list) = NULL;
2298         (*count) = 0;
2299
2300         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2301                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2302                 CTDB_NO_MEMORY(ctdb, *list);
2303                 (*list)[*count] = talloc_strdup(*list, s);
2304                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2305                 (*count)++;
2306         }
2307
2308         talloc_free(p);
2309
2310         return 0;
2311 }
2312
2313
2314 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2315                                    struct timeval timeout, uint32_t destnode,
2316                                    TALLOC_CTX *mem_ctx,
2317                                    uint32_t flags,
2318                                    struct ctdb_all_public_ips **ips)
2319 {
2320         int ret;
2321         TDB_DATA outdata;
2322         int32_t res;
2323
2324         ret = ctdb_control(ctdb, destnode, 0, 
2325                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2326                            mem_ctx, &outdata, &res, &timeout, NULL);
2327         if (ret == 0 && res == -1) {
2328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2329                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2330         }
2331         if (ret != 0 || res != 0) {
2332           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2333                 return -1;
2334         }
2335
2336         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2337         talloc_free(outdata.dptr);
2338                     
2339         return 0;
2340 }
2341
2342 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2343                              struct timeval timeout, uint32_t destnode,
2344                              TALLOC_CTX *mem_ctx,
2345                              struct ctdb_all_public_ips **ips)
2346 {
2347         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2348                                               destnode, mem_ctx,
2349                                               0, ips);
2350 }
2351
2352 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2353                         struct timeval timeout, uint32_t destnode, 
2354                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2355 {
2356         int ret, i, len;
2357         TDB_DATA outdata;
2358         int32_t res;
2359         struct ctdb_all_public_ipsv4 *ipsv4;
2360
2361         ret = ctdb_control(ctdb, destnode, 0, 
2362                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2363                            mem_ctx, &outdata, &res, &timeout, NULL);
2364         if (ret != 0 || res != 0) {
2365                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2366                 return -1;
2367         }
2368
2369         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2370         len = offsetof(struct ctdb_all_public_ips, ips) +
2371                 ipsv4->num*sizeof(struct ctdb_public_ip);
2372         *ips = talloc_zero_size(mem_ctx, len);
2373         CTDB_NO_MEMORY(ctdb, *ips);
2374         (*ips)->num = ipsv4->num;
2375         for (i=0; i<ipsv4->num; i++) {
2376                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2377                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2378         }
2379
2380         talloc_free(outdata.dptr);
2381                     
2382         return 0;
2383 }
2384
2385 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2386                                  struct timeval timeout, uint32_t destnode,
2387                                  TALLOC_CTX *mem_ctx,
2388                                  const ctdb_sock_addr *addr,
2389                                  struct ctdb_control_public_ip_info **_info)
2390 {
2391         int ret;
2392         TDB_DATA indata;
2393         TDB_DATA outdata;
2394         int32_t res;
2395         struct ctdb_control_public_ip_info *info;
2396         uint32_t len;
2397         uint32_t i;
2398
2399         indata.dptr = discard_const_p(uint8_t, addr);
2400         indata.dsize = sizeof(*addr);
2401
2402         ret = ctdb_control(ctdb, destnode, 0,
2403                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2404                            mem_ctx, &outdata, &res, &timeout, NULL);
2405         if (ret != 0 || res != 0) {
2406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2407                                 "failed ret:%d res:%d\n",
2408                                 ret, res));
2409                 return -1;
2410         }
2411
2412         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2413         if (len > outdata.dsize) {
2414                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2415                                 "returned invalid data with size %u > %u\n",
2416                                 (unsigned int)outdata.dsize,
2417                                 (unsigned int)len));
2418                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2419                 return -1;
2420         }
2421
2422         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2423         len += info->num*sizeof(struct ctdb_control_iface_info);
2424
2425         if (len > outdata.dsize) {
2426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2427                                 "returned invalid data with size %u > %u\n",
2428                                 (unsigned int)outdata.dsize,
2429                                 (unsigned int)len));
2430                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2431                 return -1;
2432         }
2433
2434         /* make sure we null terminate the returned strings */
2435         for (i=0; i < info->num; i++) {
2436                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2437         }
2438
2439         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2440                                                                 outdata.dptr,
2441                                                                 outdata.dsize);
2442         talloc_free(outdata.dptr);
2443         if (*_info == NULL) {
2444                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2445                                 "talloc_memdup size %u failed\n",
2446                                 (unsigned int)outdata.dsize));
2447                 return -1;
2448         }
2449
2450         return 0;
2451 }
2452
2453 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2454                          struct timeval timeout, uint32_t destnode,
2455                          TALLOC_CTX *mem_ctx,
2456                          struct ctdb_control_get_ifaces **_ifaces)
2457 {
2458         int ret;
2459         TDB_DATA outdata;
2460         int32_t res;
2461         struct ctdb_control_get_ifaces *ifaces;
2462         uint32_t len;
2463         uint32_t i;
2464
2465         ret = ctdb_control(ctdb, destnode, 0,
2466                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2467                            mem_ctx, &outdata, &res, &timeout, NULL);
2468         if (ret != 0 || res != 0) {
2469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2470                                 "failed ret:%d res:%d\n",
2471                                 ret, res));
2472                 return -1;
2473         }
2474
2475         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2476         if (len > outdata.dsize) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2478                                 "returned invalid data with size %u > %u\n",
2479                                 (unsigned int)outdata.dsize,
2480                                 (unsigned int)len));
2481                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2482                 return -1;
2483         }
2484
2485         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2486         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2487
2488         if (len > outdata.dsize) {
2489                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2490                                 "returned invalid data with size %u > %u\n",
2491                                 (unsigned int)outdata.dsize,
2492                                 (unsigned int)len));
2493                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2494                 return -1;
2495         }
2496
2497         /* make sure we null terminate the returned strings */
2498         for (i=0; i < ifaces->num; i++) {
2499                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2500         }
2501
2502         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2503                                                                   outdata.dptr,
2504                                                                   outdata.dsize);
2505         talloc_free(outdata.dptr);
2506         if (*_ifaces == NULL) {
2507                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2508                                 "talloc_memdup size %u failed\n",
2509                                 (unsigned int)outdata.dsize));
2510                 return -1;
2511         }
2512
2513         return 0;
2514 }
2515
2516 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2517                              struct timeval timeout, uint32_t destnode,
2518                              TALLOC_CTX *mem_ctx,
2519                              const struct ctdb_control_iface_info *info)
2520 {
2521         int ret;
2522         TDB_DATA indata;
2523         int32_t res;
2524
2525         indata.dptr = discard_const_p(uint8_t, info);
2526         indata.dsize = sizeof(*info);
2527
2528         ret = ctdb_control(ctdb, destnode, 0,
2529                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2530                            mem_ctx, NULL, &res, &timeout, NULL);
2531         if (ret != 0 || res != 0) {
2532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2533                                 "failed ret:%d res:%d\n",
2534                                 ret, res));
2535                 return -1;
2536         }
2537
2538         return 0;
2539 }
2540
2541 /*
2542   set/clear the permanent disabled bit on a remote node
2543  */
2544 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2545                        uint32_t set, uint32_t clear)
2546 {
2547         int ret;
2548         TDB_DATA data;
2549         struct ctdb_node_map *nodemap=NULL;
2550         struct ctdb_node_flag_change c;
2551         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2552         uint32_t recmaster;
2553         uint32_t *nodes;
2554
2555
2556         /* find the recovery master */
2557         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2558         if (ret != 0) {
2559                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2560                 talloc_free(tmp_ctx);
2561                 return ret;
2562         }
2563
2564
2565         /* read the node flags from the recmaster */
2566         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2567         if (ret != 0) {
2568                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2569                 talloc_free(tmp_ctx);
2570                 return -1;
2571         }
2572         if (destnode >= nodemap->num) {
2573                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2574                 talloc_free(tmp_ctx);
2575                 return -1;
2576         }
2577
2578         c.pnn       = destnode;
2579         c.old_flags = nodemap->nodes[destnode].flags;
2580         c.new_flags = c.old_flags;
2581         c.new_flags |= set;
2582         c.new_flags &= ~clear;
2583
2584         data.dsize = sizeof(c);
2585         data.dptr = (unsigned char *)&c;
2586
2587         /* send the flags update to all connected nodes */
2588         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2589
2590         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2591                                         nodes, 0,
2592                                         timeout, false, data,
2593                                         NULL, NULL,
2594                                         NULL) != 0) {
2595                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2596
2597                 talloc_free(tmp_ctx);
2598                 return -1;
2599         }
2600
2601         talloc_free(tmp_ctx);
2602         return 0;
2603 }
2604
2605
2606 /*
2607   get all tunables
2608  */
2609 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2610                                struct timeval timeout, 
2611                                uint32_t destnode,
2612                                struct ctdb_tunable *tunables)
2613 {
2614         TDB_DATA outdata;
2615         int ret;
2616         int32_t res;
2617
2618         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2619                            &outdata, &res, &timeout, NULL);
2620         if (ret != 0 || res != 0) {
2621                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2622                 return -1;
2623         }
2624
2625         if (outdata.dsize != sizeof(*tunables)) {
2626                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2627                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2628                 return -1;              
2629         }
2630
2631         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2632         talloc_free(outdata.dptr);
2633         return 0;
2634 }
2635
2636 /*
2637   add a public address to a node
2638  */
2639 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2640                       struct timeval timeout, 
2641                       uint32_t destnode,
2642                       struct ctdb_control_ip_iface *pub)
2643 {
2644         TDB_DATA data;
2645         int32_t res;
2646         int ret;
2647
2648         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2649         data.dptr  = (unsigned char *)pub;
2650
2651         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2652                            NULL, &res, &timeout, NULL);
2653         if (ret != 0 || res != 0) {
2654                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2655                 return -1;
2656         }
2657
2658         return 0;
2659 }
2660
2661 /*
2662   delete a public address from a node
2663  */
2664 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2665                       struct timeval timeout, 
2666                       uint32_t destnode,
2667                       struct ctdb_control_ip_iface *pub)
2668 {
2669         TDB_DATA data;
2670         int32_t res;
2671         int ret;
2672
2673         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2674         data.dptr  = (unsigned char *)pub;
2675
2676         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2677                            NULL, &res, &timeout, NULL);
2678         if (ret != 0 || res != 0) {
2679                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2680                 return -1;
2681         }
2682
2683         return 0;
2684 }
2685
2686 /*
2687   kill a tcp connection
2688  */
2689 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2690                       struct timeval timeout, 
2691                       uint32_t destnode,
2692                       struct ctdb_control_killtcp *killtcp)
2693 {
2694         TDB_DATA data;
2695         int32_t res;
2696         int ret;
2697
2698         data.dsize = sizeof(struct ctdb_control_killtcp);
2699         data.dptr  = (unsigned char *)killtcp;
2700
2701         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2702                            NULL, &res, &timeout, NULL);
2703         if (ret != 0 || res != 0) {
2704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2705                 return -1;
2706         }
2707
2708         return 0;
2709 }
2710
2711 /*
2712   send a gratious arp
2713  */
2714 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2715                       struct timeval timeout, 
2716                       uint32_t destnode,
2717                       ctdb_sock_addr *addr,
2718                       const char *ifname)
2719 {
2720         TDB_DATA data;
2721         int32_t res;
2722         int ret, len;
2723         struct ctdb_control_gratious_arp *gratious_arp;
2724         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2725
2726
2727         len = strlen(ifname)+1;
2728         gratious_arp = talloc_size(tmp_ctx, 
2729                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2730         CTDB_NO_MEMORY(ctdb, gratious_arp);
2731
2732         gratious_arp->addr = *addr;
2733         gratious_arp->len = len;
2734         memcpy(&gratious_arp->iface[0], ifname, len);
2735
2736
2737         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2738         data.dptr  = (unsigned char *)gratious_arp;
2739
2740         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2741                            NULL, &res, &timeout, NULL);
2742         if (ret != 0 || res != 0) {
2743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2744                 talloc_free(tmp_ctx);
2745                 return -1;
2746         }
2747
2748         talloc_free(tmp_ctx);
2749         return 0;
2750 }
2751
2752 /*
2753   get a list of all tcp tickles that a node knows about for a particular vnn
2754  */
2755 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2756                               struct timeval timeout, uint32_t destnode, 
2757                               TALLOC_CTX *mem_ctx, 
2758                               ctdb_sock_addr *addr,
2759                               struct ctdb_control_tcp_tickle_list **list)
2760 {
2761         int ret;
2762         TDB_DATA data, outdata;
2763         int32_t status;
2764
2765         data.dptr = (uint8_t*)addr;
2766         data.dsize = sizeof(ctdb_sock_addr);
2767
2768         ret = ctdb_control(ctdb, destnode, 0, 
2769                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2770                            mem_ctx, &outdata, &status, NULL, NULL);
2771         if (ret != 0 || status != 0) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2773                 return -1;
2774         }
2775
2776         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2777
2778         return status;
2779 }
2780
2781 /*
2782   register a server id
2783  */
2784 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2785                       struct timeval timeout, 
2786                       struct ctdb_server_id *id)
2787 {
2788         TDB_DATA data;
2789         int32_t res;
2790         int ret;
2791
2792         data.dsize = sizeof(struct ctdb_server_id);
2793         data.dptr  = (unsigned char *)id;
2794
2795         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2796                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2797                         0, data, NULL,
2798                         NULL, &res, &timeout, NULL);
2799         if (ret != 0 || res != 0) {
2800                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2801                 return -1;
2802         }
2803
2804         return 0;
2805 }
2806
2807 /*
2808   unregister a server id
2809  */
2810 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2811                       struct timeval timeout, 
2812                       struct ctdb_server_id *id)
2813 {
2814         TDB_DATA data;
2815         int32_t res;
2816         int ret;
2817
2818         data.dsize = sizeof(struct ctdb_server_id);
2819         data.dptr  = (unsigned char *)id;
2820
2821         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2822                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2823                         0, data, NULL,
2824                         NULL, &res, &timeout, NULL);
2825         if (ret != 0 || res != 0) {
2826                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2827                 return -1;
2828         }
2829
2830         return 0;
2831 }
2832
2833
2834 /*
2835   check if a server id exists
2836
2837   if a server id does exist, return *status == 1, otherwise *status == 0
2838  */
2839 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2840                       struct timeval timeout, 
2841                       uint32_t destnode,
2842                       struct ctdb_server_id *id,
2843                       uint32_t *status)
2844 {
2845         TDB_DATA data;
2846         int32_t res;
2847         int ret;
2848
2849         data.dsize = sizeof(struct ctdb_server_id);
2850         data.dptr  = (unsigned char *)id;
2851
2852         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2853                         0, data, NULL,
2854                         NULL, &res, &timeout, NULL);
2855         if (ret != 0) {
2856                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2857                 return -1;
2858         }
2859
2860         if (res) {
2861                 *status = 1;
2862         } else {
2863                 *status = 0;
2864         }
2865
2866         return 0;
2867 }
2868
2869 /*
2870    get the list of server ids that are registered on a node
2871 */
2872 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2873                 TALLOC_CTX *mem_ctx,
2874                 struct timeval timeout, uint32_t destnode, 
2875                 struct ctdb_server_id_list **svid_list)
2876 {
2877         int ret;
2878         TDB_DATA outdata;
2879         int32_t res;
2880
2881         ret = ctdb_control(ctdb, destnode, 0, 
2882                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2883                            mem_ctx, &outdata, &res, &timeout, NULL);
2884         if (ret != 0 || res != 0) {
2885                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2886                 return -1;
2887         }
2888
2889         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2890                     
2891         return 0;
2892 }
2893
2894 /*
2895   initialise the ctdb daemon for client applications
2896
2897   NOTE: In current code the daemon does not fork. This is for testing purposes only
2898   and to simplify the code.
2899 */
2900 struct ctdb_context *ctdb_init(struct event_context *ev)
2901 {
2902         int ret;
2903         struct ctdb_context *ctdb;
2904
2905         ctdb = talloc_zero(ev, struct ctdb_context);
2906         if (ctdb == NULL) {
2907                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2908                 return NULL;
2909         }
2910         ctdb->ev  = ev;
2911         ctdb->idr = idr_init(ctdb);
2912         /* Wrap early to exercise code. */
2913         ctdb->lastid = INT_MAX-200;
2914         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2915
2916         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2917         if (ret != 0) {
2918                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2919                 talloc_free(ctdb);
2920                 return NULL;
2921         }
2922
2923         ctdb->statistics.statistics_start_time = timeval_current();
2924
2925         return ctdb;
2926 }
2927
2928
2929 /*
2930   set some ctdb flags
2931 */
2932 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2933 {
2934         ctdb->flags |= flags;
2935 }
2936
2937 /*
2938   setup the local socket name
2939 */
2940 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2941 {
2942         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2943         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2944
2945         return 0;
2946 }
2947
2948 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2949 {
2950         return ctdb->daemon.name;
2951 }
2952
2953 /*
2954   return the pnn of this node
2955 */
2956 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2957 {
2958         return ctdb->pnn;
2959 }
2960
2961
2962 /*
2963   get the uptime of a remote node
2964  */
2965 struct ctdb_client_control_state *
2966 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2967 {
2968         return ctdb_control_send(ctdb, destnode, 0, 
2969                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2970                            mem_ctx, &timeout, NULL);
2971 }
2972
2973 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2974 {
2975         int ret;
2976         int32_t res;
2977         TDB_DATA outdata;
2978
2979         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2980         if (ret != 0 || res != 0) {
2981                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2982                 return -1;
2983         }
2984
2985         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2986
2987         return 0;
2988 }
2989
2990 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2991 {
2992         struct ctdb_client_control_state *state;
2993
2994         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2995         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2996 }
2997
2998 /*
2999   send a control to execute the "recovered" event script on a node
3000  */
3001 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3002 {
3003         int ret;
3004         int32_t status;
3005
3006         ret = ctdb_control(ctdb, destnode, 0, 
3007                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3008                            NULL, NULL, &status, &timeout, NULL);
3009         if (ret != 0 || status != 0) {
3010                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3011                 return -1;
3012         }
3013
3014         return 0;
3015 }
3016
3017 /* 
3018   callback for the async helpers used when sending the same control
3019   to multiple nodes in parallell.
3020 */
3021 static void async_callback(struct ctdb_client_control_state *state)
3022 {
3023         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3024         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3025         int ret;
3026         TDB_DATA outdata;
3027         int32_t res;
3028         uint32_t destnode = state->c->hdr.destnode;
3029
3030         /* one more node has responded with recmode data */
3031         data->count--;
3032
3033         /* if we failed to push the db, then return an error and let
3034            the main loop try again.
3035         */
3036         if (state->state != CTDB_CONTROL_DONE) {
3037                 if ( !data->dont_log_errors) {
3038                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3039                 }
3040                 data->fail_count++;
3041                 if (data->fail_callback) {
3042                         data->fail_callback(ctdb, destnode, res, outdata,
3043                                         data->callback_data);
3044                 }
3045                 return;
3046         }
3047         
3048         state->async.fn = NULL;
3049
3050         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3051         if ((ret != 0) || (res != 0)) {
3052                 if ( !data->dont_log_errors) {
3053                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3054                 }
3055                 data->fail_count++;
3056                 if (data->fail_callback) {
3057                         data->fail_callback(ctdb, destnode, res, outdata,
3058                                         data->callback_data);
3059                 }
3060         }
3061         if ((ret == 0) && (data->callback != NULL)) {
3062                 data->callback(ctdb, destnode, res, outdata,
3063                                         data->callback_data);
3064         }
3065 }
3066
3067
3068 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3069 {
3070         /* set up the callback functions */
3071         state->async.fn = async_callback;
3072         state->async.private_data = data;
3073         
3074         /* one more control to wait for to complete */
3075         data->count++;
3076 }
3077
3078
3079 /* wait for up to the maximum number of seconds allowed
3080    or until all nodes we expect a response from has replied
3081 */
3082 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3083 {
3084         while (data->count > 0) {
3085                 event_loop_once(ctdb->ev);
3086         }
3087         if (data->fail_count != 0) {
3088                 if (!data->dont_log_errors) {
3089                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3090                                  data->fail_count));
3091                 }
3092                 return -1;
3093         }
3094         return 0;
3095 }
3096
3097
3098 /* 
3099    perform a simple control on the listed nodes
3100    The control cannot return data
3101  */
3102 int ctdb_client_async_control(struct ctdb_context *ctdb,
3103                                 enum ctdb_controls opcode,
3104                                 uint32_t *nodes,
3105                                 uint64_t srvid,
3106                                 struct timeval timeout,
3107                                 bool dont_log_errors,
3108                                 TDB_DATA data,
3109                                 client_async_callback client_callback,
3110                                 client_async_callback fail_callback,
3111                                 void *callback_data)
3112 {
3113         struct client_async_data *async_data;
3114         struct ctdb_client_control_state *state;
3115         int j, num_nodes;
3116
3117         async_data = talloc_zero(ctdb, struct client_async_data);
3118         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3119         async_data->dont_log_errors = dont_log_errors;
3120         async_data->callback = client_callback;
3121         async_data->fail_callback = fail_callback;
3122         async_data->callback_data = callback_data;
3123         async_data->opcode        = opcode;
3124
3125         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3126
3127         /* loop over all nodes and send an async control to each of them */
3128         for (j=0; j<num_nodes; j++) {
3129                 uint32_t pnn = nodes[j];
3130
3131                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3132                                           0, data, async_data, &timeout, NULL);
3133                 if (state == NULL) {
3134                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3135                         talloc_free(async_data);
3136                         return -1;
3137                 }
3138                 
3139                 ctdb_client_async_add(async_data, state);
3140         }
3141
3142         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3143                 talloc_free(async_data);
3144                 return -1;
3145         }
3146
3147         talloc_free(async_data);
3148         return 0;
3149 }
3150
3151 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3152                                 struct ctdb_vnn_map *vnn_map,
3153                                 TALLOC_CTX *mem_ctx,
3154                                 bool include_self)
3155 {
3156         int i, j, num_nodes;
3157         uint32_t *nodes;
3158
3159         for (i=num_nodes=0;i<vnn_map->size;i++) {
3160                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3161                         continue;
3162                 }
3163                 num_nodes++;
3164         } 
3165
3166         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3167         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3168
3169         for (i=j=0;i<vnn_map->size;i++) {
3170                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3171                         continue;
3172                 }
3173                 nodes[j++] = vnn_map->map[i];
3174         } 
3175
3176         return nodes;
3177 }
3178
3179 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3180                                 struct ctdb_node_map *node_map,
3181                                 TALLOC_CTX *mem_ctx,
3182                                 bool include_self)
3183 {
3184         int i, j, num_nodes;
3185         uint32_t *nodes;
3186
3187         for (i=num_nodes=0;i<node_map->num;i++) {
3188                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3189                         continue;
3190                 }
3191                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3192                         continue;
3193                 }
3194                 num_nodes++;
3195         } 
3196
3197         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3198         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3199
3200         for (i=j=0;i<node_map->num;i++) {
3201                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3202                         continue;
3203                 }
3204                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3205                         continue;
3206                 }
3207                 nodes[j++] = node_map->nodes[i].pnn;
3208         } 
3209
3210         return nodes;
3211 }
3212
3213 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3214                                 struct ctdb_node_map *node_map,
3215                                 TALLOC_CTX *mem_ctx,
3216                                 uint32_t pnn)
3217 {
3218         int i, j, num_nodes;
3219         uint32_t *nodes;
3220
3221         for (i=num_nodes=0;i<node_map->num;i++) {
3222                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3223                         continue;
3224                 }
3225                 if (node_map->nodes[i].pnn == pnn) {
3226                         continue;
3227                 }
3228                 num_nodes++;
3229         } 
3230
3231         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3232         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3233
3234         for (i=j=0;i<node_map->num;i++) {
3235                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3236                         continue;
3237                 }
3238                 if (node_map->nodes[i].pnn == pnn) {
3239                         continue;
3240                 }
3241                 nodes[j++] = node_map->nodes[i].pnn;
3242         } 
3243
3244         return nodes;
3245 }
3246
3247 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3248                                 struct ctdb_node_map *node_map,
3249                                 TALLOC_CTX *mem_ctx,
3250                                 bool include_self)
3251 {
3252         int i, j, num_nodes;
3253         uint32_t *nodes;
3254
3255         for (i=num_nodes=0;i<node_map->num;i++) {
3256                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3257                         continue;
3258                 }
3259                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3260                         continue;
3261                 }
3262                 num_nodes++;
3263         } 
3264
3265         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3266         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3267
3268         for (i=j=0;i<node_map->num;i++) {
3269                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3270                         continue;
3271                 }
3272                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3273                         continue;
3274                 }
3275                 nodes[j++] = node_map->nodes[i].pnn;
3276         } 
3277
3278         return nodes;
3279 }
3280
3281 /* 
3282   this is used to test if a pnn lock exists and if it exists will return
3283   the number of connections that pnn has reported or -1 if that recovery
3284   daemon is not running.
3285 */
3286 int
3287 ctdb_read_pnn_lock(int fd, int32_t pnn)
3288 {
3289         struct flock lock;
3290         char c;
3291
3292         lock.l_type = F_WRLCK;
3293         lock.l_whence = SEEK_SET;
3294         lock.l_start = pnn;
3295         lock.l_len = 1;
3296         lock.l_pid = 0;
3297
3298         if (fcntl(fd, F_GETLK, &lock) != 0) {
3299                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3300                 return -1;
3301         }
3302
3303         if (lock.l_type == F_UNLCK) {
3304                 return -1;
3305         }
3306
3307         if (pread(fd, &c, 1, pnn) == -1) {
3308                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3309                 return -1;
3310         }
3311
3312         return c;
3313 }
3314
3315 /*
3316   get capabilities of a remote node
3317  */
3318 struct ctdb_client_control_state *
3319 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3320 {
3321         return ctdb_control_send(ctdb, destnode, 0, 
3322                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3323                            mem_ctx, &timeout, NULL);
3324 }
3325
3326 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3327 {
3328         int ret;
3329         int32_t res;
3330         TDB_DATA outdata;
3331
3332         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3333         if ( (ret != 0) || (res != 0) ) {
3334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3335                 return -1;
3336         }
3337
3338         if (capabilities) {
3339                 *capabilities = *((uint32_t *)outdata.dptr);
3340         }
3341
3342         return 0;
3343 }
3344
3345 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3346 {
3347         struct ctdb_client_control_state *state;
3348         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3349         int ret;
3350
3351         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3352         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3353         talloc_free(tmp_ctx);
3354         return ret;
3355 }
3356
3357 /**
3358  * check whether a transaction is active on a given db on a given node
3359  */
3360 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3361                                      uint32_t destnode,
3362                                      uint32_t db_id)
3363 {
3364         int32_t status;
3365         int ret;
3366         TDB_DATA indata;
3367
3368         indata.dptr = (uint8_t *)&db_id;
3369         indata.dsize = sizeof(db_id);
3370
3371         ret = ctdb_control(ctdb, destnode, 0,
3372                            CTDB_CONTROL_TRANS2_ACTIVE,
3373                            0, indata, NULL, NULL, &status,
3374                            NULL, NULL);
3375
3376         if (ret != 0) {
3377                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3378                 return -1;
3379         }
3380
3381         return status;
3382 }
3383
3384
3385 struct ctdb_transaction_handle {
3386         struct ctdb_db_context *ctdb_db;
3387         bool in_replay;
3388         /*
3389          * we store the reads and writes done under a transaction:
3390          * - one list stores both reads and writes (m_all),
3391          * - the other just writes (m_write)
3392          */
3393         struct ctdb_marshall_buffer *m_all;
3394         struct ctdb_marshall_buffer *m_write;
3395 };
3396
3397 /* start a transaction on a database */
3398 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3399 {
3400         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3401         return 0;
3402 }
3403
3404 /* start a transaction on a database */
3405 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3406 {
3407         struct ctdb_record_handle *rh;
3408         TDB_DATA key;
3409         TDB_DATA data;
3410         struct ctdb_ltdb_header header;
3411         TALLOC_CTX *tmp_ctx;
3412         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3413         int ret;
3414         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3415         pid_t pid;
3416         int32_t status;
3417
3418         key.dptr = discard_const(keyname);
3419         key.dsize = strlen(keyname);
3420
3421         if (!ctdb_db->persistent) {
3422                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3423                 return -1;
3424         }
3425
3426 again:
3427         tmp_ctx = talloc_new(h);
3428
3429         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3430         if (rh == NULL) {
3431                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3432                 talloc_free(tmp_ctx);
3433                 return -1;
3434         }
3435
3436         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3437                                               CTDB_CURRENT_NODE,
3438                                               ctdb_db->db_id);
3439         if (status == 1) {
3440                 unsigned long int usec = (1000 + random()) % 100000;
3441                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3442                                     "on db_id[0x%08x]. waiting for %lu "
3443                                     "microseconds\n",
3444                                     ctdb_db->db_id, usec));
3445                 talloc_free(tmp_ctx);
3446                 usleep(usec);
3447                 goto again;
3448         }
3449
3450         /*
3451          * store the pid in the database:
3452          * it is not enough that the node is dmaster...
3453          */
3454         pid = getpid();
3455         data.dptr = (unsigned char *)&pid;
3456         data.dsize = sizeof(pid_t);
3457         rh->header.rsn++;
3458         rh->header.dmaster = ctdb_db->ctdb->pnn;
3459         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3460         if (ret != 0) {
3461                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3462                                   "transaction record\n"));
3463                 talloc_free(tmp_ctx);
3464                 return -1;
3465         }
3466
3467         talloc_free(rh);
3468
3469         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3470         if (ret != 0) {
3471                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3472                 talloc_free(tmp_ctx);
3473                 return -1;
3474         }
3475
3476         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3477         if (ret != 0) {
3478                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3479                                  "lock record inside transaction\n"));
3480                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3481                 talloc_free(tmp_ctx);
3482                 goto again;
3483         }
3484
3485         if (header.dmaster != ctdb_db->ctdb->pnn) {
3486                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3487                                    "transaction lock record\n"));
3488                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3489                 talloc_free(tmp_ctx);
3490                 goto again;
3491         }
3492
3493         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3494                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3495                                     "the transaction lock record\n"));
3496                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3497                 talloc_free(tmp_ctx);
3498                 goto again;
3499         }
3500
3501         talloc_free(tmp_ctx);
3502
3503         return 0;
3504 }
3505
3506
3507 /* start a transaction on a database */
3508 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3509                                                        TALLOC_CTX *mem_ctx)
3510 {
3511         struct ctdb_transaction_handle *h;
3512         int ret;
3513
3514         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3515         if (h == NULL) {
3516                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3517                 return NULL;
3518         }
3519
3520         h->ctdb_db = ctdb_db;
3521
3522         ret = ctdb_transaction_fetch_start(h);
3523         if (ret != 0) {
3524                 talloc_free(h);
3525                 return NULL;
3526         }
3527
3528         talloc_set_destructor(h, ctdb_transaction_destructor);
3529
3530         return h;
3531 }
3532
3533
3534
3535 /*
3536   fetch a record inside a transaction
3537  */
3538 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3539                            TALLOC_CTX *mem_ctx, 
3540                            TDB_DATA key, TDB_DATA *data)
3541 {
3542         struct ctdb_ltdb_header header;
3543         int ret;
3544
3545         ZERO_STRUCT(header);
3546
3547         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3548         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3549                 /* record doesn't exist yet */
3550                 *data = tdb_null;
3551                 ret = 0;
3552         }
3553         
3554         if (ret != 0) {
3555                 return ret;
3556         }
3557
3558         if (!h->in_replay) {
3559                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3560                 if (h->m_all == NULL) {
3561                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3562                         return -1;
3563                 }
3564         }
3565
3566         return 0;
3567 }
3568
3569 /*
3570   stores a record inside a transaction
3571  */
3572 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3573                            TDB_DATA key, TDB_DATA data)
3574 {
3575         TALLOC_CTX *tmp_ctx = talloc_new(h);
3576         struct ctdb_ltdb_header header;
3577         TDB_DATA olddata;
3578         int ret;
3579
3580         ZERO_STRUCT(header);
3581
3582         /* we need the header so we can update the RSN */
3583         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3584         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3585                 /* the record doesn't exist - create one with us as dmaster.
3586                    This is only safe because we are in a transaction and this
3587                    is a persistent database */
3588                 ZERO_STRUCT(header);
3589         } else if (ret != 0) {
3590                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3591                 talloc_free(tmp_ctx);
3592                 return ret;
3593         }
3594
3595         if (data.dsize == olddata.dsize &&
3596             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3597                 /* save writing the same data */
3598                 talloc_free(tmp_ctx);
3599                 return 0;
3600         }
3601
3602         header.dmaster = h->ctdb_db->ctdb->pnn;
3603         header.rsn++;
3604
3605         if (!h->in_replay) {
3606                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3607                 if (h->m_all == NULL) {
3608                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3609                         talloc_free(tmp_ctx);
3610                         return -1;
3611                 }
3612         }               
3613
3614         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3615         if (h->m_write == NULL) {
3616                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3617                 talloc_free(tmp_ctx);
3618                 return -1;
3619         }
3620         
3621         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3622
3623         talloc_free(tmp_ctx);
3624         
3625         return ret;
3626 }
3627
3628 /*
3629   replay a transaction
3630  */
3631 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3632 {
3633         int ret, i;
3634         struct ctdb_rec_data *rec = NULL;
3635
3636         h->in_replay = true;
3637         talloc_free(h->m_write);
3638         h->m_write = NULL;
3639
3640         ret = ctdb_transaction_fetch_start(h);
3641         if (ret != 0) {
3642                 return ret;
3643         }
3644
3645         for (i=0;i<h->m_all->count;i++) {
3646                 TDB_DATA key, data;
3647
3648                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3649                 if (rec == NULL) {
3650                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3651                         goto failed;
3652                 }
3653
3654                 if (rec->reqid == 0) {
3655                         /* its a store */
3656                         if (ctdb_transaction_store(h, key, data) != 0) {
3657                                 goto failed;
3658                         }
3659                 } else {
3660                         TDB_DATA data2;
3661                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3662
3663                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3664                                 talloc_free(tmp_ctx);
3665                                 goto failed;
3666                         }
3667                         if (data2.dsize != data.dsize ||
3668                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3669                                 /* the record has changed on us - we have to give up */
3670                                 talloc_free(tmp_ctx);
3671                                 goto failed;
3672                         }
3673                         talloc_free(tmp_ctx);
3674                 }
3675         }
3676         
3677         return 0;
3678
3679 failed:
3680         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3681         return -1;
3682 }
3683
3684
3685 /*
3686   commit a transaction
3687  */
3688 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3689 {
3690         int ret, retries=0;
3691         int32_t status;
3692         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3693         struct timeval timeout;
3694         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3695
3696         talloc_set_destructor(h, NULL);
3697
3698         /* our commit strategy is quite complex.
3699
3700            - we first try to commit the changes to all other nodes
3701
3702            - if that works, then we commit locally and we are done
3703
3704            - if a commit on another node fails, then we need to cancel
3705              the transaction, then restart the transaction (thus
3706              opening a window of time for a pending recovery to
3707              complete), then replay the transaction, checking all the
3708              reads and writes (checking that reads give the same data,
3709              and writes succeed). Then we retry the transaction to the
3710              other nodes
3711         */
3712
3713 again:
3714         if (h->m_write == NULL) {
3715                 /* no changes were made */
3716                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3717                 talloc_free(h);
3718                 return 0;
3719         }
3720
3721         /* tell ctdbd to commit to the other nodes */
3722         timeout = timeval_current_ofs(1, 0);
3723         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3724                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3725                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3726                            &timeout, NULL);
3727         if (ret != 0 || status != 0) {
3728                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3729                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3730                                      ", retrying after 1 second...\n",
3731                                      (retries==0)?"":"retry "));
3732                 sleep(1);
3733
3734                 if (ret != 0) {
3735                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3736                 } else {
3737                         /* work out what error code we will give if we 
3738                            have to fail the operation */
3739                         switch ((enum ctdb_trans2_commit_error)status) {
3740                         case CTDB_TRANS2_COMMIT_SUCCESS:
3741                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3742                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3743                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3744                                 break;
3745                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3746                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3747                                 break;
3748                         }
3749                 }
3750
3751                 if (++retries == 100) {
3752                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3753                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3754                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3755                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3756                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3757                         talloc_free(h);
3758                         return -1;
3759                 }               
3760
3761                 if (ctdb_replay_transaction(h) != 0) {
3762                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3763                                           "transaction on db 0x%08x, "
3764                                           "failure control =%u\n",
3765                                           h->ctdb_db->db_id,
3766                                           (unsigned)failure_control));
3767                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3768                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3769                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3770                         talloc_free(h);
3771                         return -1;
3772                 }
3773                 goto again;
3774         } else {
3775                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3776         }
3777
3778         /* do the real commit locally */
3779         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3780         if (ret != 0) {
3781                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3782                                   "on db id 0x%08x locally, "
3783                                   "failure_control=%u\n",
3784                                   h->ctdb_db->db_id,
3785                                   (unsigned)failure_control));
3786                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3787                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3788                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3789                 talloc_free(h);
3790                 return ret;
3791         }
3792
3793         /* tell ctdbd that we are finished with our local commit */
3794         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3795                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3796                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3797         talloc_free(h);
3798         return 0;
3799 }
3800
3801 /*
3802   recovery daemon ping to main daemon
3803  */
3804 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3805 {
3806         int ret;
3807         int32_t res;
3808
3809         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3810                            ctdb, NULL, &res, NULL, NULL);
3811         if (ret != 0 || res != 0) {
3812                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3813                 return -1;
3814         }
3815
3816         return 0;
3817 }
3818
3819 /* when forking the main daemon and the child process needs to connect back
3820  * to the daemon as a client process, this function can be used to change
3821  * the ctdb context from daemon into client mode
3822  */
3823 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3824 {
3825         int ret;
3826         va_list ap;
3827
3828         /* Add extra information so we can identify this in the logs */
3829         va_start(ap, fmt);
3830         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3831         va_end(ap);
3832
3833         /* shutdown the transport */
3834         if (ctdb->methods) {
3835                 ctdb->methods->shutdown(ctdb);
3836         }
3837
3838         /* get a new event context */
3839         talloc_free(ctdb->ev);
3840         ctdb->ev = event_context_init(ctdb);
3841         tevent_loop_allow_nesting(ctdb->ev);
3842
3843         close(ctdb->daemon.sd);
3844         ctdb->daemon.sd = -1;
3845
3846         /* the client does not need to be realtime */
3847         if (ctdb->do_setsched) {
3848                 ctdb_restore_scheduler(ctdb);
3849         }
3850
3851         /* initialise ctdb */
3852         ret = ctdb_socket_connect(ctdb);
3853         if (ret != 0) {
3854                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3855                 return -1;
3856         }
3857
3858          return 0;
3859 }
3860
3861 /*
3862   get the status of running the monitor eventscripts: NULL means never run.
3863  */
3864 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3865                 struct timeval timeout, uint32_t destnode, 
3866                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3867                 struct ctdb_scripts_wire **script_status)
3868 {
3869         int ret;
3870         TDB_DATA outdata, indata;
3871         int32_t res;
3872         uint32_t uinttype = type;
3873
3874         indata.dptr = (uint8_t *)&uinttype;
3875         indata.dsize = sizeof(uinttype);
3876
3877         ret = ctdb_control(ctdb, destnode, 0, 
3878                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3879                            mem_ctx, &outdata, &res, &timeout, NULL);
3880         if (ret != 0 || res != 0) {
3881                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3882                 return -1;
3883         }
3884
3885         if (outdata.dsize == 0) {
3886                 *script_status = NULL;
3887         } else {
3888                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3889                 talloc_free(outdata.dptr);
3890         }
3891                     
3892         return 0;
3893 }
3894
3895 /*
3896   tell the main daemon how long it took to lock the reclock file
3897  */
3898 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3899 {
3900         int ret;
3901         int32_t res;
3902         TDB_DATA data;
3903
3904         data.dptr = (uint8_t *)&latency;
3905         data.dsize = sizeof(latency);
3906
3907         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3908                            ctdb, NULL, &res, NULL, NULL);
3909         if (ret != 0 || res != 0) {
3910                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3911                 return -1;
3912         }
3913
3914         return 0;
3915 }
3916
3917 /*
3918   get the name of the reclock file
3919  */
3920 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3921                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3922                          const char **name)
3923 {
3924         int ret;
3925         int32_t res;
3926         TDB_DATA data;
3927
3928         ret = ctdb_control(ctdb, destnode, 0, 
3929                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3930                            mem_ctx, &data, &res, &timeout, NULL);
3931         if (ret != 0 || res != 0) {
3932                 return -1;
3933         }
3934
3935         if (data.dsize == 0) {
3936                 *name = NULL;
3937         } else {
3938                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3939         }
3940         talloc_free(data.dptr);
3941
3942         return 0;
3943 }
3944
3945 /*
3946   set the reclock filename for a node
3947  */
3948 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3949 {
3950         int ret;
3951         TDB_DATA data;
3952         int32_t res;
3953
3954         if (reclock == NULL) {
3955                 data.dsize = 0;
3956                 data.dptr  = NULL;
3957         } else {
3958                 data.dsize = strlen(reclock) + 1;
3959                 data.dptr  = discard_const(reclock);
3960         }
3961
3962         ret = ctdb_control(ctdb, destnode, 0, 
3963                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3964                            NULL, NULL, &res, &timeout, NULL);
3965         if (ret != 0 || res != 0) {
3966                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3967                 return -1;
3968         }
3969
3970         return 0;
3971 }
3972
3973 /*
3974   stop a node
3975  */
3976 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3977 {
3978         int ret;
3979         int32_t res;
3980
3981         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3982                            ctdb, NULL, &res, &timeout, NULL);
3983         if (ret != 0 || res != 0) {
3984                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3985                 return -1;
3986         }
3987
3988         return 0;
3989 }
3990
3991 /*
3992   continue a node
3993  */
3994 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3995 {
3996         int ret;
3997
3998         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3999                            ctdb, NULL, NULL, &timeout, NULL);
4000         if (ret != 0) {
4001                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4002                 return -1;
4003         }
4004
4005         return 0;
4006 }
4007
4008 /*
4009   set the natgw state for a node
4010  */
4011 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4012 {
4013         int ret;
4014         TDB_DATA data;
4015         int32_t res;
4016
4017         data.dsize = sizeof(natgwstate);
4018         data.dptr  = (uint8_t *)&natgwstate;
4019
4020         ret = ctdb_control(ctdb, destnode, 0, 
4021                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4022                            NULL, NULL, &res, &timeout, NULL);
4023         if (ret != 0 || res != 0) {
4024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4025                 return -1;
4026         }
4027
4028         return 0;
4029 }
4030
4031 /*
4032   set the lmaster role for a node
4033  */
4034 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4035 {
4036         int ret;
4037         TDB_DATA data;
4038         int32_t res;
4039
4040         data.dsize = sizeof(lmasterrole);
4041         data.dptr  = (uint8_t *)&lmasterrole;
4042
4043         ret = ctdb_control(ctdb, destnode, 0, 
4044                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4045                            NULL, NULL, &res, &timeout, NULL);
4046         if (ret != 0 || res != 0) {
4047                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4048                 return -1;
4049         }
4050
4051         return 0;
4052 }
4053
4054 /*
4055   set the recmaster role for a node
4056  */
4057 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4058 {
4059         int ret;
4060         TDB_DATA data;
4061         int32_t res;
4062
4063         data.dsize = sizeof(recmasterrole);
4064         data.dptr  = (uint8_t *)&recmasterrole;
4065
4066         ret = ctdb_control(ctdb, destnode, 0, 
4067                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4068                            NULL, NULL, &res, &timeout, NULL);
4069         if (ret != 0 || res != 0) {
4070                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4071                 return -1;
4072         }
4073
4074         return 0;
4075 }
4076
4077 /* enable an eventscript
4078  */
4079 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4080 {
4081         int ret;
4082         TDB_DATA data;
4083         int32_t res;
4084
4085         data.dsize = strlen(script) + 1;
4086         data.dptr  = discard_const(script);
4087
4088         ret = ctdb_control(ctdb, destnode, 0, 
4089                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4090                            NULL, NULL, &res, &timeout, NULL);
4091         if (ret != 0 || res != 0) {
4092                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4093                 return -1;
4094         }
4095
4096         return 0;
4097 }
4098
4099 /* disable an eventscript
4100  */
4101 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4102 {
4103         int ret;
4104         TDB_DATA data;
4105         int32_t res;
4106
4107         data.dsize = strlen(script) + 1;
4108         data.dptr  = discard_const(script);
4109
4110         ret = ctdb_control(ctdb, destnode, 0, 
4111                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4112                            NULL, NULL, &res, &timeout, NULL);
4113         if (ret != 0 || res != 0) {
4114                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4115                 return -1;
4116         }
4117
4118         return 0;
4119 }
4120
4121
4122 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4123 {
4124         int ret;
4125         TDB_DATA data;
4126         int32_t res;
4127
4128         data.dsize = sizeof(*bantime);
4129         data.dptr  = (uint8_t *)bantime;
4130
4131         ret = ctdb_control(ctdb, destnode, 0, 
4132                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4133                            NULL, NULL, &res, &timeout, NULL);
4134         if (ret != 0 || res != 0) {
4135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4136                 return -1;
4137         }
4138
4139         return 0;
4140 }
4141
4142
4143 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4144 {
4145         int ret;
4146         TDB_DATA outdata;
4147         int32_t res;
4148         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4149
4150         ret = ctdb_control(ctdb, destnode, 0, 
4151                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4152                            tmp_ctx, &outdata, &res, &timeout, NULL);
4153         if (ret != 0 || res != 0) {
4154                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4155                 talloc_free(tmp_ctx);
4156                 return -1;
4157         }
4158
4159         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4160         talloc_free(tmp_ctx);
4161
4162         return 0;
4163 }
4164
4165
4166 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4167 {
4168         int ret;
4169         int32_t res;
4170         TDB_DATA data;
4171         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4172
4173         data.dptr = (uint8_t*)db_prio;
4174         data.dsize = sizeof(*db_prio);
4175
4176         ret = ctdb_control(ctdb, destnode, 0, 
4177                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4178                            tmp_ctx, NULL, &res, &timeout, NULL);
4179         if (ret != 0 || res != 0) {
4180                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4181                 talloc_free(tmp_ctx);
4182                 return -1;
4183         }
4184
4185         talloc_free(tmp_ctx);
4186
4187         return 0;
4188 }
4189
4190 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4191 {
4192         int ret;
4193         int32_t res;
4194         TDB_DATA data;
4195         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4196
4197         data.dptr = (uint8_t*)&db_id;
4198         data.dsize = sizeof(db_id);
4199
4200         ret = ctdb_control(ctdb, destnode, 0, 
4201                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4202                            tmp_ctx, NULL, &res, &timeout, NULL);
4203         if (ret != 0 || res < 0) {
4204                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4205                 talloc_free(tmp_ctx);
4206                 return -1;
4207         }
4208
4209         if (priority) {
4210                 *priority = res;
4211         }
4212
4213         talloc_free(tmp_ctx);
4214
4215         return 0;
4216 }
4217
4218 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4219 {
4220         int ret;
4221         TDB_DATA outdata;
4222         int32_t res;
4223
4224         ret = ctdb_control(ctdb, destnode, 0, 
4225                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4226                            mem_ctx, &outdata, &res, &timeout, NULL);
4227         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4228                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4229                 return -1;
4230         }
4231
4232         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4233         talloc_free(outdata.dptr);
4234                     
4235         return 0;
4236 }