f97d739a3b54ecaf340c96926fa5adaec07c28c0
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36 struct ctdb_record_handle {
37         struct ctdb_db_context *ctdb_db;
38         TDB_DATA key;
39         TDB_DATA *data;
40         struct ctdb_ltdb_header header;
41 };
42
43
44 /*
45   make a recv call to the local ctdb daemon - called from client context
46
47   This is called when the program wants to wait for a ctdb_call to complete and get the 
48   results. This call will block unless the call has already completed.
49 */
50 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
51 {
52         if (state == NULL) {
53                 return -1;
54         }
55
56         while (state->state < CTDB_CALL_DONE) {
57                 event_loop_once(state->ctdb_db->ctdb->ev);
58         }
59         if (state->state != CTDB_CALL_DONE) {
60                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
61                 talloc_free(state);
62                 return -1;
63         }
64
65         if (state->call->reply_data.dsize) {
66                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
67                                                       state->call->reply_data.dptr,
68                                                       state->call->reply_data.dsize);
69                 call->reply_data.dsize = state->call->reply_data.dsize;
70         } else {
71                 call->reply_data.dptr = NULL;
72                 call->reply_data.dsize = 0;
73         }
74         call->status = state->call->status;
75         talloc_free(state);
76
77         return 0;
78 }
79
80
81
82
83
84
85
86 /*
87   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
88 */
89 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
90 {
91         struct ctdb_client_call_state *state;
92
93         state = ctdb_call_send(ctdb_db, call);
94         return ctdb_call_recv(state, call);
95 }
96
97
98
99
100
101 /*
102   cancel a ctdb_fetch_lock operation, releasing the lock
103  */
104 static int fetch_lock_destructor(struct ctdb_record_handle *h)
105 {
106         ctdb_ltdb_unlock(h->ctdb_db, h->key);
107         return 0;
108 }
109
110 /*
111   force the migration of a record to this node
112  */
113 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
114 {
115         struct ctdb_call call;
116         ZERO_STRUCT(call);
117         call.call_id = CTDB_NULL_FUNC;
118         call.key = key;
119         call.flags = CTDB_IMMEDIATE_MIGRATION;
120         return ctdb_call(ctdb_db, &call);
121 }
122
123 /*
124   get a lock on a record, and return the records data. Blocks until it gets the lock
125  */
126 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
127                                            TDB_DATA key, TDB_DATA *data)
128 {
129         int ret;
130         struct ctdb_record_handle *h;
131
132         /*
133           procedure is as follows:
134
135           1) get the chain lock. 
136           2) check if we are dmaster
137           3) if we are the dmaster then return handle 
138           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
139              reply from ctdbd
140           5) when we get the reply, goto (1)
141          */
142
143         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
144         if (h == NULL) {
145                 return NULL;
146         }
147
148         h->ctdb_db = ctdb_db;
149         h->key     = key;
150         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
151         if (h->key.dptr == NULL) {
152                 talloc_free(h);
153                 return NULL;
154         }
155         h->data    = data;
156
157         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
158                  (const char *)key.dptr));
159
160 again:
161         /* step 1 - get the chain lock */
162         ret = ctdb_ltdb_lock(ctdb_db, key);
163         if (ret != 0) {
164                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
170
171         talloc_set_destructor(h, fetch_lock_destructor);
172
173         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
174
175         /* when torturing, ensure we test the remote path */
176         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
177             random() % 5 == 0) {
178                 h->header.dmaster = (uint32_t)-1;
179         }
180
181
182         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
183
184         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
185                 ctdb_ltdb_unlock(ctdb_db, key);
186                 ret = ctdb_client_force_migration(ctdb_db, key);
187                 if (ret != 0) {
188                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
189                         talloc_free(h);
190                         return NULL;
191                 }
192                 goto again;
193         }
194
195         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
196         return h;
197 }
198
199 /*
200   store some data to the record that was locked with ctdb_fetch_lock()
201 */
202 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
203 {
204         if (h->ctdb_db->persistent) {
205                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
206                 return -1;
207         }
208
209         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
210 }
211
212 /*
213   non-locking fetch of a record
214  */
215 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
216                TDB_DATA key, TDB_DATA *data)
217 {
218         struct ctdb_call call;
219         int ret;
220
221         call.call_id = CTDB_FETCH_FUNC;
222         call.call_data.dptr = NULL;
223         call.call_data.dsize = 0;
224
225         ret = ctdb_call(ctdb_db, &call);
226
227         if (ret == 0) {
228                 *data = call.reply_data;
229                 talloc_steal(mem_ctx, data->dptr);
230         }
231
232         return ret;
233 }
234
235
236
237
238
239
240
241
242 /*
243   send a ctdb control message
244   timeout specifies how long we should wait for a reply.
245   if timeout is NULL we wait indefinitely
246  */
247 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
248                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
249                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
250                  struct timeval *timeout,
251                  char **errormsg)
252 {
253         struct ctdb_client_control_state *state;
254
255         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
256                         flags, data, mem_ctx,
257                         timeout, errormsg);
258         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
259                         errormsg);
260 }
261
262
263
264
265 /*
266   a process exists call. Returns 0 if process exists, -1 otherwise
267  */
268 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
269 {
270         int ret;
271         TDB_DATA data;
272         int32_t status;
273
274         data.dptr = (uint8_t*)&pid;
275         data.dsize = sizeof(pid);
276
277         ret = ctdb_control(ctdb, destnode, 0, 
278                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
279                            NULL, NULL, &status, NULL, NULL);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
282                 return -1;
283         }
284
285         return status;
286 }
287
288 /*
289   get remote statistics
290  */
291 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
292 {
293         int ret;
294         TDB_DATA data;
295         int32_t res;
296
297         ret = ctdb_control(ctdb, destnode, 0, 
298                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
299                            ctdb, &data, &res, NULL, NULL);
300         if (ret != 0 || res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
302                 return -1;
303         }
304
305         if (data.dsize != sizeof(struct ctdb_statistics)) {
306                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
307                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
308                       return -1;
309         }
310
311         *status = *(struct ctdb_statistics *)data.dptr;
312         talloc_free(data.dptr);
313                         
314         return 0;
315 }
316
317 /*
318   shutdown a remote ctdb node
319  */
320 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
321 {
322         struct ctdb_client_control_state *state;
323
324         state = ctdb_control_send(ctdb, destnode, 0, 
325                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
326                            NULL, &timeout, NULL);
327         if (state == NULL) {
328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
329                 return -1;
330         }
331
332         return 0;
333 }
334
335 /*
336   get vnn map from a remote node
337  */
338 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
339 {
340         int ret;
341         TDB_DATA outdata;
342         int32_t res;
343         struct ctdb_vnn_map_wire *map;
344
345         ret = ctdb_control(ctdb, destnode, 0, 
346                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
347                            mem_ctx, &outdata, &res, &timeout, NULL);
348         if (ret != 0 || res != 0) {
349                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
350                 return -1;
351         }
352         
353         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
354         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
355             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
356                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
357                 return -1;
358         }
359
360         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
361         CTDB_NO_MEMORY(ctdb, *vnnmap);
362         (*vnnmap)->generation = map->generation;
363         (*vnnmap)->size       = map->size;
364         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
365
366         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
367         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
368         talloc_free(outdata.dptr);
369                     
370         return 0;
371 }
372
373
374 /*
375   get the recovery mode of a remote node
376  */
377 struct ctdb_client_control_state *
378 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
379 {
380         return ctdb_control_send(ctdb, destnode, 0, 
381                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
382                            mem_ctx, &timeout, NULL);
383 }
384
385 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
386 {
387         int ret;
388         int32_t res;
389
390         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
391         if (ret != 0) {
392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
393                 return -1;
394         }
395
396         if (recmode) {
397                 *recmode = (uint32_t)res;
398         }
399
400         return 0;
401 }
402
403 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
404 {
405         struct ctdb_client_control_state *state;
406
407         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
408         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
409 }
410
411
412
413
414 /*
415   set the recovery mode of a remote node
416  */
417 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
418 {
419         int ret;
420         TDB_DATA data;
421         int32_t res;
422
423         data.dsize = sizeof(uint32_t);
424         data.dptr = (unsigned char *)&recmode;
425
426         ret = ctdb_control(ctdb, destnode, 0, 
427                            CTDB_CONTROL_SET_RECMODE, 0, data, 
428                            NULL, NULL, &res, &timeout, NULL);
429         if (ret != 0 || res != 0) {
430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
431                 return -1;
432         }
433
434         return 0;
435 }
436
437
438
439 /*
440   get the recovery master of a remote node
441  */
442 struct ctdb_client_control_state *
443 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
444                         struct timeval timeout, uint32_t destnode)
445 {
446         return ctdb_control_send(ctdb, destnode, 0, 
447                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
448                            mem_ctx, &timeout, NULL);
449 }
450
451 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
452 {
453         int ret;
454         int32_t res;
455
456         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
457         if (ret != 0) {
458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
459                 return -1;
460         }
461
462         if (recmaster) {
463                 *recmaster = (uint32_t)res;
464         }
465
466         return 0;
467 }
468
469 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
470 {
471         struct ctdb_client_control_state *state;
472
473         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
474         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
475 }
476
477
478 /*
479   set the recovery master of a remote node
480  */
481 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
482 {
483         int ret;
484         TDB_DATA data;
485         int32_t res;
486
487         ZERO_STRUCT(data);
488         data.dsize = sizeof(uint32_t);
489         data.dptr = (unsigned char *)&recmaster;
490
491         ret = ctdb_control(ctdb, destnode, 0, 
492                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
493                            NULL, NULL, &res, &timeout, NULL);
494         if (ret != 0 || res != 0) {
495                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
496                 return -1;
497         }
498
499         return 0;
500 }
501
502
503 /*
504   get a list of databases off a remote node
505  */
506 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
507                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
508 {
509         int ret;
510         TDB_DATA outdata;
511         int32_t res;
512
513         ret = ctdb_control(ctdb, destnode, 0, 
514                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
515                            mem_ctx, &outdata, &res, &timeout, NULL);
516         if (ret != 0 || res != 0) {
517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
518                 return -1;
519         }
520
521         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
522         talloc_free(outdata.dptr);
523                     
524         return 0;
525 }
526
527 /*
528   get a list of nodes (vnn and flags ) from a remote node
529  */
530 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
531                 struct timeval timeout, uint32_t destnode, 
532                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
533 {
534         int ret;
535         TDB_DATA outdata;
536         int32_t res;
537
538         ret = ctdb_control(ctdb, destnode, 0, 
539                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
540                            mem_ctx, &outdata, &res, &timeout, NULL);
541         if (ret == 0 && res == -1 && outdata.dsize == 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
543                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
544         }
545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
547                 return -1;
548         }
549
550         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
551         talloc_free(outdata.dptr);
552                     
553         return 0;
554 }
555
556 /*
557   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
558  */
559 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
560                 struct timeval timeout, uint32_t destnode, 
561                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
562 {
563         int ret, i, len;
564         TDB_DATA outdata;
565         struct ctdb_node_mapv4 *nodemapv4;
566         int32_t res;
567
568         ret = ctdb_control(ctdb, destnode, 0, 
569                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
570                            mem_ctx, &outdata, &res, &timeout, NULL);
571         if (ret != 0 || res != 0 || outdata.dsize == 0) {
572                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
573                 return -1;
574         }
575
576         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
577
578         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
579         (*nodemap) = talloc_zero_size(mem_ctx, len);
580         CTDB_NO_MEMORY(ctdb, (*nodemap));
581
582         (*nodemap)->num = nodemapv4->num;
583         for (i=0; i<nodemapv4->num; i++) {
584                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
585                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
586                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
587                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
588         }
589                 
590         talloc_free(outdata.dptr);
591                     
592         return 0;
593 }
594
595 /*
596   drop the transport, reload the nodes file and restart the transport
597  */
598 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
599                     struct timeval timeout, uint32_t destnode)
600 {
601         int ret;
602         int32_t res;
603
604         ret = ctdb_control(ctdb, destnode, 0, 
605                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
606                            NULL, NULL, &res, &timeout, NULL);
607         if (ret != 0 || res != 0) {
608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
609                 return -1;
610         }
611
612         return 0;
613 }
614
615
616 /*
617   set vnn map on a node
618  */
619 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
620                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
621 {
622         int ret;
623         TDB_DATA data;
624         int32_t res;
625         struct ctdb_vnn_map_wire *map;
626         size_t len;
627
628         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
629         map = talloc_size(mem_ctx, len);
630         CTDB_NO_MEMORY(ctdb, map);
631
632         map->generation = vnnmap->generation;
633         map->size = vnnmap->size;
634         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
635         
636         data.dsize = len;
637         data.dptr  = (uint8_t *)map;
638
639         ret = ctdb_control(ctdb, destnode, 0, 
640                            CTDB_CONTROL_SETVNNMAP, 0, data, 
641                            NULL, NULL, &res, &timeout, NULL);
642         if (ret != 0 || res != 0) {
643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
644                 return -1;
645         }
646
647         talloc_free(map);
648
649         return 0;
650 }
651
652
653 /*
654   async send for pull database
655  */
656 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
657         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
658         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
659 {
660         TDB_DATA indata;
661         struct ctdb_control_pulldb *pull;
662         struct ctdb_client_control_state *state;
663
664         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
665         CTDB_NO_MEMORY_NULL(ctdb, pull);
666
667         pull->db_id   = dbid;
668         pull->lmaster = lmaster;
669
670         indata.dsize = sizeof(struct ctdb_control_pulldb);
671         indata.dptr  = (unsigned char *)pull;
672
673         state = ctdb_control_send(ctdb, destnode, 0, 
674                                   CTDB_CONTROL_PULL_DB, 0, indata, 
675                                   mem_ctx, &timeout, NULL);
676         talloc_free(pull);
677
678         return state;
679 }
680
681 /*
682   async recv for pull database
683  */
684 int ctdb_ctrl_pulldb_recv(
685         struct ctdb_context *ctdb, 
686         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
687         TDB_DATA *outdata)
688 {
689         int ret;
690         int32_t res;
691
692         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
693         if ( (ret != 0) || (res != 0) ){
694                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
695                 return -1;
696         }
697
698         return 0;
699 }
700
701 /*
702   pull all keys and records for a specific database on a node
703  */
704 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
705                 uint32_t dbid, uint32_t lmaster, 
706                 TALLOC_CTX *mem_ctx, struct timeval timeout,
707                 TDB_DATA *outdata)
708 {
709         struct ctdb_client_control_state *state;
710
711         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
712                                       timeout);
713         
714         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
715 }
716
717
718 /*
719   change dmaster for all keys in the database to the new value
720  */
721 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
722                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
723 {
724         int ret;
725         TDB_DATA indata;
726         int32_t res;
727
728         indata.dsize = 2*sizeof(uint32_t);
729         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
730
731         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
732         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
733
734         ret = ctdb_control(ctdb, destnode, 0, 
735                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
736                            NULL, NULL, &res, &timeout, NULL);
737         if (ret != 0 || res != 0) {
738                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
739                 return -1;
740         }
741
742         return 0;
743 }
744
745 /*
746   ping a node, return number of clients connected
747  */
748 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
749 {
750         int ret;
751         int32_t res;
752
753         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
754                            tdb_null, NULL, NULL, &res, NULL, NULL);
755         if (ret != 0) {
756                 return -1;
757         }
758         return res;
759 }
760
761 /*
762   find the real path to a ltdb 
763  */
764 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
765                    const char **path)
766 {
767         int ret;
768         int32_t res;
769         TDB_DATA data;
770
771         data.dptr = (uint8_t *)&dbid;
772         data.dsize = sizeof(dbid);
773
774         ret = ctdb_control(ctdb, destnode, 0, 
775                            CTDB_CONTROL_GETDBPATH, 0, data, 
776                            mem_ctx, &data, &res, &timeout, NULL);
777         if (ret != 0 || res != 0) {
778                 return -1;
779         }
780
781         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
782         if ((*path) == NULL) {
783                 return -1;
784         }
785
786         talloc_free(data.dptr);
787
788         return 0;
789 }
790
791 /*
792   find the name of a db 
793  */
794 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
795                    const char **name)
796 {
797         int ret;
798         int32_t res;
799         TDB_DATA data;
800
801         data.dptr = (uint8_t *)&dbid;
802         data.dsize = sizeof(dbid);
803
804         ret = ctdb_control(ctdb, destnode, 0, 
805                            CTDB_CONTROL_GET_DBNAME, 0, data, 
806                            mem_ctx, &data, &res, &timeout, NULL);
807         if (ret != 0 || res != 0) {
808                 return -1;
809         }
810
811         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
812         if ((*name) == NULL) {
813                 return -1;
814         }
815
816         talloc_free(data.dptr);
817
818         return 0;
819 }
820
821 /*
822   get the health status of a db
823  */
824 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
825                           struct timeval timeout,
826                           uint32_t destnode,
827                           uint32_t dbid, TALLOC_CTX *mem_ctx,
828                           const char **reason)
829 {
830         int ret;
831         int32_t res;
832         TDB_DATA data;
833
834         data.dptr = (uint8_t *)&dbid;
835         data.dsize = sizeof(dbid);
836
837         ret = ctdb_control(ctdb, destnode, 0,
838                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
839                            mem_ctx, &data, &res, &timeout, NULL);
840         if (ret != 0 || res != 0) {
841                 return -1;
842         }
843
844         if (data.dsize == 0) {
845                 (*reason) = NULL;
846                 return 0;
847         }
848
849         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
850         if ((*reason) == NULL) {
851                 return -1;
852         }
853
854         talloc_free(data.dptr);
855
856         return 0;
857 }
858
859 /*
860   create a database
861  */
862 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
863                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
864 {
865         int ret;
866         int32_t res;
867         TDB_DATA data;
868
869         data.dptr = discard_const(name);
870         data.dsize = strlen(name)+1;
871
872         ret = ctdb_control(ctdb, destnode, 0, 
873                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
874                            0, data, 
875                            mem_ctx, &data, &res, &timeout, NULL);
876
877         if (ret != 0 || res != 0) {
878                 return -1;
879         }
880
881         return 0;
882 }
883
884 /*
885   get debug level on a node
886  */
887 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
888 {
889         int ret;
890         int32_t res;
891         TDB_DATA data;
892
893         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
894                            ctdb, &data, &res, NULL, NULL);
895         if (ret != 0 || res != 0) {
896                 return -1;
897         }
898         if (data.dsize != sizeof(int32_t)) {
899                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
900                          (unsigned)data.dsize));
901                 return -1;
902         }
903         *level = *(int32_t *)data.dptr;
904         talloc_free(data.dptr);
905         return 0;
906 }
907
908 /*
909   set debug level on a node
910  */
911 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
912 {
913         int ret;
914         int32_t res;
915         TDB_DATA data;
916
917         data.dptr = (uint8_t *)&level;
918         data.dsize = sizeof(level);
919
920         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
921                            NULL, NULL, &res, NULL, NULL);
922         if (ret != 0 || res != 0) {
923                 return -1;
924         }
925         return 0;
926 }
927
928
929 /*
930   get a list of connected nodes
931  */
932 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
933                                 struct timeval timeout,
934                                 TALLOC_CTX *mem_ctx,
935                                 uint32_t *num_nodes)
936 {
937         struct ctdb_node_map *map=NULL;
938         int ret, i;
939         uint32_t *nodes;
940
941         *num_nodes = 0;
942
943         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
944         if (ret != 0) {
945                 return NULL;
946         }
947
948         nodes = talloc_array(mem_ctx, uint32_t, map->num);
949         if (nodes == NULL) {
950                 return NULL;
951         }
952
953         for (i=0;i<map->num;i++) {
954                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
955                         nodes[*num_nodes] = map->nodes[i].pnn;
956                         (*num_nodes)++;
957                 }
958         }
959
960         return nodes;
961 }
962
963
964 /*
965   reset remote status
966  */
967 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
968 {
969         int ret;
970         int32_t res;
971
972         ret = ctdb_control(ctdb, destnode, 0, 
973                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
974                            NULL, NULL, &res, NULL, NULL);
975         if (ret != 0 || res != 0) {
976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
977                 return -1;
978         }
979         return 0;
980 }
981
982 /*
983   this is the dummy null procedure that all databases support
984 */
985 static int ctdb_null_func(struct ctdb_call_info *call)
986 {
987         return 0;
988 }
989
990 /*
991   this is a plain fetch procedure that all databases support
992 */
993 static int ctdb_fetch_func(struct ctdb_call_info *call)
994 {
995         call->reply_data = &call->record_data;
996         return 0;
997 }
998
999 /*
1000   attach to a specific database - client call
1001 */
1002 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1003 {
1004         struct ctdb_db_context *ctdb_db;
1005         TDB_DATA data;
1006         int ret;
1007         int32_t res;
1008
1009         ctdb_db = ctdb_db_handle(ctdb, name);
1010         if (ctdb_db) {
1011                 return ctdb_db;
1012         }
1013
1014         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1015         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1016
1017         ctdb_db->ctdb = ctdb;
1018         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1019         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1020
1021         data.dptr = discard_const(name);
1022         data.dsize = strlen(name)+1;
1023
1024         /* tell ctdb daemon to attach */
1025         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1026                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1027                            0, data, ctdb_db, &data, &res, NULL, NULL);
1028         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1029                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1030                 talloc_free(ctdb_db);
1031                 return NULL;
1032         }
1033         
1034         ctdb_db->db_id = *(uint32_t *)data.dptr;
1035         talloc_free(data.dptr);
1036
1037         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1038         if (ret != 0) {
1039                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1040                 talloc_free(ctdb_db);
1041                 return NULL;
1042         }
1043
1044         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1045         if (ctdb->valgrinding) {
1046                 tdb_flags |= TDB_NOMMAP;
1047         }
1048         tdb_flags |= TDB_DISALLOW_NESTING;
1049
1050         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1051         if (ctdb_db->ltdb == NULL) {
1052                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1053                 talloc_free(ctdb_db);
1054                 return NULL;
1055         }
1056
1057         ctdb_db->persistent = persistent;
1058
1059         DLIST_ADD(ctdb->db_list, ctdb_db);
1060
1061         /* add well known functions */
1062         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1063         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1064
1065         return ctdb_db;
1066 }
1067
1068
1069 /*
1070   setup a call for a database
1071  */
1072 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1073 {
1074         struct ctdb_registered_call *call;
1075
1076 #if 0
1077         TDB_DATA data;
1078         int32_t status;
1079         struct ctdb_control_set_call c;
1080         int ret;
1081
1082         /* this is no longer valid with the separate daemon architecture */
1083         c.db_id = ctdb_db->db_id;
1084         c.fn    = fn;
1085         c.id    = id;
1086
1087         data.dptr = (uint8_t *)&c;
1088         data.dsize = sizeof(c);
1089
1090         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1091                            data, NULL, NULL, &status, NULL, NULL);
1092         if (ret != 0 || status != 0) {
1093                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1094                 return -1;
1095         }
1096 #endif
1097
1098         /* also register locally */
1099         call = talloc(ctdb_db, struct ctdb_registered_call);
1100         call->fn = fn;
1101         call->id = id;
1102
1103         DLIST_ADD(ctdb_db->calls, call);        
1104         return 0;
1105 }
1106
1107
1108 struct traverse_state {
1109         bool done;
1110         uint32_t count;
1111         ctdb_traverse_func fn;
1112         void *private_data;
1113 };
1114
1115 /*
1116   called on each key during a ctdb_traverse
1117  */
1118 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1119 {
1120         struct traverse_state *state = (struct traverse_state *)p;
1121         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1122         TDB_DATA key;
1123
1124         if (data.dsize < sizeof(uint32_t) ||
1125             d->length != data.dsize) {
1126                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1127                 state->done = True;
1128                 return;
1129         }
1130
1131         key.dsize = d->keylen;
1132         key.dptr  = &d->data[0];
1133         data.dsize = d->datalen;
1134         data.dptr = &d->data[d->keylen];
1135
1136         if (key.dsize == 0 && data.dsize == 0) {
1137                 /* end of traverse */
1138                 state->done = True;
1139                 return;
1140         }
1141
1142         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1143                 /* empty records are deleted records in ctdb */
1144                 return;
1145         }
1146
1147         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1148                 state->done = True;
1149         }
1150
1151         state->count++;
1152 }
1153
1154
1155 /*
1156   start a cluster wide traverse, calling the supplied fn on each record
1157   return the number of records traversed, or -1 on error
1158  */
1159 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1160 {
1161         TDB_DATA data;
1162         struct ctdb_traverse_start t;
1163         int32_t status;
1164         int ret;
1165         uint64_t srvid = (getpid() | 0xFLL<<60);
1166         struct traverse_state state;
1167
1168         state.done = False;
1169         state.count = 0;
1170         state.private_data = private_data;
1171         state.fn = fn;
1172
1173         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1174         if (ret != 0) {
1175                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1176                 return -1;
1177         }
1178
1179         t.db_id = ctdb_db->db_id;
1180         t.srvid = srvid;
1181         t.reqid = 0;
1182
1183         data.dptr = (uint8_t *)&t;
1184         data.dsize = sizeof(t);
1185
1186         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1187                            data, NULL, NULL, &status, NULL, NULL);
1188         if (ret != 0 || status != 0) {
1189                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1190                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1191                 return -1;
1192         }
1193
1194         while (!state.done) {
1195                 event_loop_once(ctdb_db->ctdb->ev);
1196         }
1197
1198         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1201                 return -1;
1202         }
1203
1204         return state.count;
1205 }
1206
1207 #define ISASCII(x) ((x>31)&&(x<128))
1208 /*
1209   called on each key during a catdb
1210  */
1211 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1212 {
1213         int i;
1214         FILE *f = (FILE *)p;
1215         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1216
1217         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1218         for (i=0;i<key.dsize;i++) {
1219                 if (ISASCII(key.dptr[i])) {
1220                         fprintf(f, "%c", key.dptr[i]);
1221                 } else {
1222                         fprintf(f, "\\%02X", key.dptr[i]);
1223                 }
1224         }
1225         fprintf(f, "\"\n");
1226
1227         fprintf(f, "dmaster: %u\n", h->dmaster);
1228         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1229
1230         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1231         for (i=sizeof(*h);i<data.dsize;i++) {
1232                 if (ISASCII(data.dptr[i])) {
1233                         fprintf(f, "%c", data.dptr[i]);
1234                 } else {
1235                         fprintf(f, "\\%02X", data.dptr[i]);
1236                 }
1237         }
1238         fprintf(f, "\"\n");
1239
1240         fprintf(f, "\n");
1241
1242         return 0;
1243 }
1244
1245 /*
1246   convenience function to list all keys to stdout
1247  */
1248 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1249 {
1250         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1251 }
1252
1253 /*
1254   get the pid of a ctdb daemon
1255  */
1256 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1257 {
1258         int ret;
1259         int32_t res;
1260
1261         ret = ctdb_control(ctdb, destnode, 0, 
1262                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1263                            NULL, NULL, &res, &timeout, NULL);
1264         if (ret != 0) {
1265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1266                 return -1;
1267         }
1268
1269         *pid = res;
1270
1271         return 0;
1272 }
1273
1274
1275 /*
1276   async freeze send control
1277  */
1278 struct ctdb_client_control_state *
1279 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1280 {
1281         return ctdb_control_send(ctdb, destnode, priority, 
1282                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1283                            mem_ctx, &timeout, NULL);
1284 }
1285
1286 /* 
1287    async freeze recv control
1288 */
1289 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1290 {
1291         int ret;
1292         int32_t res;
1293
1294         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1295         if ( (ret != 0) || (res != 0) ){
1296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1297                 return -1;
1298         }
1299
1300         return 0;
1301 }
1302
1303 /*
1304   freeze databases of a certain priority
1305  */
1306 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1307 {
1308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1309         struct ctdb_client_control_state *state;
1310         int ret;
1311
1312         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1313         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1314         talloc_free(tmp_ctx);
1315
1316         return ret;
1317 }
1318
1319 /* Freeze all databases */
1320 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1321 {
1322         int i;
1323
1324         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1325                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1326                         return -1;
1327                 }
1328         }
1329         return 0;
1330 }
1331
1332 /*
1333   thaw databases of a certain priority
1334  */
1335 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1336 {
1337         int ret;
1338         int32_t res;
1339
1340         ret = ctdb_control(ctdb, destnode, priority, 
1341                            CTDB_CONTROL_THAW, 0, tdb_null, 
1342                            NULL, NULL, &res, &timeout, NULL);
1343         if (ret != 0 || res != 0) {
1344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1345                 return -1;
1346         }
1347
1348         return 0;
1349 }
1350
1351 /* thaw all databases */
1352 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1355 }
1356
1357 /*
1358   get pnn of a node, or -1
1359  */
1360 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1361 {
1362         int ret;
1363         int32_t res;
1364
1365         ret = ctdb_control(ctdb, destnode, 0, 
1366                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1367                            NULL, NULL, &res, &timeout, NULL);
1368         if (ret != 0) {
1369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1370                 return -1;
1371         }
1372
1373         return res;
1374 }
1375
1376 /*
1377   get the monitoring mode of a remote node
1378  */
1379 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1380 {
1381         int ret;
1382         int32_t res;
1383
1384         ret = ctdb_control(ctdb, destnode, 0, 
1385                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1386                            NULL, NULL, &res, &timeout, NULL);
1387         if (ret != 0) {
1388                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1389                 return -1;
1390         }
1391
1392         *monmode = res;
1393
1394         return 0;
1395 }
1396
1397
1398 /*
1399  set the monitoring mode of a remote node to active
1400  */
1401 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1402 {
1403         int ret;
1404         
1405
1406         ret = ctdb_control(ctdb, destnode, 0, 
1407                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1408                            NULL, NULL,NULL, &timeout, NULL);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1411                 return -1;
1412         }
1413
1414         
1415
1416         return 0;
1417 }
1418
1419 /*
1420   set the monitoring mode of a remote node to disable
1421  */
1422 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1423 {
1424         int ret;
1425         
1426
1427         ret = ctdb_control(ctdb, destnode, 0, 
1428                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1429                            NULL, NULL, NULL, &timeout, NULL);
1430         if (ret != 0) {
1431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1432                 return -1;
1433         }
1434
1435         
1436
1437         return 0;
1438 }
1439
1440
1441
1442 /* 
1443   sent to a node to make it take over an ip address
1444 */
1445 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1446                           uint32_t destnode, struct ctdb_public_ip *ip)
1447 {
1448         TDB_DATA data;
1449         struct ctdb_public_ipv4 ipv4;
1450         int ret;
1451         int32_t res;
1452
1453         if (ip->addr.sa.sa_family == AF_INET) {
1454                 ipv4.pnn = ip->pnn;
1455                 ipv4.sin = ip->addr.ip;
1456
1457                 data.dsize = sizeof(ipv4);
1458                 data.dptr  = (uint8_t *)&ipv4;
1459
1460                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1461                            NULL, &res, &timeout, NULL);
1462         } else {
1463                 data.dsize = sizeof(*ip);
1464                 data.dptr  = (uint8_t *)ip;
1465
1466                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1467                            NULL, &res, &timeout, NULL);
1468         }
1469
1470         if (ret != 0 || res != 0) {
1471                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1472                 return -1;
1473         }
1474
1475         return 0;       
1476 }
1477
1478
1479 /* 
1480   sent to a node to make it release an ip address
1481 */
1482 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1483                          uint32_t destnode, struct ctdb_public_ip *ip)
1484 {
1485         TDB_DATA data;
1486         struct ctdb_public_ipv4 ipv4;
1487         int ret;
1488         int32_t res;
1489
1490         if (ip->addr.sa.sa_family == AF_INET) {
1491                 ipv4.pnn = ip->pnn;
1492                 ipv4.sin = ip->addr.ip;
1493
1494                 data.dsize = sizeof(ipv4);
1495                 data.dptr  = (uint8_t *)&ipv4;
1496
1497                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1498                                    NULL, &res, &timeout, NULL);
1499         } else {
1500                 data.dsize = sizeof(*ip);
1501                 data.dptr  = (uint8_t *)ip;
1502
1503                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1504                                    NULL, &res, &timeout, NULL);
1505         }
1506
1507         if (ret != 0 || res != 0) {
1508                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1509                 return -1;
1510         }
1511
1512         return 0;       
1513 }
1514
1515
1516 /*
1517   get a tunable
1518  */
1519 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1520                           struct timeval timeout, 
1521                           uint32_t destnode,
1522                           const char *name, uint32_t *value)
1523 {
1524         struct ctdb_control_get_tunable *t;
1525         TDB_DATA data, outdata;
1526         int32_t res;
1527         int ret;
1528
1529         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1530         data.dptr  = talloc_size(ctdb, data.dsize);
1531         CTDB_NO_MEMORY(ctdb, data.dptr);
1532
1533         t = (struct ctdb_control_get_tunable *)data.dptr;
1534         t->length = strlen(name)+1;
1535         memcpy(t->name, name, t->length);
1536
1537         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1538                            &outdata, &res, &timeout, NULL);
1539         talloc_free(data.dptr);
1540         if (ret != 0 || res != 0) {
1541                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1542                 return -1;
1543         }
1544
1545         if (outdata.dsize != sizeof(uint32_t)) {
1546                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1547                 talloc_free(outdata.dptr);
1548                 return -1;
1549         }
1550         
1551         *value = *(uint32_t *)outdata.dptr;
1552         talloc_free(outdata.dptr);
1553
1554         return 0;
1555 }
1556
1557 /*
1558   set a tunable
1559  */
1560 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1561                           struct timeval timeout, 
1562                           uint32_t destnode,
1563                           const char *name, uint32_t value)
1564 {
1565         struct ctdb_control_set_tunable *t;
1566         TDB_DATA data;
1567         int32_t res;
1568         int ret;
1569
1570         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1571         data.dptr  = talloc_size(ctdb, data.dsize);
1572         CTDB_NO_MEMORY(ctdb, data.dptr);
1573
1574         t = (struct ctdb_control_set_tunable *)data.dptr;
1575         t->length = strlen(name)+1;
1576         memcpy(t->name, name, t->length);
1577         t->value = value;
1578
1579         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1580                            NULL, &res, &timeout, NULL);
1581         talloc_free(data.dptr);
1582         if (ret != 0 || res != 0) {
1583                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1584                 return -1;
1585         }
1586
1587         return 0;
1588 }
1589
1590 /*
1591   list tunables
1592  */
1593 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1594                             struct timeval timeout, 
1595                             uint32_t destnode,
1596                             TALLOC_CTX *mem_ctx,
1597                             const char ***list, uint32_t *count)
1598 {
1599         TDB_DATA outdata;
1600         int32_t res;
1601         int ret;
1602         struct ctdb_control_list_tunable *t;
1603         char *p, *s, *ptr;
1604
1605         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1606                            mem_ctx, &outdata, &res, &timeout, NULL);
1607         if (ret != 0 || res != 0) {
1608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1609                 return -1;
1610         }
1611
1612         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1613         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1614             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1615                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1616                 talloc_free(outdata.dptr);
1617                 return -1;              
1618         }
1619         
1620         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1621         CTDB_NO_MEMORY(ctdb, p);
1622
1623         talloc_free(outdata.dptr);
1624         
1625         (*list) = NULL;
1626         (*count) = 0;
1627
1628         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1629                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1630                 CTDB_NO_MEMORY(ctdb, *list);
1631                 (*list)[*count] = talloc_strdup(*list, s);
1632                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1633                 (*count)++;
1634         }
1635
1636         talloc_free(p);
1637
1638         return 0;
1639 }
1640
1641
1642 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1643                                    struct timeval timeout, uint32_t destnode,
1644                                    TALLOC_CTX *mem_ctx,
1645                                    uint32_t flags,
1646                                    struct ctdb_all_public_ips **ips)
1647 {
1648         int ret;
1649         TDB_DATA outdata;
1650         int32_t res;
1651
1652         ret = ctdb_control(ctdb, destnode, 0, 
1653                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1654                            mem_ctx, &outdata, &res, &timeout, NULL);
1655         if (ret == 0 && res == -1) {
1656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1657                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1658         }
1659         if (ret != 0 || res != 0) {
1660           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1661                 return -1;
1662         }
1663
1664         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1665         talloc_free(outdata.dptr);
1666                     
1667         return 0;
1668 }
1669
1670 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1671                              struct timeval timeout, uint32_t destnode,
1672                              TALLOC_CTX *mem_ctx,
1673                              struct ctdb_all_public_ips **ips)
1674 {
1675         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1676                                               destnode, mem_ctx,
1677                                               0, ips);
1678 }
1679
1680 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1681                         struct timeval timeout, uint32_t destnode, 
1682                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1683 {
1684         int ret, i, len;
1685         TDB_DATA outdata;
1686         int32_t res;
1687         struct ctdb_all_public_ipsv4 *ipsv4;
1688
1689         ret = ctdb_control(ctdb, destnode, 0, 
1690                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1691                            mem_ctx, &outdata, &res, &timeout, NULL);
1692         if (ret != 0 || res != 0) {
1693                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1694                 return -1;
1695         }
1696
1697         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1698         len = offsetof(struct ctdb_all_public_ips, ips) +
1699                 ipsv4->num*sizeof(struct ctdb_public_ip);
1700         *ips = talloc_zero_size(mem_ctx, len);
1701         CTDB_NO_MEMORY(ctdb, *ips);
1702         (*ips)->num = ipsv4->num;
1703         for (i=0; i<ipsv4->num; i++) {
1704                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1705                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1706         }
1707
1708         talloc_free(outdata.dptr);
1709                     
1710         return 0;
1711 }
1712
1713 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1714                                  struct timeval timeout, uint32_t destnode,
1715                                  TALLOC_CTX *mem_ctx,
1716                                  const ctdb_sock_addr *addr,
1717                                  struct ctdb_control_public_ip_info **_info)
1718 {
1719         int ret;
1720         TDB_DATA indata;
1721         TDB_DATA outdata;
1722         int32_t res;
1723         struct ctdb_control_public_ip_info *info;
1724         uint32_t len;
1725         uint32_t i;
1726
1727         indata.dptr = discard_const_p(uint8_t, addr);
1728         indata.dsize = sizeof(*addr);
1729
1730         ret = ctdb_control(ctdb, destnode, 0,
1731                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1732                            mem_ctx, &outdata, &res, &timeout, NULL);
1733         if (ret != 0 || res != 0) {
1734                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1735                                 "failed ret:%d res:%d\n",
1736                                 ret, res));
1737                 return -1;
1738         }
1739
1740         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1741         if (len > outdata.dsize) {
1742                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1743                                 "returned invalid data with size %u > %u\n",
1744                                 (unsigned int)outdata.dsize,
1745                                 (unsigned int)len));
1746                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1747                 return -1;
1748         }
1749
1750         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1751         len += info->num*sizeof(struct ctdb_control_iface_info);
1752
1753         if (len > outdata.dsize) {
1754                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1755                                 "returned invalid data with size %u > %u\n",
1756                                 (unsigned int)outdata.dsize,
1757                                 (unsigned int)len));
1758                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1759                 return -1;
1760         }
1761
1762         /* make sure we null terminate the returned strings */
1763         for (i=0; i < info->num; i++) {
1764                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1765         }
1766
1767         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1768                                                                 outdata.dptr,
1769                                                                 outdata.dsize);
1770         talloc_free(outdata.dptr);
1771         if (*_info == NULL) {
1772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1773                                 "talloc_memdup size %u failed\n",
1774                                 (unsigned int)outdata.dsize));
1775                 return -1;
1776         }
1777
1778         return 0;
1779 }
1780
1781 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1782                          struct timeval timeout, uint32_t destnode,
1783                          TALLOC_CTX *mem_ctx,
1784                          struct ctdb_control_get_ifaces **_ifaces)
1785 {
1786         int ret;
1787         TDB_DATA outdata;
1788         int32_t res;
1789         struct ctdb_control_get_ifaces *ifaces;
1790         uint32_t len;
1791         uint32_t i;
1792
1793         ret = ctdb_control(ctdb, destnode, 0,
1794                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1795                            mem_ctx, &outdata, &res, &timeout, NULL);
1796         if (ret != 0 || res != 0) {
1797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1798                                 "failed ret:%d res:%d\n",
1799                                 ret, res));
1800                 return -1;
1801         }
1802
1803         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1804         if (len > outdata.dsize) {
1805                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1806                                 "returned invalid data with size %u > %u\n",
1807                                 (unsigned int)outdata.dsize,
1808                                 (unsigned int)len));
1809                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1810                 return -1;
1811         }
1812
1813         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1814         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1815
1816         if (len > outdata.dsize) {
1817                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1818                                 "returned invalid data with size %u > %u\n",
1819                                 (unsigned int)outdata.dsize,
1820                                 (unsigned int)len));
1821                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1822                 return -1;
1823         }
1824
1825         /* make sure we null terminate the returned strings */
1826         for (i=0; i < ifaces->num; i++) {
1827                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1828         }
1829
1830         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1831                                                                   outdata.dptr,
1832                                                                   outdata.dsize);
1833         talloc_free(outdata.dptr);
1834         if (*_ifaces == NULL) {
1835                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1836                                 "talloc_memdup size %u failed\n",
1837                                 (unsigned int)outdata.dsize));
1838                 return -1;
1839         }
1840
1841         return 0;
1842 }
1843
1844 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1845                              struct timeval timeout, uint32_t destnode,
1846                              TALLOC_CTX *mem_ctx,
1847                              const struct ctdb_control_iface_info *info)
1848 {
1849         int ret;
1850         TDB_DATA indata;
1851         int32_t res;
1852
1853         indata.dptr = discard_const_p(uint8_t, info);
1854         indata.dsize = sizeof(*info);
1855
1856         ret = ctdb_control(ctdb, destnode, 0,
1857                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1858                            mem_ctx, NULL, &res, &timeout, NULL);
1859         if (ret != 0 || res != 0) {
1860                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1861                                 "failed ret:%d res:%d\n",
1862                                 ret, res));
1863                 return -1;
1864         }
1865
1866         return 0;
1867 }
1868
1869 /*
1870   set/clear the permanent disabled bit on a remote node
1871  */
1872 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1873                        uint32_t set, uint32_t clear)
1874 {
1875         int ret;
1876         TDB_DATA data;
1877         struct ctdb_node_map *nodemap=NULL;
1878         struct ctdb_node_flag_change c;
1879         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1880         uint32_t recmaster;
1881         uint32_t *nodes;
1882
1883
1884         /* find the recovery master */
1885         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1886         if (ret != 0) {
1887                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1888                 talloc_free(tmp_ctx);
1889                 return ret;
1890         }
1891
1892
1893         /* read the node flags from the recmaster */
1894         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1897                 talloc_free(tmp_ctx);
1898                 return -1;
1899         }
1900         if (destnode >= nodemap->num) {
1901                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1902                 talloc_free(tmp_ctx);
1903                 return -1;
1904         }
1905
1906         c.pnn       = destnode;
1907         c.old_flags = nodemap->nodes[destnode].flags;
1908         c.new_flags = c.old_flags;
1909         c.new_flags |= set;
1910         c.new_flags &= ~clear;
1911
1912         data.dsize = sizeof(c);
1913         data.dptr = (unsigned char *)&c;
1914
1915         /* send the flags update to all connected nodes */
1916         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1917
1918         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1919                                         nodes, 0,
1920                                         timeout, false, data,
1921                                         NULL, NULL,
1922                                         NULL) != 0) {
1923                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1924
1925                 talloc_free(tmp_ctx);
1926                 return -1;
1927         }
1928
1929         talloc_free(tmp_ctx);
1930         return 0;
1931 }
1932
1933
1934 /*
1935   get all tunables
1936  */
1937 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1938                                struct timeval timeout, 
1939                                uint32_t destnode,
1940                                struct ctdb_tunable *tunables)
1941 {
1942         TDB_DATA outdata;
1943         int ret;
1944         int32_t res;
1945
1946         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1947                            &outdata, &res, &timeout, NULL);
1948         if (ret != 0 || res != 0) {
1949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1950                 return -1;
1951         }
1952
1953         if (outdata.dsize != sizeof(*tunables)) {
1954                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1955                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1956                 return -1;              
1957         }
1958
1959         *tunables = *(struct ctdb_tunable *)outdata.dptr;
1960         talloc_free(outdata.dptr);
1961         return 0;
1962 }
1963
1964 /*
1965   add a public address to a node
1966  */
1967 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
1968                       struct timeval timeout, 
1969                       uint32_t destnode,
1970                       struct ctdb_control_ip_iface *pub)
1971 {
1972         TDB_DATA data;
1973         int32_t res;
1974         int ret;
1975
1976         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
1977         data.dptr  = (unsigned char *)pub;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
1980                            NULL, &res, &timeout, NULL);
1981         if (ret != 0 || res != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
1983                 return -1;
1984         }
1985
1986         return 0;
1987 }
1988
1989 /*
1990   delete a public address from a node
1991  */
1992 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
1993                       struct timeval timeout, 
1994                       uint32_t destnode,
1995                       struct ctdb_control_ip_iface *pub)
1996 {
1997         TDB_DATA data;
1998         int32_t res;
1999         int ret;
2000
2001         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2002         data.dptr  = (unsigned char *)pub;
2003
2004         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2005                            NULL, &res, &timeout, NULL);
2006         if (ret != 0 || res != 0) {
2007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2008                 return -1;
2009         }
2010
2011         return 0;
2012 }
2013
2014 /*
2015   kill a tcp connection
2016  */
2017 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2018                       struct timeval timeout, 
2019                       uint32_t destnode,
2020                       struct ctdb_control_killtcp *killtcp)
2021 {
2022         TDB_DATA data;
2023         int32_t res;
2024         int ret;
2025
2026         data.dsize = sizeof(struct ctdb_control_killtcp);
2027         data.dptr  = (unsigned char *)killtcp;
2028
2029         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2030                            NULL, &res, &timeout, NULL);
2031         if (ret != 0 || res != 0) {
2032                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2033                 return -1;
2034         }
2035
2036         return 0;
2037 }
2038
2039 /*
2040   send a gratious arp
2041  */
2042 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2043                       struct timeval timeout, 
2044                       uint32_t destnode,
2045                       ctdb_sock_addr *addr,
2046                       const char *ifname)
2047 {
2048         TDB_DATA data;
2049         int32_t res;
2050         int ret, len;
2051         struct ctdb_control_gratious_arp *gratious_arp;
2052         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2053
2054
2055         len = strlen(ifname)+1;
2056         gratious_arp = talloc_size(tmp_ctx, 
2057                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2058         CTDB_NO_MEMORY(ctdb, gratious_arp);
2059
2060         gratious_arp->addr = *addr;
2061         gratious_arp->len = len;
2062         memcpy(&gratious_arp->iface[0], ifname, len);
2063
2064
2065         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2066         data.dptr  = (unsigned char *)gratious_arp;
2067
2068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2069                            NULL, &res, &timeout, NULL);
2070         if (ret != 0 || res != 0) {
2071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2072                 talloc_free(tmp_ctx);
2073                 return -1;
2074         }
2075
2076         talloc_free(tmp_ctx);
2077         return 0;
2078 }
2079
2080 /*
2081   get a list of all tcp tickles that a node knows about for a particular vnn
2082  */
2083 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2084                               struct timeval timeout, uint32_t destnode, 
2085                               TALLOC_CTX *mem_ctx, 
2086                               ctdb_sock_addr *addr,
2087                               struct ctdb_control_tcp_tickle_list **list)
2088 {
2089         int ret;
2090         TDB_DATA data, outdata;
2091         int32_t status;
2092
2093         data.dptr = (uint8_t*)addr;
2094         data.dsize = sizeof(ctdb_sock_addr);
2095
2096         ret = ctdb_control(ctdb, destnode, 0, 
2097                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2098                            mem_ctx, &outdata, &status, NULL, NULL);
2099         if (ret != 0 || status != 0) {
2100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2101                 return -1;
2102         }
2103
2104         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2105
2106         return status;
2107 }
2108
2109 /*
2110   register a server id
2111  */
2112 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2113                       struct timeval timeout, 
2114                       struct ctdb_server_id *id)
2115 {
2116         TDB_DATA data;
2117         int32_t res;
2118         int ret;
2119
2120         data.dsize = sizeof(struct ctdb_server_id);
2121         data.dptr  = (unsigned char *)id;
2122
2123         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2124                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2125                         0, data, NULL,
2126                         NULL, &res, &timeout, NULL);
2127         if (ret != 0 || res != 0) {
2128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2129                 return -1;
2130         }
2131
2132         return 0;
2133 }
2134
2135 /*
2136   unregister a server id
2137  */
2138 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2139                       struct timeval timeout, 
2140                       struct ctdb_server_id *id)
2141 {
2142         TDB_DATA data;
2143         int32_t res;
2144         int ret;
2145
2146         data.dsize = sizeof(struct ctdb_server_id);
2147         data.dptr  = (unsigned char *)id;
2148
2149         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2150                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2151                         0, data, NULL,
2152                         NULL, &res, &timeout, NULL);
2153         if (ret != 0 || res != 0) {
2154                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2155                 return -1;
2156         }
2157
2158         return 0;
2159 }
2160
2161
2162 /*
2163   check if a server id exists
2164
2165   if a server id does exist, return *status == 1, otherwise *status == 0
2166  */
2167 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2168                       struct timeval timeout, 
2169                       uint32_t destnode,
2170                       struct ctdb_server_id *id,
2171                       uint32_t *status)
2172 {
2173         TDB_DATA data;
2174         int32_t res;
2175         int ret;
2176
2177         data.dsize = sizeof(struct ctdb_server_id);
2178         data.dptr  = (unsigned char *)id;
2179
2180         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2181                         0, data, NULL,
2182                         NULL, &res, &timeout, NULL);
2183         if (ret != 0) {
2184                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2185                 return -1;
2186         }
2187
2188         if (res) {
2189                 *status = 1;
2190         } else {
2191                 *status = 0;
2192         }
2193
2194         return 0;
2195 }
2196
2197 /*
2198    get the list of server ids that are registered on a node
2199 */
2200 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2201                 TALLOC_CTX *mem_ctx,
2202                 struct timeval timeout, uint32_t destnode, 
2203                 struct ctdb_server_id_list **svid_list)
2204 {
2205         int ret;
2206         TDB_DATA outdata;
2207         int32_t res;
2208
2209         ret = ctdb_control(ctdb, destnode, 0, 
2210                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2211                            mem_ctx, &outdata, &res, &timeout, NULL);
2212         if (ret != 0 || res != 0) {
2213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2214                 return -1;
2215         }
2216
2217         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2218                     
2219         return 0;
2220 }
2221
2222 /*
2223   set some ctdb flags
2224 */
2225 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2226 {
2227         ctdb->flags |= flags;
2228 }
2229
2230
2231 /*
2232   return the pnn of this node
2233 */
2234 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2235 {
2236         return ctdb->pnn;
2237 }
2238
2239
2240 /*
2241   get the uptime of a remote node
2242  */
2243 struct ctdb_client_control_state *
2244 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2245 {
2246         return ctdb_control_send(ctdb, destnode, 0, 
2247                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2248                            mem_ctx, &timeout, NULL);
2249 }
2250
2251 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2252 {
2253         int ret;
2254         int32_t res;
2255         TDB_DATA outdata;
2256
2257         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2258         if (ret != 0 || res != 0) {
2259                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2260                 return -1;
2261         }
2262
2263         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2264
2265         return 0;
2266 }
2267
2268 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2269 {
2270         struct ctdb_client_control_state *state;
2271
2272         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2273         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2274 }
2275
2276 /*
2277   send a control to execute the "recovered" event script on a node
2278  */
2279 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2280 {
2281         int ret;
2282         int32_t status;
2283
2284         ret = ctdb_control(ctdb, destnode, 0, 
2285                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2286                            NULL, NULL, &status, &timeout, NULL);
2287         if (ret != 0 || status != 0) {
2288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2289                 return -1;
2290         }
2291
2292         return 0;
2293 }
2294
2295 /* 
2296   callback for the async helpers used when sending the same control
2297   to multiple nodes in parallell.
2298 */
2299 static void async_callback(struct ctdb_client_control_state *state)
2300 {
2301         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2302         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2303         int ret;
2304         TDB_DATA outdata;
2305         int32_t res;
2306         uint32_t destnode = state->c->hdr.destnode;
2307
2308         /* one more node has responded with recmode data */
2309         data->count--;
2310
2311         /* if we failed to push the db, then return an error and let
2312            the main loop try again.
2313         */
2314         if (state->state != CTDB_CONTROL_DONE) {
2315                 if ( !data->dont_log_errors) {
2316                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2317                 }
2318                 data->fail_count++;
2319                 if (data->fail_callback) {
2320                         data->fail_callback(ctdb, destnode, res, outdata,
2321                                         data->callback_data);
2322                 }
2323                 return;
2324         }
2325         
2326         state->async.fn = NULL;
2327
2328         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2329         if ((ret != 0) || (res != 0)) {
2330                 if ( !data->dont_log_errors) {
2331                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2332                 }
2333                 data->fail_count++;
2334                 if (data->fail_callback) {
2335                         data->fail_callback(ctdb, destnode, res, outdata,
2336                                         data->callback_data);
2337                 }
2338         }
2339         if ((ret == 0) && (data->callback != NULL)) {
2340                 data->callback(ctdb, destnode, res, outdata,
2341                                         data->callback_data);
2342         }
2343 }
2344
2345
2346 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2347 {
2348         /* set up the callback functions */
2349         state->async.fn = async_callback;
2350         state->async.private_data = data;
2351         
2352         /* one more control to wait for to complete */
2353         data->count++;
2354 }
2355
2356
2357 /* wait for up to the maximum number of seconds allowed
2358    or until all nodes we expect a response from has replied
2359 */
2360 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2361 {
2362         while (data->count > 0) {
2363                 event_loop_once(ctdb->ev);
2364         }
2365         if (data->fail_count != 0) {
2366                 if (!data->dont_log_errors) {
2367                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2368                                  data->fail_count));
2369                 }
2370                 return -1;
2371         }
2372         return 0;
2373 }
2374
2375
2376 /* 
2377    perform a simple control on the listed nodes
2378    The control cannot return data
2379  */
2380 int ctdb_client_async_control(struct ctdb_context *ctdb,
2381                                 enum ctdb_controls opcode,
2382                                 uint32_t *nodes,
2383                                 uint64_t srvid,
2384                                 struct timeval timeout,
2385                                 bool dont_log_errors,
2386                                 TDB_DATA data,
2387                                 client_async_callback client_callback,
2388                                 client_async_callback fail_callback,
2389                                 void *callback_data)
2390 {
2391         struct client_async_data *async_data;
2392         struct ctdb_client_control_state *state;
2393         int j, num_nodes;
2394
2395         async_data = talloc_zero(ctdb, struct client_async_data);
2396         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2397         async_data->dont_log_errors = dont_log_errors;
2398         async_data->callback = client_callback;
2399         async_data->fail_callback = fail_callback;
2400         async_data->callback_data = callback_data;
2401         async_data->opcode        = opcode;
2402
2403         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2404
2405         /* loop over all nodes and send an async control to each of them */
2406         for (j=0; j<num_nodes; j++) {
2407                 uint32_t pnn = nodes[j];
2408
2409                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2410                                           0, data, async_data, &timeout, NULL);
2411                 if (state == NULL) {
2412                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2413                         talloc_free(async_data);
2414                         return -1;
2415                 }
2416                 
2417                 ctdb_client_async_add(async_data, state);
2418         }
2419
2420         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2421                 talloc_free(async_data);
2422                 return -1;
2423         }
2424
2425         talloc_free(async_data);
2426         return 0;
2427 }
2428
2429 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2430                                 struct ctdb_vnn_map *vnn_map,
2431                                 TALLOC_CTX *mem_ctx,
2432                                 bool include_self)
2433 {
2434         int i, j, num_nodes;
2435         uint32_t *nodes;
2436
2437         for (i=num_nodes=0;i<vnn_map->size;i++) {
2438                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2439                         continue;
2440                 }
2441                 num_nodes++;
2442         } 
2443
2444         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2445         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2446
2447         for (i=j=0;i<vnn_map->size;i++) {
2448                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2449                         continue;
2450                 }
2451                 nodes[j++] = vnn_map->map[i];
2452         } 
2453
2454         return nodes;
2455 }
2456
2457 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2458                                 struct ctdb_node_map *node_map,
2459                                 TALLOC_CTX *mem_ctx,
2460                                 bool include_self)
2461 {
2462         int i, j, num_nodes;
2463         uint32_t *nodes;
2464
2465         for (i=num_nodes=0;i<node_map->num;i++) {
2466                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2467                         continue;
2468                 }
2469                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2470                         continue;
2471                 }
2472                 num_nodes++;
2473         } 
2474
2475         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2476         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2477
2478         for (i=j=0;i<node_map->num;i++) {
2479                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2480                         continue;
2481                 }
2482                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2483                         continue;
2484                 }
2485                 nodes[j++] = node_map->nodes[i].pnn;
2486         } 
2487
2488         return nodes;
2489 }
2490
2491 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2492                                 struct ctdb_node_map *node_map,
2493                                 TALLOC_CTX *mem_ctx,
2494                                 uint32_t pnn)
2495 {
2496         int i, j, num_nodes;
2497         uint32_t *nodes;
2498
2499         for (i=num_nodes=0;i<node_map->num;i++) {
2500                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2501                         continue;
2502                 }
2503                 if (node_map->nodes[i].pnn == pnn) {
2504                         continue;
2505                 }
2506                 num_nodes++;
2507         } 
2508
2509         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2510         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2511
2512         for (i=j=0;i<node_map->num;i++) {
2513                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2514                         continue;
2515                 }
2516                 if (node_map->nodes[i].pnn == pnn) {
2517                         continue;
2518                 }
2519                 nodes[j++] = node_map->nodes[i].pnn;
2520         } 
2521
2522         return nodes;
2523 }
2524
2525 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2526                                 struct ctdb_node_map *node_map,
2527                                 TALLOC_CTX *mem_ctx,
2528                                 bool include_self)
2529 {
2530         int i, j, num_nodes;
2531         uint32_t *nodes;
2532
2533         for (i=num_nodes=0;i<node_map->num;i++) {
2534                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2535                         continue;
2536                 }
2537                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2538                         continue;
2539                 }
2540                 num_nodes++;
2541         } 
2542
2543         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2544         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2545
2546         for (i=j=0;i<node_map->num;i++) {
2547                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2548                         continue;
2549                 }
2550                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2551                         continue;
2552                 }
2553                 nodes[j++] = node_map->nodes[i].pnn;
2554         } 
2555
2556         return nodes;
2557 }
2558
2559 /* 
2560   this is used to test if a pnn lock exists and if it exists will return
2561   the number of connections that pnn has reported or -1 if that recovery
2562   daemon is not running.
2563 */
2564 int
2565 ctdb_read_pnn_lock(int fd, int32_t pnn)
2566 {
2567         struct flock lock;
2568         char c;
2569
2570         lock.l_type = F_WRLCK;
2571         lock.l_whence = SEEK_SET;
2572         lock.l_start = pnn;
2573         lock.l_len = 1;
2574         lock.l_pid = 0;
2575
2576         if (fcntl(fd, F_GETLK, &lock) != 0) {
2577                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2578                 return -1;
2579         }
2580
2581         if (lock.l_type == F_UNLCK) {
2582                 return -1;
2583         }
2584
2585         if (pread(fd, &c, 1, pnn) == -1) {
2586                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2587                 return -1;
2588         }
2589
2590         return c;
2591 }
2592
2593 /*
2594   get capabilities of a remote node
2595  */
2596 struct ctdb_client_control_state *
2597 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2598 {
2599         return ctdb_control_send(ctdb, destnode, 0, 
2600                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2601                            mem_ctx, &timeout, NULL);
2602 }
2603
2604 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2605 {
2606         int ret;
2607         int32_t res;
2608         TDB_DATA outdata;
2609
2610         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2611         if ( (ret != 0) || (res != 0) ) {
2612                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2613                 return -1;
2614         }
2615
2616         if (capabilities) {
2617                 *capabilities = *((uint32_t *)outdata.dptr);
2618         }
2619
2620         return 0;
2621 }
2622
2623 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2624 {
2625         struct ctdb_client_control_state *state;
2626         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2627         int ret;
2628
2629         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2630         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2631         talloc_free(tmp_ctx);
2632         return ret;
2633 }
2634
2635 /**
2636  * check whether a transaction is active on a given db on a given node
2637  */
2638 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2639                                      uint32_t destnode,
2640                                      uint32_t db_id)
2641 {
2642         int32_t status;
2643         int ret;
2644         TDB_DATA indata;
2645
2646         indata.dptr = (uint8_t *)&db_id;
2647         indata.dsize = sizeof(db_id);
2648
2649         ret = ctdb_control(ctdb, destnode, 0,
2650                            CTDB_CONTROL_TRANS2_ACTIVE,
2651                            0, indata, NULL, NULL, &status,
2652                            NULL, NULL);
2653
2654         if (ret != 0) {
2655                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2656                 return -1;
2657         }
2658
2659         return status;
2660 }
2661
2662
2663 struct ctdb_transaction_handle {
2664         struct ctdb_db_context *ctdb_db;
2665         bool in_replay;
2666         /*
2667          * we store the reads and writes done under a transaction:
2668          * - one list stores both reads and writes (m_all),
2669          * - the other just writes (m_write)
2670          */
2671         struct ctdb_marshall_buffer *m_all;
2672         struct ctdb_marshall_buffer *m_write;
2673 };
2674
2675 /* start a transaction on a database */
2676 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2677 {
2678         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2679         return 0;
2680 }
2681
2682 /* start a transaction on a database */
2683 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2684 {
2685         struct ctdb_record_handle *rh;
2686         TDB_DATA key;
2687         TDB_DATA data;
2688         struct ctdb_ltdb_header header;
2689         TALLOC_CTX *tmp_ctx;
2690         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2691         int ret;
2692         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2693         pid_t pid;
2694         int32_t status;
2695
2696         key.dptr = discard_const(keyname);
2697         key.dsize = strlen(keyname);
2698
2699         if (!ctdb_db->persistent) {
2700                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2701                 return -1;
2702         }
2703
2704 again:
2705         tmp_ctx = talloc_new(h);
2706
2707         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2708         if (rh == NULL) {
2709                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2710                 talloc_free(tmp_ctx);
2711                 return -1;
2712         }
2713
2714         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2715                                               CTDB_CURRENT_NODE,
2716                                               ctdb_db->db_id);
2717         if (status == 1) {
2718                 unsigned long int usec = (1000 + random()) % 100000;
2719                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2720                                     "on db_id[0x%08x]. waiting for %lu "
2721                                     "microseconds\n",
2722                                     ctdb_db->db_id, usec));
2723                 talloc_free(tmp_ctx);
2724                 usleep(usec);
2725                 goto again;
2726         }
2727
2728         /*
2729          * store the pid in the database:
2730          * it is not enough that the node is dmaster...
2731          */
2732         pid = getpid();
2733         data.dptr = (unsigned char *)&pid;
2734         data.dsize = sizeof(pid_t);
2735         rh->header.rsn++;
2736         rh->header.dmaster = ctdb_db->ctdb->pnn;
2737         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2738         if (ret != 0) {
2739                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2740                                   "transaction record\n"));
2741                 talloc_free(tmp_ctx);
2742                 return -1;
2743         }
2744
2745         talloc_free(rh);
2746
2747         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2748         if (ret != 0) {
2749                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2750                 talloc_free(tmp_ctx);
2751                 return -1;
2752         }
2753
2754         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2755         if (ret != 0) {
2756                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2757                                  "lock record inside transaction\n"));
2758                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2759                 talloc_free(tmp_ctx);
2760                 goto again;
2761         }
2762
2763         if (header.dmaster != ctdb_db->ctdb->pnn) {
2764                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2765                                    "transaction lock record\n"));
2766                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2767                 talloc_free(tmp_ctx);
2768                 goto again;
2769         }
2770
2771         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2772                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2773                                     "the transaction lock record\n"));
2774                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2775                 talloc_free(tmp_ctx);
2776                 goto again;
2777         }
2778
2779         talloc_free(tmp_ctx);
2780
2781         return 0;
2782 }
2783
2784
2785 /* start a transaction on a database */
2786 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2787                                                        TALLOC_CTX *mem_ctx)
2788 {
2789         struct ctdb_transaction_handle *h;
2790         int ret;
2791
2792         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2793         if (h == NULL) {
2794                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2795                 return NULL;
2796         }
2797
2798         h->ctdb_db = ctdb_db;
2799
2800         ret = ctdb_transaction_fetch_start(h);
2801         if (ret != 0) {
2802                 talloc_free(h);
2803                 return NULL;
2804         }
2805
2806         talloc_set_destructor(h, ctdb_transaction_destructor);
2807
2808         return h;
2809 }
2810
2811
2812
2813 /*
2814   fetch a record inside a transaction
2815  */
2816 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2817                            TALLOC_CTX *mem_ctx, 
2818                            TDB_DATA key, TDB_DATA *data)
2819 {
2820         struct ctdb_ltdb_header header;
2821         int ret;
2822
2823         ZERO_STRUCT(header);
2824
2825         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2826         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2827                 /* record doesn't exist yet */
2828                 *data = tdb_null;
2829                 ret = 0;
2830         }
2831         
2832         if (ret != 0) {
2833                 return ret;
2834         }
2835
2836         if (!h->in_replay) {
2837                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2838                 if (h->m_all == NULL) {
2839                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2840                         return -1;
2841                 }
2842         }
2843
2844         return 0;
2845 }
2846
2847 /*
2848   stores a record inside a transaction
2849  */
2850 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2851                            TDB_DATA key, TDB_DATA data)
2852 {
2853         TALLOC_CTX *tmp_ctx = talloc_new(h);
2854         struct ctdb_ltdb_header header;
2855         TDB_DATA olddata;
2856         int ret;
2857
2858         ZERO_STRUCT(header);
2859
2860         /* we need the header so we can update the RSN */
2861         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2862         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2863                 /* the record doesn't exist - create one with us as dmaster.
2864                    This is only safe because we are in a transaction and this
2865                    is a persistent database */
2866                 ZERO_STRUCT(header);
2867         } else if (ret != 0) {
2868                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2869                 talloc_free(tmp_ctx);
2870                 return ret;
2871         }
2872
2873         if (data.dsize == olddata.dsize &&
2874             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2875                 /* save writing the same data */
2876                 talloc_free(tmp_ctx);
2877                 return 0;
2878         }
2879
2880         header.dmaster = h->ctdb_db->ctdb->pnn;
2881         header.rsn++;
2882
2883         if (!h->in_replay) {
2884                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2885                 if (h->m_all == NULL) {
2886                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2887                         talloc_free(tmp_ctx);
2888                         return -1;
2889                 }
2890         }               
2891
2892         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2893         if (h->m_write == NULL) {
2894                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2895                 talloc_free(tmp_ctx);
2896                 return -1;
2897         }
2898         
2899         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2900
2901         talloc_free(tmp_ctx);
2902         
2903         return ret;
2904 }
2905
2906 /*
2907   replay a transaction
2908  */
2909 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2910 {
2911         int ret, i;
2912         struct ctdb_rec_data *rec = NULL;
2913
2914         h->in_replay = true;
2915         talloc_free(h->m_write);
2916         h->m_write = NULL;
2917
2918         ret = ctdb_transaction_fetch_start(h);
2919         if (ret != 0) {
2920                 return ret;
2921         }
2922
2923         for (i=0;i<h->m_all->count;i++) {
2924                 TDB_DATA key, data;
2925
2926                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2927                 if (rec == NULL) {
2928                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2929                         goto failed;
2930                 }
2931
2932                 if (rec->reqid == 0) {
2933                         /* its a store */
2934                         if (ctdb_transaction_store(h, key, data) != 0) {
2935                                 goto failed;
2936                         }
2937                 } else {
2938                         TDB_DATA data2;
2939                         TALLOC_CTX *tmp_ctx = talloc_new(h);
2940
2941                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
2942                                 talloc_free(tmp_ctx);
2943                                 goto failed;
2944                         }
2945                         if (data2.dsize != data.dsize ||
2946                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
2947                                 /* the record has changed on us - we have to give up */
2948                                 talloc_free(tmp_ctx);
2949                                 goto failed;
2950                         }
2951                         talloc_free(tmp_ctx);
2952                 }
2953         }
2954         
2955         return 0;
2956
2957 failed:
2958         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2959         return -1;
2960 }
2961
2962
2963 /*
2964   commit a transaction
2965  */
2966 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2967 {
2968         int ret, retries=0;
2969         int32_t status;
2970         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
2971         struct timeval timeout;
2972         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
2973
2974         talloc_set_destructor(h, NULL);
2975
2976         /* our commit strategy is quite complex.
2977
2978            - we first try to commit the changes to all other nodes
2979
2980            - if that works, then we commit locally and we are done
2981
2982            - if a commit on another node fails, then we need to cancel
2983              the transaction, then restart the transaction (thus
2984              opening a window of time for a pending recovery to
2985              complete), then replay the transaction, checking all the
2986              reads and writes (checking that reads give the same data,
2987              and writes succeed). Then we retry the transaction to the
2988              other nodes
2989         */
2990
2991 again:
2992         if (h->m_write == NULL) {
2993                 /* no changes were made */
2994                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2995                 talloc_free(h);
2996                 return 0;
2997         }
2998
2999         /* tell ctdbd to commit to the other nodes */
3000         timeout = timeval_current_ofs(1, 0);
3001         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3002                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3003                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3004                            &timeout, NULL);
3005         if (ret != 0 || status != 0) {
3006                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3007                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3008                                      ", retrying after 1 second...\n",
3009                                      (retries==0)?"":"retry "));
3010                 sleep(1);
3011
3012                 if (ret != 0) {
3013                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3014                 } else {
3015                         /* work out what error code we will give if we 
3016                            have to fail the operation */
3017                         switch ((enum ctdb_trans2_commit_error)status) {
3018                         case CTDB_TRANS2_COMMIT_SUCCESS:
3019                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3020                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3021                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3022                                 break;
3023                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3024                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3025                                 break;
3026                         }
3027                 }
3028
3029                 if (++retries == 100) {
3030                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3031                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3032                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3033                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3034                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3035                         talloc_free(h);
3036                         return -1;
3037                 }               
3038
3039                 if (ctdb_replay_transaction(h) != 0) {
3040                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3041                                           "transaction on db 0x%08x, "
3042                                           "failure control =%u\n",
3043                                           h->ctdb_db->db_id,
3044                                           (unsigned)failure_control));
3045                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3046                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3047                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3048                         talloc_free(h);
3049                         return -1;
3050                 }
3051                 goto again;
3052         } else {
3053                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3054         }
3055
3056         /* do the real commit locally */
3057         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3058         if (ret != 0) {
3059                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3060                                   "on db id 0x%08x locally, "
3061                                   "failure_control=%u\n",
3062                                   h->ctdb_db->db_id,
3063                                   (unsigned)failure_control));
3064                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3065                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3066                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3067                 talloc_free(h);
3068                 return ret;
3069         }
3070
3071         /* tell ctdbd that we are finished with our local commit */
3072         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3073                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3074                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3075         talloc_free(h);
3076         return 0;
3077 }
3078
3079 /*
3080   recovery daemon ping to main daemon
3081  */
3082 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3083 {
3084         int ret;
3085         int32_t res;
3086
3087         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3088                            ctdb, NULL, &res, NULL, NULL);
3089         if (ret != 0 || res != 0) {
3090                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3091                 return -1;
3092         }
3093
3094         return 0;
3095 }
3096
3097 /* when forking the main daemon and the child process needs to connect back
3098  * to the daemon as a client process, this function can be used to change
3099  * the ctdb context from daemon into client mode
3100  */
3101 int switch_from_server_to_client(struct ctdb_context *ctdb)
3102 {
3103         int ret;
3104
3105         /* shutdown the transport */
3106         if (ctdb->methods) {
3107                 ctdb->methods->shutdown(ctdb);
3108         }
3109
3110         /* get a new event context */
3111         talloc_free(ctdb->ev);
3112         ctdb->ev = event_context_init(ctdb);
3113
3114         close(ctdb->daemon.sd);
3115         ctdb->daemon.sd = -1;
3116
3117         /* initialise ctdb */
3118         ret = ctdb_socket_connect(ctdb);
3119         if (ret != 0) {
3120                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3121                 return -1;
3122         }
3123
3124          return 0;
3125 }
3126
3127 /*
3128   get the status of running the monitor eventscripts: NULL means never run.
3129  */
3130 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3131                 struct timeval timeout, uint32_t destnode, 
3132                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3133                 struct ctdb_scripts_wire **script_status)
3134 {
3135         int ret;
3136         TDB_DATA outdata, indata;
3137         int32_t res;
3138         uint32_t uinttype = type;
3139
3140         indata.dptr = (uint8_t *)&uinttype;
3141         indata.dsize = sizeof(uinttype);
3142
3143         ret = ctdb_control(ctdb, destnode, 0, 
3144                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3145                            mem_ctx, &outdata, &res, &timeout, NULL);
3146         if (ret != 0 || res != 0) {
3147                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3148                 return -1;
3149         }
3150
3151         if (outdata.dsize == 0) {
3152                 *script_status = NULL;
3153         } else {
3154                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3155                 talloc_free(outdata.dptr);
3156         }
3157                     
3158         return 0;
3159 }
3160
3161 /*
3162   tell the main daemon how long it took to lock the reclock file
3163  */
3164 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3165 {
3166         int ret;
3167         int32_t res;
3168         TDB_DATA data;
3169
3170         data.dptr = (uint8_t *)&latency;
3171         data.dsize = sizeof(latency);
3172
3173         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3174                            ctdb, NULL, &res, NULL, NULL);
3175         if (ret != 0 || res != 0) {
3176                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3177                 return -1;
3178         }
3179
3180         return 0;
3181 }
3182
3183 /*
3184   get the name of the reclock file
3185  */
3186 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3187                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3188                          const char **name)
3189 {
3190         int ret;
3191         int32_t res;
3192         TDB_DATA data;
3193
3194         ret = ctdb_control(ctdb, destnode, 0, 
3195                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3196                            mem_ctx, &data, &res, &timeout, NULL);
3197         if (ret != 0 || res != 0) {
3198                 return -1;
3199         }
3200
3201         if (data.dsize == 0) {
3202                 *name = NULL;
3203         } else {
3204                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3205         }
3206         talloc_free(data.dptr);
3207
3208         return 0;
3209 }
3210
3211 /*
3212   set the reclock filename for a node
3213  */
3214 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3215 {
3216         int ret;
3217         TDB_DATA data;
3218         int32_t res;
3219
3220         if (reclock == NULL) {
3221                 data.dsize = 0;
3222                 data.dptr  = NULL;
3223         } else {
3224                 data.dsize = strlen(reclock) + 1;
3225                 data.dptr  = discard_const(reclock);
3226         }
3227
3228         ret = ctdb_control(ctdb, destnode, 0, 
3229                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3230                            NULL, NULL, &res, &timeout, NULL);
3231         if (ret != 0 || res != 0) {
3232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3233                 return -1;
3234         }
3235
3236         return 0;
3237 }
3238
3239 /*
3240   stop a node
3241  */
3242 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3243 {
3244         int ret;
3245         int32_t res;
3246
3247         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3248                            ctdb, NULL, &res, &timeout, NULL);
3249         if (ret != 0 || res != 0) {
3250                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3251                 return -1;
3252         }
3253
3254         return 0;
3255 }
3256
3257 /*
3258   continue a node
3259  */
3260 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3261 {
3262         int ret;
3263
3264         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3265                            ctdb, NULL, NULL, &timeout, NULL);
3266         if (ret != 0) {
3267                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3268                 return -1;
3269         }
3270
3271         return 0;
3272 }
3273
3274 /*
3275   set the natgw state for a node
3276  */
3277 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3278 {
3279         int ret;
3280         TDB_DATA data;
3281         int32_t res;
3282
3283         data.dsize = sizeof(natgwstate);
3284         data.dptr  = (uint8_t *)&natgwstate;
3285
3286         ret = ctdb_control(ctdb, destnode, 0, 
3287                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3288                            NULL, NULL, &res, &timeout, NULL);
3289         if (ret != 0 || res != 0) {
3290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3291                 return -1;
3292         }
3293
3294         return 0;
3295 }
3296
3297 /*
3298   set the lmaster role for a node
3299  */
3300 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3301 {
3302         int ret;
3303         TDB_DATA data;
3304         int32_t res;
3305
3306         data.dsize = sizeof(lmasterrole);
3307         data.dptr  = (uint8_t *)&lmasterrole;
3308
3309         ret = ctdb_control(ctdb, destnode, 0, 
3310                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3311                            NULL, NULL, &res, &timeout, NULL);
3312         if (ret != 0 || res != 0) {
3313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3314                 return -1;
3315         }
3316
3317         return 0;
3318 }
3319
3320 /*
3321   set the recmaster role for a node
3322  */
3323 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3324 {
3325         int ret;
3326         TDB_DATA data;
3327         int32_t res;
3328
3329         data.dsize = sizeof(recmasterrole);
3330         data.dptr  = (uint8_t *)&recmasterrole;
3331
3332         ret = ctdb_control(ctdb, destnode, 0, 
3333                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3334                            NULL, NULL, &res, &timeout, NULL);
3335         if (ret != 0 || res != 0) {
3336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3337                 return -1;
3338         }
3339
3340         return 0;
3341 }
3342
3343 /* enable an eventscript
3344  */
3345 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3346 {
3347         int ret;
3348         TDB_DATA data;
3349         int32_t res;
3350
3351         data.dsize = strlen(script) + 1;
3352         data.dptr  = discard_const(script);
3353
3354         ret = ctdb_control(ctdb, destnode, 0, 
3355                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3356                            NULL, NULL, &res, &timeout, NULL);
3357         if (ret != 0 || res != 0) {
3358                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3359                 return -1;
3360         }
3361
3362         return 0;
3363 }
3364
3365 /* disable an eventscript
3366  */
3367 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3368 {
3369         int ret;
3370         TDB_DATA data;
3371         int32_t res;
3372
3373         data.dsize = strlen(script) + 1;
3374         data.dptr  = discard_const(script);
3375
3376         ret = ctdb_control(ctdb, destnode, 0, 
3377                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3378                            NULL, NULL, &res, &timeout, NULL);
3379         if (ret != 0 || res != 0) {
3380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3381                 return -1;
3382         }
3383
3384         return 0;
3385 }
3386
3387
3388 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3389 {
3390         int ret;
3391         TDB_DATA data;
3392         int32_t res;
3393
3394         data.dsize = sizeof(*bantime);
3395         data.dptr  = (uint8_t *)bantime;
3396
3397         ret = ctdb_control(ctdb, destnode, 0, 
3398                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3399                            NULL, NULL, &res, &timeout, NULL);
3400         if (ret != 0 || res != 0) {
3401                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3402                 return -1;
3403         }
3404
3405         return 0;
3406 }
3407
3408
3409 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3410 {
3411         int ret;
3412         TDB_DATA outdata;
3413         int32_t res;
3414         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3415
3416         ret = ctdb_control(ctdb, destnode, 0, 
3417                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3418                            tmp_ctx, &outdata, &res, &timeout, NULL);
3419         if (ret != 0 || res != 0) {
3420                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3421                 talloc_free(tmp_ctx);
3422                 return -1;
3423         }
3424
3425         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3426         talloc_free(tmp_ctx);
3427
3428         return 0;
3429 }
3430
3431
3432 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3433 {
3434         int ret;
3435         int32_t res;
3436         TDB_DATA data;
3437         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3438
3439         data.dptr = (uint8_t*)db_prio;
3440         data.dsize = sizeof(*db_prio);
3441
3442         ret = ctdb_control(ctdb, destnode, 0, 
3443                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3444                            tmp_ctx, NULL, &res, &timeout, NULL);
3445         if (ret != 0 || res != 0) {
3446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3447                 talloc_free(tmp_ctx);
3448                 return -1;
3449         }
3450
3451         talloc_free(tmp_ctx);
3452
3453         return 0;
3454 }
3455
3456 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3457 {
3458         int ret;
3459         int32_t res;
3460         TDB_DATA data;
3461         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3462
3463         data.dptr = (uint8_t*)&db_id;
3464         data.dsize = sizeof(db_id);
3465
3466         ret = ctdb_control(ctdb, destnode, 0, 
3467                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3468                            tmp_ctx, NULL, &res, &timeout, NULL);
3469         if (ret != 0 || res < 0) {
3470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3471                 talloc_free(tmp_ctx);
3472                 return -1;
3473         }
3474
3475         if (priority) {
3476                 *priority = res;
3477         }
3478
3479         talloc_free(tmp_ctx);
3480
3481         return 0;
3482 }
3483
3484 /* time out handler for ctdb_control */
3485 void ctdb_control_timeout_func(struct event_context *ev, struct timed_event *te, 
3486         struct timeval t, void *private_data)
3487 {
3488         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
3489
3490         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
3491                          "dstnode:%u\n", state->reqid, state->c->opcode,
3492                          state->c->hdr.destnode));
3493
3494         state->state = CTDB_CONTROL_TIMEOUT;
3495
3496         /* if we had a callback registered for this control, pull the response
3497            and call the callback.
3498         */
3499         if (state->async.fn) {
3500                 event_add_timed(state->ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
3501         }
3502 }
3503