add a function ctdb_writerecord() to write a record to the database.
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36
37 /*
38   non-locking fetch of a record
39  */
40 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
41                TDB_DATA key, TDB_DATA *data)
42 {
43         struct ctdb_call call;
44         int ret;
45
46         call.call_id = CTDB_FETCH_FUNC;
47         call.call_data.dptr = NULL;
48         call.call_data.dsize = 0;
49
50         ret = ctdb_call(ctdb_db, &call);
51
52         if (ret == 0) {
53                 *data = call.reply_data;
54                 talloc_steal(mem_ctx, data->dptr);
55         }
56
57         return ret;
58 }
59
60
61
62
63
64
65
66
67 /*
68   send a ctdb control message
69   timeout specifies how long we should wait for a reply.
70   if timeout is NULL we wait indefinitely
71  */
72 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
73                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
74                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
75                  struct timeval *timeout,
76                  char **errormsg)
77 {
78         struct ctdb_client_control_state *state;
79
80         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
81                         flags, data, mem_ctx,
82                         errormsg);
83         if (state != NULL && timeout && !timeval_is_zero(timeout)) {
84                 event_add_timed(ctdb->ev, state, *timeout, ctdb_control_timeout_func, state);
85         }
86
87         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
88                         errormsg);
89 }
90
91
92
93
94 /*
95   a process exists call. Returns 0 if process exists, -1 otherwise
96  */
97 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
98 {
99         int ret;
100         TDB_DATA data;
101         int32_t status;
102
103         data.dptr = (uint8_t*)&pid;
104         data.dsize = sizeof(pid);
105
106         ret = ctdb_control(ctdb, destnode, 0, 
107                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
108                            NULL, NULL, &status, NULL, NULL);
109         if (ret != 0) {
110                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
111                 return -1;
112         }
113
114         return status;
115 }
116
117 /*
118   get remote statistics
119  */
120 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
121 {
122         int ret;
123         TDB_DATA data;
124         int32_t res;
125
126         ret = ctdb_control(ctdb, destnode, 0, 
127                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
128                            ctdb, &data, &res, NULL, NULL);
129         if (ret != 0 || res != 0) {
130                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
131                 return -1;
132         }
133
134         if (data.dsize != sizeof(struct ctdb_statistics)) {
135                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
136                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
137                       return -1;
138         }
139
140         *status = *(struct ctdb_statistics *)data.dptr;
141         talloc_free(data.dptr);
142                         
143         return 0;
144 }
145
146 /*
147   shutdown a remote ctdb node
148  */
149 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
150 {
151         struct ctdb_client_control_state *state;
152
153         state = ctdb_control_send(ctdb, destnode, 0, 
154                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
155                            NULL, NULL);
156         if (state == NULL) {
157                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
158                 return -1;
159         }
160         if (!timeval_is_zero(&timeout)) {
161                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
162         }
163
164         return 0;
165 }
166
167 /*
168   get vnn map from a remote node
169  */
170 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
171 {
172         int ret;
173         TDB_DATA outdata;
174         int32_t res;
175         struct ctdb_vnn_map_wire *map;
176
177         ret = ctdb_control(ctdb, destnode, 0, 
178                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
179                            mem_ctx, &outdata, &res, &timeout, NULL);
180         if (ret != 0 || res != 0) {
181                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
182                 return -1;
183         }
184         
185         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
186         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
187             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
188                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
189                 return -1;
190         }
191
192         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
193         CTDB_NO_MEMORY(ctdb, *vnnmap);
194         (*vnnmap)->generation = map->generation;
195         (*vnnmap)->size       = map->size;
196         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
197
198         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
199         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
200         talloc_free(outdata.dptr);
201                     
202         return 0;
203 }
204
205
206 /*
207   get the recovery mode of a remote node
208  */
209 struct ctdb_client_control_state *
210 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
211 {
212         struct ctdb_client_control_state *state;
213
214         state = ctdb_control_send(ctdb, destnode, 0, 
215                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
216                            mem_ctx, NULL);
217
218         if (state != NULL && !timeval_is_zero(&timeout)) {
219                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
220         }
221
222         return state;
223 }
224
225 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
226 {
227         int ret;
228         int32_t res;
229
230         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
231         if (ret != 0) {
232                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
233                 return -1;
234         }
235
236         if (recmode) {
237                 *recmode = (uint32_t)res;
238         }
239
240         return 0;
241 }
242
243 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
244 {
245         struct ctdb_client_control_state *state;
246
247         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
248         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
249 }
250
251
252
253
254 /*
255   set the recovery mode of a remote node
256  */
257 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
258 {
259         int ret;
260         TDB_DATA data;
261         int32_t res;
262
263         data.dsize = sizeof(uint32_t);
264         data.dptr = (unsigned char *)&recmode;
265
266         ret = ctdb_control(ctdb, destnode, 0, 
267                            CTDB_CONTROL_SET_RECMODE, 0, data, 
268                            NULL, NULL, &res, &timeout, NULL);
269         if (ret != 0 || res != 0) {
270                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
271                 return -1;
272         }
273
274         return 0;
275 }
276
277
278
279 /*
280   get the recovery master of a remote node
281  */
282 struct ctdb_client_control_state *
283 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
284                         struct timeval timeout, uint32_t destnode)
285 {
286         struct ctdb_client_control_state *state;
287
288         state = ctdb_control_send(ctdb, destnode, 0, 
289                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
290                            mem_ctx, NULL);
291         if (state != NULL && !timeval_is_zero(&timeout)) {
292                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
293         }
294
295         return state;
296 }
297
298 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
299 {
300         int ret;
301         int32_t res;
302
303         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
304         if (ret != 0) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
306                 return -1;
307         }
308
309         if (recmaster) {
310                 *recmaster = (uint32_t)res;
311         }
312
313         return 0;
314 }
315
316 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
317 {
318         struct ctdb_client_control_state *state;
319
320         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
321         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
322 }
323
324
325 /*
326   set the recovery master of a remote node
327  */
328 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
329 {
330         int ret;
331         TDB_DATA data;
332         int32_t res;
333
334         ZERO_STRUCT(data);
335         data.dsize = sizeof(uint32_t);
336         data.dptr = (unsigned char *)&recmaster;
337
338         ret = ctdb_control(ctdb, destnode, 0, 
339                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
340                            NULL, NULL, &res, &timeout, NULL);
341         if (ret != 0 || res != 0) {
342                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
343                 return -1;
344         }
345
346         return 0;
347 }
348
349
350 /*
351   get a list of databases off a remote node
352  */
353 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
354                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
355 {
356         int ret;
357         TDB_DATA outdata;
358         int32_t res;
359
360         ret = ctdb_control(ctdb, destnode, 0, 
361                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
362                            mem_ctx, &outdata, &res, &timeout, NULL);
363         if (ret != 0 || res != 0) {
364                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
365                 return -1;
366         }
367
368         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
369         talloc_free(outdata.dptr);
370                     
371         return 0;
372 }
373
374 /*
375   get a list of nodes (vnn and flags ) from a remote node
376  */
377 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
378                 struct timeval timeout, uint32_t destnode, 
379                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
380 {
381         int ret;
382         TDB_DATA outdata;
383         int32_t res;
384
385         ret = ctdb_control(ctdb, destnode, 0, 
386                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
387                            mem_ctx, &outdata, &res, &timeout, NULL);
388         if (ret == 0 && res == -1 && outdata.dsize == 0) {
389                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
390                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
391         }
392         if (ret != 0 || res != 0 || outdata.dsize == 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
394                 return -1;
395         }
396
397         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
398         talloc_free(outdata.dptr);
399                     
400         return 0;
401 }
402
403 /*
404   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
405  */
406 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
407                 struct timeval timeout, uint32_t destnode, 
408                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
409 {
410         int ret, i, len;
411         TDB_DATA outdata;
412         struct ctdb_node_mapv4 *nodemapv4;
413         int32_t res;
414
415         ret = ctdb_control(ctdb, destnode, 0, 
416                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
417                            mem_ctx, &outdata, &res, &timeout, NULL);
418         if (ret != 0 || res != 0 || outdata.dsize == 0) {
419                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
420                 return -1;
421         }
422
423         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
424
425         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
426         (*nodemap) = talloc_zero_size(mem_ctx, len);
427         CTDB_NO_MEMORY(ctdb, (*nodemap));
428
429         (*nodemap)->num = nodemapv4->num;
430         for (i=0; i<nodemapv4->num; i++) {
431                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
432                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
433                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
434                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
435         }
436                 
437         talloc_free(outdata.dptr);
438                     
439         return 0;
440 }
441
442 /*
443   drop the transport, reload the nodes file and restart the transport
444  */
445 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
446                     struct timeval timeout, uint32_t destnode)
447 {
448         int ret;
449         int32_t res;
450
451         ret = ctdb_control(ctdb, destnode, 0, 
452                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
453                            NULL, NULL, &res, &timeout, NULL);
454         if (ret != 0 || res != 0) {
455                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
456                 return -1;
457         }
458
459         return 0;
460 }
461
462
463 /*
464   set vnn map on a node
465  */
466 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
467                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
468 {
469         int ret;
470         TDB_DATA data;
471         int32_t res;
472         struct ctdb_vnn_map_wire *map;
473         size_t len;
474
475         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
476         map = talloc_size(mem_ctx, len);
477         CTDB_NO_MEMORY(ctdb, map);
478
479         map->generation = vnnmap->generation;
480         map->size = vnnmap->size;
481         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
482         
483         data.dsize = len;
484         data.dptr  = (uint8_t *)map;
485
486         ret = ctdb_control(ctdb, destnode, 0, 
487                            CTDB_CONTROL_SETVNNMAP, 0, data, 
488                            NULL, NULL, &res, &timeout, NULL);
489         if (ret != 0 || res != 0) {
490                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
491                 return -1;
492         }
493
494         talloc_free(map);
495
496         return 0;
497 }
498
499
500 /*
501   async send for pull database
502  */
503 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
504         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
505         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
506 {
507         TDB_DATA indata;
508         struct ctdb_control_pulldb *pull;
509         struct ctdb_client_control_state *state;
510
511         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
512         CTDB_NO_MEMORY_NULL(ctdb, pull);
513
514         pull->db_id   = dbid;
515         pull->lmaster = lmaster;
516
517         indata.dsize = sizeof(struct ctdb_control_pulldb);
518         indata.dptr  = (unsigned char *)pull;
519
520         state = ctdb_control_send(ctdb, destnode, 0, 
521                                   CTDB_CONTROL_PULL_DB, 0, indata, 
522                                   mem_ctx, NULL);
523         if (state != NULL && !timeval_is_zero(&timeout)) {
524                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
525         }
526
527         talloc_free(pull);
528
529         return state;
530 }
531
532 /*
533   async recv for pull database
534  */
535 int ctdb_ctrl_pulldb_recv(
536         struct ctdb_context *ctdb, 
537         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
538         TDB_DATA *outdata)
539 {
540         int ret;
541         int32_t res;
542
543         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
544         if ( (ret != 0) || (res != 0) ){
545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
546                 return -1;
547         }
548
549         return 0;
550 }
551
552 /*
553   pull all keys and records for a specific database on a node
554  */
555 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
556                 uint32_t dbid, uint32_t lmaster, 
557                 TALLOC_CTX *mem_ctx, struct timeval timeout,
558                 TDB_DATA *outdata)
559 {
560         struct ctdb_client_control_state *state;
561
562         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
563                                       timeout);
564         
565         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
566 }
567
568
569 /*
570   change dmaster for all keys in the database to the new value
571  */
572 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
573                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
574 {
575         int ret;
576         TDB_DATA indata;
577         int32_t res;
578
579         indata.dsize = 2*sizeof(uint32_t);
580         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
581
582         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
583         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
584
585         ret = ctdb_control(ctdb, destnode, 0, 
586                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
587                            NULL, NULL, &res, &timeout, NULL);
588         if (ret != 0 || res != 0) {
589                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
590                 return -1;
591         }
592
593         return 0;
594 }
595
596 /*
597   ping a node, return number of clients connected
598  */
599 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
600 {
601         int ret;
602         int32_t res;
603
604         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
605                            tdb_null, NULL, NULL, &res, NULL, NULL);
606         if (ret != 0) {
607                 return -1;
608         }
609         return res;
610 }
611
612 /*
613   find the real path to a ltdb 
614  */
615 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
616                    const char **path)
617 {
618         int ret;
619         const char *tmppath = NULL;
620         ctdb_handle *handle;
621
622         handle = ctdb_getdbpath_send(ctdb, destnode, dbid, NULL, NULL);
623         if (handle == NULL) {
624                 DEBUG(DEBUG_ERR, (__location__  " Failed to send getdbpath control\n"));
625                 return -1;
626         }
627
628         if (!timeval_is_zero(&timeout)) {
629                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
630         }
631
632         ret = ctdb_getdbpath_recv(ctdb, handle, &tmppath);
633         if (ret != 0) {
634                 DEBUG(DEBUG_ERR,(__location__ " ctdb control for getdbpath failed\n"));
635                 if (tmppath != NULL) {
636                         talloc_free(discard_const(tmppath));
637                 }
638                 return -1;
639         }
640
641         if (tmppath == NULL) {
642                 return -1;
643         }
644
645         *path = talloc_strdup(mem_ctx, (const char *)tmppath);
646         talloc_free(discard_const(tmppath));
647
648         if (*path == NULL) {
649                 return -1;
650         }
651
652         return 0;
653 }
654
655 /*
656   find the name of a db 
657  */
658 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
659                    const char **name)
660 {
661         int ret;
662         int32_t res;
663         TDB_DATA data;
664
665         data.dptr = (uint8_t *)&dbid;
666         data.dsize = sizeof(dbid);
667
668         ret = ctdb_control(ctdb, destnode, 0, 
669                            CTDB_CONTROL_GET_DBNAME, 0, data, 
670                            mem_ctx, &data, &res, &timeout, NULL);
671         if (ret != 0 || res != 0) {
672                 return -1;
673         }
674
675         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
676         if ((*name) == NULL) {
677                 return -1;
678         }
679
680         talloc_free(data.dptr);
681
682         return 0;
683 }
684
685 /*
686   get the health status of a db
687  */
688 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
689                           struct timeval timeout,
690                           uint32_t destnode,
691                           uint32_t dbid, TALLOC_CTX *mem_ctx,
692                           const char **reason)
693 {
694         int ret;
695         int32_t res;
696         TDB_DATA data;
697
698         data.dptr = (uint8_t *)&dbid;
699         data.dsize = sizeof(dbid);
700
701         ret = ctdb_control(ctdb, destnode, 0,
702                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
703                            mem_ctx, &data, &res, &timeout, NULL);
704         if (ret != 0 || res != 0) {
705                 return -1;
706         }
707
708         if (data.dsize == 0) {
709                 (*reason) = NULL;
710                 return 0;
711         }
712
713         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
714         if ((*reason) == NULL) {
715                 return -1;
716         }
717
718         talloc_free(data.dptr);
719
720         return 0;
721 }
722
723 /*
724   create a database
725  */
726 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
727                         uint32_t destnode, 
728                         const char *name, bool persistent)
729 {
730         int ret;
731         ctdb_handle *handle;
732
733         handle = ctdb_createdb_send(ctdb, destnode, name, persistent, 0, NULL, NULL);
734         if (handle == NULL) {
735                 DEBUG(DEBUG_ERR, (__location__  " Failed to send createdb control\n"));
736                 return -1;
737         }
738
739         if (!timeval_is_zero(&timeout)) {
740                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
741         }
742
743         ret = ctdb_createdb_recv(ctdb, handle, NULL);
744         if (ret != 0) {
745                 DEBUG(DEBUG_ERR,(__location__ " ctdb control for createdb failed\n"));
746                 return -1;
747         }
748
749         return 0;
750 }
751
752 /*
753   get debug level on a node
754  */
755 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
756 {
757         int ret;
758         int32_t res;
759         TDB_DATA data;
760
761         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
762                            ctdb, &data, &res, NULL, NULL);
763         if (ret != 0 || res != 0) {
764                 return -1;
765         }
766         if (data.dsize != sizeof(int32_t)) {
767                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
768                          (unsigned)data.dsize));
769                 return -1;
770         }
771         *level = *(int32_t *)data.dptr;
772         talloc_free(data.dptr);
773         return 0;
774 }
775
776 /*
777   set debug level on a node
778  */
779 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
780 {
781         int ret;
782         int32_t res;
783         TDB_DATA data;
784
785         data.dptr = (uint8_t *)&level;
786         data.dsize = sizeof(level);
787
788         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
789                            NULL, NULL, &res, NULL, NULL);
790         if (ret != 0 || res != 0) {
791                 return -1;
792         }
793         return 0;
794 }
795
796
797 /*
798   get a list of connected nodes
799  */
800 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
801                                 struct timeval timeout,
802                                 TALLOC_CTX *mem_ctx,
803                                 uint32_t *num_nodes)
804 {
805         struct ctdb_node_map *map=NULL;
806         int ret, i;
807         uint32_t *nodes;
808
809         *num_nodes = 0;
810
811         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
812         if (ret != 0) {
813                 return NULL;
814         }
815
816         nodes = talloc_array(mem_ctx, uint32_t, map->num);
817         if (nodes == NULL) {
818                 return NULL;
819         }
820
821         for (i=0;i<map->num;i++) {
822                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
823                         nodes[*num_nodes] = map->nodes[i].pnn;
824                         (*num_nodes)++;
825                 }
826         }
827
828         return nodes;
829 }
830
831
832 /*
833   reset remote status
834  */
835 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
836 {
837         int ret;
838         int32_t res;
839
840         ret = ctdb_control(ctdb, destnode, 0, 
841                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
842                            NULL, NULL, &res, NULL, NULL);
843         if (ret != 0 || res != 0) {
844                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
845                 return -1;
846         }
847         return 0;
848 }
849
850 struct traverse_state {
851         bool done;
852         uint32_t count;
853         ctdb_traverse_func fn;
854         void *private_data;
855 };
856
857 /*
858   called on each key during a ctdb_traverse
859  */
860 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
861 {
862         struct traverse_state *state = (struct traverse_state *)p;
863         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
864         TDB_DATA key;
865
866         if (data.dsize < sizeof(uint32_t) ||
867             d->length != data.dsize) {
868                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
869                 state->done = True;
870                 return;
871         }
872
873         key.dsize = d->keylen;
874         key.dptr  = &d->data[0];
875         data.dsize = d->datalen;
876         data.dptr = &d->data[d->keylen];
877
878         if (key.dsize == 0 && data.dsize == 0) {
879                 /* end of traverse */
880                 state->done = True;
881                 return;
882         }
883
884         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
885                 /* empty records are deleted records in ctdb */
886                 return;
887         }
888
889         if (state->fn(ctdb, key, data, state->private_data) != 0) {
890                 state->done = True;
891         }
892
893         state->count++;
894 }
895
896
897 /*
898   start a cluster wide traverse, calling the supplied fn on each record
899   return the number of records traversed, or -1 on error
900  */
901 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
902 {
903         TDB_DATA data;
904         struct ctdb_traverse_start t;
905         int32_t status;
906         int ret;
907         uint64_t srvid = (getpid() | 0xFLL<<60);
908         struct traverse_state state;
909
910         state.done = False;
911         state.count = 0;
912         state.private_data = private_data;
913         state.fn = fn;
914
915         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
916         if (ret != 0) {
917                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
918                 return -1;
919         }
920
921         t.db_id = ctdb_db->db_id;
922         t.srvid = srvid;
923         t.reqid = 0;
924
925         data.dptr = (uint8_t *)&t;
926         data.dsize = sizeof(t);
927
928         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
929                            data, NULL, NULL, &status, NULL, NULL);
930         if (ret != 0 || status != 0) {
931                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
932                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid);
933                 return -1;
934         }
935
936         while (!state.done) {
937                 event_loop_once(ctdb_db->ctdb->ev);
938         }
939
940         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid);
941         if (ret != 0) {
942                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
943                 return -1;
944         }
945
946         return state.count;
947 }
948
949 #define ISASCII(x) ((x>31)&&(x<128))
950 /*
951   called on each key during a catdb
952  */
953 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
954 {
955         int i;
956         FILE *f = (FILE *)p;
957         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
958
959         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
960         for (i=0;i<key.dsize;i++) {
961                 if (ISASCII(key.dptr[i])) {
962                         fprintf(f, "%c", key.dptr[i]);
963                 } else {
964                         fprintf(f, "\\%02X", key.dptr[i]);
965                 }
966         }
967         fprintf(f, "\"\n");
968
969         fprintf(f, "dmaster: %u\n", h->dmaster);
970         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
971
972         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
973         for (i=sizeof(*h);i<data.dsize;i++) {
974                 if (ISASCII(data.dptr[i])) {
975                         fprintf(f, "%c", data.dptr[i]);
976                 } else {
977                         fprintf(f, "\\%02X", data.dptr[i]);
978                 }
979         }
980         fprintf(f, "\"\n");
981
982         fprintf(f, "\n");
983
984         return 0;
985 }
986
987 /*
988   convenience function to list all keys to stdout
989  */
990 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
991 {
992         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
993 }
994
995 /*
996   get the pid of a ctdb daemon
997  */
998 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
999 {
1000         int ret;
1001         int32_t res;
1002
1003         ret = ctdb_control(ctdb, destnode, 0, 
1004                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1005                            NULL, NULL, &res, &timeout, NULL);
1006         if (ret != 0) {
1007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1008                 return -1;
1009         }
1010
1011         *pid = res;
1012
1013         return 0;
1014 }
1015
1016
1017 /*
1018   async freeze send control
1019  */
1020 struct ctdb_client_control_state *
1021 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1022 {
1023         struct ctdb_client_control_state *state;
1024
1025         state = ctdb_control_send(ctdb, destnode, priority, 
1026                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1027                            mem_ctx, NULL);
1028         if (state != NULL && !timeval_is_zero(&timeout)) {
1029                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
1030         }
1031
1032         return state;
1033 }
1034
1035 /* 
1036    async freeze recv control
1037 */
1038 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1039 {
1040         int ret;
1041         int32_t res;
1042
1043         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1044         if ( (ret != 0) || (res != 0) ){
1045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1046                 return -1;
1047         }
1048
1049         return 0;
1050 }
1051
1052 /*
1053   freeze databases of a certain priority
1054  */
1055 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1056 {
1057         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1058         struct ctdb_client_control_state *state;
1059         int ret;
1060
1061         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1062         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1063         talloc_free(tmp_ctx);
1064
1065         return ret;
1066 }
1067
1068 /* Freeze all databases */
1069 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1070 {
1071         int i;
1072
1073         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1074                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1075                         return -1;
1076                 }
1077         }
1078         return 0;
1079 }
1080
1081 /*
1082   thaw databases of a certain priority
1083  */
1084 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1085 {
1086         int ret;
1087         int32_t res;
1088
1089         ret = ctdb_control(ctdb, destnode, priority, 
1090                            CTDB_CONTROL_THAW, 0, tdb_null, 
1091                            NULL, NULL, &res, &timeout, NULL);
1092         if (ret != 0 || res != 0) {
1093                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1094                 return -1;
1095         }
1096
1097         return 0;
1098 }
1099
1100 /* thaw all databases */
1101 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1102 {
1103         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1104 }
1105
1106 /*
1107   get pnn of a node, or -1
1108  */
1109 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1110 {
1111         int ret;
1112         uint32_t pnn;
1113         ctdb_handle *handle;
1114
1115         handle = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
1116         if (handle == NULL) {
1117                 DEBUG(DEBUG_ERR, (__location__ " Failed to send getpnn control\n"));
1118                 return -1;
1119         }
1120
1121         if (!timeval_is_zero(&timeout)) {
1122                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
1123         }
1124
1125         ret = ctdb_getpnn_recv(ctdb, handle, &pnn);
1126         if (ret != 0) {
1127                 DEBUG(DEBUG_ERR,(__location__ " ctdb control for getpnn failed\n"));
1128                 return -1;
1129         }
1130
1131         return pnn;
1132 }
1133
1134 /*
1135   get the monitoring mode of a remote node
1136  */
1137 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1138 {
1139         int ret;
1140         int32_t res;
1141
1142         ret = ctdb_control(ctdb, destnode, 0, 
1143                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1144                            NULL, NULL, &res, &timeout, NULL);
1145         if (ret != 0) {
1146                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1147                 return -1;
1148         }
1149
1150         *monmode = res;
1151
1152         return 0;
1153 }
1154
1155
1156 /*
1157  set the monitoring mode of a remote node to active
1158  */
1159 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1160 {
1161         int ret;
1162         
1163
1164         ret = ctdb_control(ctdb, destnode, 0, 
1165                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1166                            NULL, NULL,NULL, &timeout, NULL);
1167         if (ret != 0) {
1168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1169                 return -1;
1170         }
1171
1172         
1173
1174         return 0;
1175 }
1176
1177 /*
1178   set the monitoring mode of a remote node to disable
1179  */
1180 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1181 {
1182         int ret;
1183         
1184
1185         ret = ctdb_control(ctdb, destnode, 0, 
1186                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1187                            NULL, NULL, NULL, &timeout, NULL);
1188         if (ret != 0) {
1189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1190                 return -1;
1191         }
1192
1193         
1194
1195         return 0;
1196 }
1197
1198
1199
1200 /* 
1201   sent to a node to make it take over an ip address
1202 */
1203 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1204                           uint32_t destnode, struct ctdb_public_ip *ip)
1205 {
1206         TDB_DATA data;
1207         struct ctdb_public_ipv4 ipv4;
1208         int ret;
1209         int32_t res;
1210
1211         if (ip->addr.sa.sa_family == AF_INET) {
1212                 ipv4.pnn = ip->pnn;
1213                 ipv4.sin = ip->addr.ip;
1214
1215                 data.dsize = sizeof(ipv4);
1216                 data.dptr  = (uint8_t *)&ipv4;
1217
1218                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1219                            NULL, &res, &timeout, NULL);
1220         } else {
1221                 data.dsize = sizeof(*ip);
1222                 data.dptr  = (uint8_t *)ip;
1223
1224                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1225                            NULL, &res, &timeout, NULL);
1226         }
1227
1228         if (ret != 0 || res != 0) {
1229                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1230                 return -1;
1231         }
1232
1233         return 0;       
1234 }
1235
1236
1237 /* 
1238   sent to a node to make it release an ip address
1239 */
1240 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1241                          uint32_t destnode, struct ctdb_public_ip *ip)
1242 {
1243         TDB_DATA data;
1244         struct ctdb_public_ipv4 ipv4;
1245         int ret;
1246         int32_t res;
1247
1248         if (ip->addr.sa.sa_family == AF_INET) {
1249                 ipv4.pnn = ip->pnn;
1250                 ipv4.sin = ip->addr.ip;
1251
1252                 data.dsize = sizeof(ipv4);
1253                 data.dptr  = (uint8_t *)&ipv4;
1254
1255                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1256                                    NULL, &res, &timeout, NULL);
1257         } else {
1258                 data.dsize = sizeof(*ip);
1259                 data.dptr  = (uint8_t *)ip;
1260
1261                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1262                                    NULL, &res, &timeout, NULL);
1263         }
1264
1265         if (ret != 0 || res != 0) {
1266                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1267                 return -1;
1268         }
1269
1270         return 0;       
1271 }
1272
1273
1274 /*
1275   get a tunable
1276  */
1277 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1278                           struct timeval timeout, 
1279                           uint32_t destnode,
1280                           const char *name, uint32_t *value)
1281 {
1282         struct ctdb_control_get_tunable *t;
1283         TDB_DATA data, outdata;
1284         int32_t res;
1285         int ret;
1286
1287         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1288         data.dptr  = talloc_size(ctdb, data.dsize);
1289         CTDB_NO_MEMORY(ctdb, data.dptr);
1290
1291         t = (struct ctdb_control_get_tunable *)data.dptr;
1292         t->length = strlen(name)+1;
1293         memcpy(t->name, name, t->length);
1294
1295         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1296                            &outdata, &res, &timeout, NULL);
1297         talloc_free(data.dptr);
1298         if (ret != 0 || res != 0) {
1299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1300                 return -1;
1301         }
1302
1303         if (outdata.dsize != sizeof(uint32_t)) {
1304                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1305                 talloc_free(outdata.dptr);
1306                 return -1;
1307         }
1308         
1309         *value = *(uint32_t *)outdata.dptr;
1310         talloc_free(outdata.dptr);
1311
1312         return 0;
1313 }
1314
1315 /*
1316   set a tunable
1317  */
1318 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1319                           struct timeval timeout, 
1320                           uint32_t destnode,
1321                           const char *name, uint32_t value)
1322 {
1323         struct ctdb_control_set_tunable *t;
1324         TDB_DATA data;
1325         int32_t res;
1326         int ret;
1327
1328         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1329         data.dptr  = talloc_size(ctdb, data.dsize);
1330         CTDB_NO_MEMORY(ctdb, data.dptr);
1331
1332         t = (struct ctdb_control_set_tunable *)data.dptr;
1333         t->length = strlen(name)+1;
1334         memcpy(t->name, name, t->length);
1335         t->value = value;
1336
1337         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1338                            NULL, &res, &timeout, NULL);
1339         talloc_free(data.dptr);
1340         if (ret != 0 || res != 0) {
1341                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1342                 return -1;
1343         }
1344
1345         return 0;
1346 }
1347
1348 /*
1349   list tunables
1350  */
1351 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1352                             struct timeval timeout, 
1353                             uint32_t destnode,
1354                             TALLOC_CTX *mem_ctx,
1355                             const char ***list, uint32_t *count)
1356 {
1357         TDB_DATA outdata;
1358         int32_t res;
1359         int ret;
1360         struct ctdb_control_list_tunable *t;
1361         char *p, *s, *ptr;
1362
1363         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1364                            mem_ctx, &outdata, &res, &timeout, NULL);
1365         if (ret != 0 || res != 0) {
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1367                 return -1;
1368         }
1369
1370         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1371         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1372             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1373                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1374                 talloc_free(outdata.dptr);
1375                 return -1;              
1376         }
1377         
1378         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1379         CTDB_NO_MEMORY(ctdb, p);
1380
1381         talloc_free(outdata.dptr);
1382         
1383         (*list) = NULL;
1384         (*count) = 0;
1385
1386         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1387                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1388                 CTDB_NO_MEMORY(ctdb, *list);
1389                 (*list)[*count] = talloc_strdup(*list, s);
1390                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1391                 (*count)++;
1392         }
1393
1394         talloc_free(p);
1395
1396         return 0;
1397 }
1398
1399
1400 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1401                                    struct timeval timeout, uint32_t destnode,
1402                                    TALLOC_CTX *mem_ctx,
1403                                    uint32_t flags,
1404                                    struct ctdb_all_public_ips **ips)
1405 {
1406         int ret;
1407         TDB_DATA outdata;
1408         int32_t res;
1409
1410         ret = ctdb_control(ctdb, destnode, 0, 
1411                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1412                            mem_ctx, &outdata, &res, &timeout, NULL);
1413         if (ret == 0 && res == -1) {
1414                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1415                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1416         }
1417         if (ret != 0 || res != 0) {
1418           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1419                 return -1;
1420         }
1421
1422         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1423         talloc_free(outdata.dptr);
1424                     
1425         return 0;
1426 }
1427
1428 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1429                              struct timeval timeout, uint32_t destnode,
1430                              TALLOC_CTX *mem_ctx,
1431                              struct ctdb_all_public_ips **ips)
1432 {
1433         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1434                                               destnode, mem_ctx,
1435                                               0, ips);
1436 }
1437
1438 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1439                         struct timeval timeout, uint32_t destnode, 
1440                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1441 {
1442         int ret, i, len;
1443         TDB_DATA outdata;
1444         int32_t res;
1445         struct ctdb_all_public_ipsv4 *ipsv4;
1446
1447         ret = ctdb_control(ctdb, destnode, 0, 
1448                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1449                            mem_ctx, &outdata, &res, &timeout, NULL);
1450         if (ret != 0 || res != 0) {
1451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1452                 return -1;
1453         }
1454
1455         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1456         len = offsetof(struct ctdb_all_public_ips, ips) +
1457                 ipsv4->num*sizeof(struct ctdb_public_ip);
1458         *ips = talloc_zero_size(mem_ctx, len);
1459         CTDB_NO_MEMORY(ctdb, *ips);
1460         (*ips)->num = ipsv4->num;
1461         for (i=0; i<ipsv4->num; i++) {
1462                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1463                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1464         }
1465
1466         talloc_free(outdata.dptr);
1467                     
1468         return 0;
1469 }
1470
1471 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1472                                  struct timeval timeout, uint32_t destnode,
1473                                  TALLOC_CTX *mem_ctx,
1474                                  const ctdb_sock_addr *addr,
1475                                  struct ctdb_control_public_ip_info **_info)
1476 {
1477         int ret;
1478         TDB_DATA indata;
1479         TDB_DATA outdata;
1480         int32_t res;
1481         struct ctdb_control_public_ip_info *info;
1482         uint32_t len;
1483         uint32_t i;
1484
1485         indata.dptr = discard_const_p(uint8_t, addr);
1486         indata.dsize = sizeof(*addr);
1487
1488         ret = ctdb_control(ctdb, destnode, 0,
1489                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1490                            mem_ctx, &outdata, &res, &timeout, NULL);
1491         if (ret != 0 || res != 0) {
1492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1493                                 "failed ret:%d res:%d\n",
1494                                 ret, res));
1495                 return -1;
1496         }
1497
1498         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1499         if (len > outdata.dsize) {
1500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1501                                 "returned invalid data with size %u > %u\n",
1502                                 (unsigned int)outdata.dsize,
1503                                 (unsigned int)len));
1504                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1505                 return -1;
1506         }
1507
1508         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1509         len += info->num*sizeof(struct ctdb_control_iface_info);
1510
1511         if (len > outdata.dsize) {
1512                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1513                                 "returned invalid data with size %u > %u\n",
1514                                 (unsigned int)outdata.dsize,
1515                                 (unsigned int)len));
1516                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1517                 return -1;
1518         }
1519
1520         /* make sure we null terminate the returned strings */
1521         for (i=0; i < info->num; i++) {
1522                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1523         }
1524
1525         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1526                                                                 outdata.dptr,
1527                                                                 outdata.dsize);
1528         talloc_free(outdata.dptr);
1529         if (*_info == NULL) {
1530                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1531                                 "talloc_memdup size %u failed\n",
1532                                 (unsigned int)outdata.dsize));
1533                 return -1;
1534         }
1535
1536         return 0;
1537 }
1538
1539 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1540                          struct timeval timeout, uint32_t destnode,
1541                          TALLOC_CTX *mem_ctx,
1542                          struct ctdb_control_get_ifaces **_ifaces)
1543 {
1544         int ret;
1545         TDB_DATA outdata;
1546         int32_t res;
1547         struct ctdb_control_get_ifaces *ifaces;
1548         uint32_t len;
1549         uint32_t i;
1550
1551         ret = ctdb_control(ctdb, destnode, 0,
1552                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1553                            mem_ctx, &outdata, &res, &timeout, NULL);
1554         if (ret != 0 || res != 0) {
1555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1556                                 "failed ret:%d res:%d\n",
1557                                 ret, res));
1558                 return -1;
1559         }
1560
1561         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1562         if (len > outdata.dsize) {
1563                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1564                                 "returned invalid data with size %u > %u\n",
1565                                 (unsigned int)outdata.dsize,
1566                                 (unsigned int)len));
1567                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1568                 return -1;
1569         }
1570
1571         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1572         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1573
1574         if (len > outdata.dsize) {
1575                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1576                                 "returned invalid data with size %u > %u\n",
1577                                 (unsigned int)outdata.dsize,
1578                                 (unsigned int)len));
1579                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1580                 return -1;
1581         }
1582
1583         /* make sure we null terminate the returned strings */
1584         for (i=0; i < ifaces->num; i++) {
1585                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1586         }
1587
1588         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1589                                                                   outdata.dptr,
1590                                                                   outdata.dsize);
1591         talloc_free(outdata.dptr);
1592         if (*_ifaces == NULL) {
1593                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1594                                 "talloc_memdup size %u failed\n",
1595                                 (unsigned int)outdata.dsize));
1596                 return -1;
1597         }
1598
1599         return 0;
1600 }
1601
1602 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1603                              struct timeval timeout, uint32_t destnode,
1604                              TALLOC_CTX *mem_ctx,
1605                              const struct ctdb_control_iface_info *info)
1606 {
1607         int ret;
1608         TDB_DATA indata;
1609         int32_t res;
1610
1611         indata.dptr = discard_const_p(uint8_t, info);
1612         indata.dsize = sizeof(*info);
1613
1614         ret = ctdb_control(ctdb, destnode, 0,
1615                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1616                            mem_ctx, NULL, &res, &timeout, NULL);
1617         if (ret != 0 || res != 0) {
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1619                                 "failed ret:%d res:%d\n",
1620                                 ret, res));
1621                 return -1;
1622         }
1623
1624         return 0;
1625 }
1626
1627 /*
1628   set/clear the permanent disabled bit on a remote node
1629  */
1630 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1631                        uint32_t set, uint32_t clear)
1632 {
1633         int ret;
1634         TDB_DATA data;
1635         struct ctdb_node_map *nodemap=NULL;
1636         struct ctdb_node_flag_change c;
1637         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1638         uint32_t recmaster;
1639         uint32_t *nodes;
1640
1641
1642         /* find the recovery master */
1643         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1644         if (ret != 0) {
1645                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1646                 talloc_free(tmp_ctx);
1647                 return ret;
1648         }
1649
1650
1651         /* read the node flags from the recmaster */
1652         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1653         if (ret != 0) {
1654                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1655                 talloc_free(tmp_ctx);
1656                 return -1;
1657         }
1658         if (destnode >= nodemap->num) {
1659                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1660                 talloc_free(tmp_ctx);
1661                 return -1;
1662         }
1663
1664         c.pnn       = destnode;
1665         c.old_flags = nodemap->nodes[destnode].flags;
1666         c.new_flags = c.old_flags;
1667         c.new_flags |= set;
1668         c.new_flags &= ~clear;
1669
1670         data.dsize = sizeof(c);
1671         data.dptr = (unsigned char *)&c;
1672
1673         /* send the flags update to all connected nodes */
1674         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1675
1676         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1677                                         nodes, 0,
1678                                         timeout, false, data,
1679                                         NULL, NULL,
1680                                         NULL) != 0) {
1681                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1682
1683                 talloc_free(tmp_ctx);
1684                 return -1;
1685         }
1686
1687         talloc_free(tmp_ctx);
1688         return 0;
1689 }
1690
1691
1692 /*
1693   get all tunables
1694  */
1695 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1696                                struct timeval timeout, 
1697                                uint32_t destnode,
1698                                struct ctdb_tunable *tunables)
1699 {
1700         TDB_DATA outdata;
1701         int ret;
1702         int32_t res;
1703
1704         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1705                            &outdata, &res, &timeout, NULL);
1706         if (ret != 0 || res != 0) {
1707                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1708                 return -1;
1709         }
1710
1711         if (outdata.dsize != sizeof(*tunables)) {
1712                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1713                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1714                 return -1;              
1715         }
1716
1717         *tunables = *(struct ctdb_tunable *)outdata.dptr;
1718         talloc_free(outdata.dptr);
1719         return 0;
1720 }
1721
1722 /*
1723   add a public address to a node
1724  */
1725 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
1726                       struct timeval timeout, 
1727                       uint32_t destnode,
1728                       struct ctdb_control_ip_iface *pub)
1729 {
1730         TDB_DATA data;
1731         int32_t res;
1732         int ret;
1733
1734         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
1735         data.dptr  = (unsigned char *)pub;
1736
1737         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
1738                            NULL, &res, &timeout, NULL);
1739         if (ret != 0 || res != 0) {
1740                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
1741                 return -1;
1742         }
1743
1744         return 0;
1745 }
1746
1747 /*
1748   delete a public address from a node
1749  */
1750 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
1751                       struct timeval timeout, 
1752                       uint32_t destnode,
1753                       struct ctdb_control_ip_iface *pub)
1754 {
1755         TDB_DATA data;
1756         int32_t res;
1757         int ret;
1758
1759         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
1760         data.dptr  = (unsigned char *)pub;
1761
1762         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
1763                            NULL, &res, &timeout, NULL);
1764         if (ret != 0 || res != 0) {
1765                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
1766                 return -1;
1767         }
1768
1769         return 0;
1770 }
1771
1772 /*
1773   kill a tcp connection
1774  */
1775 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
1776                       struct timeval timeout, 
1777                       uint32_t destnode,
1778                       struct ctdb_control_killtcp *killtcp)
1779 {
1780         TDB_DATA data;
1781         int32_t res;
1782         int ret;
1783
1784         data.dsize = sizeof(struct ctdb_control_killtcp);
1785         data.dptr  = (unsigned char *)killtcp;
1786
1787         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
1788                            NULL, &res, &timeout, NULL);
1789         if (ret != 0 || res != 0) {
1790                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
1791                 return -1;
1792         }
1793
1794         return 0;
1795 }
1796
1797 /*
1798   send a gratious arp
1799  */
1800 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
1801                       struct timeval timeout, 
1802                       uint32_t destnode,
1803                       ctdb_sock_addr *addr,
1804                       const char *ifname)
1805 {
1806         TDB_DATA data;
1807         int32_t res;
1808         int ret, len;
1809         struct ctdb_control_gratious_arp *gratious_arp;
1810         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1811
1812
1813         len = strlen(ifname)+1;
1814         gratious_arp = talloc_size(tmp_ctx, 
1815                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
1816         CTDB_NO_MEMORY(ctdb, gratious_arp);
1817
1818         gratious_arp->addr = *addr;
1819         gratious_arp->len = len;
1820         memcpy(&gratious_arp->iface[0], ifname, len);
1821
1822
1823         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
1824         data.dptr  = (unsigned char *)gratious_arp;
1825
1826         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
1827                            NULL, &res, &timeout, NULL);
1828         if (ret != 0 || res != 0) {
1829                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
1830                 talloc_free(tmp_ctx);
1831                 return -1;
1832         }
1833
1834         talloc_free(tmp_ctx);
1835         return 0;
1836 }
1837
1838 /*
1839   get a list of all tcp tickles that a node knows about for a particular vnn
1840  */
1841 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
1842                               struct timeval timeout, uint32_t destnode, 
1843                               TALLOC_CTX *mem_ctx, 
1844                               ctdb_sock_addr *addr,
1845                               struct ctdb_control_tcp_tickle_list **list)
1846 {
1847         int ret;
1848         TDB_DATA data, outdata;
1849         int32_t status;
1850
1851         data.dptr = (uint8_t*)addr;
1852         data.dsize = sizeof(ctdb_sock_addr);
1853
1854         ret = ctdb_control(ctdb, destnode, 0, 
1855                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
1856                            mem_ctx, &outdata, &status, NULL, NULL);
1857         if (ret != 0 || status != 0) {
1858                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
1859                 return -1;
1860         }
1861
1862         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
1863
1864         return status;
1865 }
1866
1867 /*
1868   register a server id
1869  */
1870 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
1871                       struct timeval timeout, 
1872                       struct ctdb_server_id *id)
1873 {
1874         TDB_DATA data;
1875         int32_t res;
1876         int ret;
1877
1878         data.dsize = sizeof(struct ctdb_server_id);
1879         data.dptr  = (unsigned char *)id;
1880
1881         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
1882                         CTDB_CONTROL_REGISTER_SERVER_ID, 
1883                         0, data, NULL,
1884                         NULL, &res, &timeout, NULL);
1885         if (ret != 0 || res != 0) {
1886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
1887                 return -1;
1888         }
1889
1890         return 0;
1891 }
1892
1893 /*
1894   unregister a server id
1895  */
1896 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
1897                       struct timeval timeout, 
1898                       struct ctdb_server_id *id)
1899 {
1900         TDB_DATA data;
1901         int32_t res;
1902         int ret;
1903
1904         data.dsize = sizeof(struct ctdb_server_id);
1905         data.dptr  = (unsigned char *)id;
1906
1907         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
1908                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
1909                         0, data, NULL,
1910                         NULL, &res, &timeout, NULL);
1911         if (ret != 0 || res != 0) {
1912                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
1913                 return -1;
1914         }
1915
1916         return 0;
1917 }
1918
1919
1920 /*
1921   check if a server id exists
1922
1923   if a server id does exist, return *status == 1, otherwise *status == 0
1924  */
1925 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
1926                       struct timeval timeout, 
1927                       uint32_t destnode,
1928                       struct ctdb_server_id *id,
1929                       uint32_t *status)
1930 {
1931         TDB_DATA data;
1932         int32_t res;
1933         int ret;
1934
1935         data.dsize = sizeof(struct ctdb_server_id);
1936         data.dptr  = (unsigned char *)id;
1937
1938         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
1939                         0, data, NULL,
1940                         NULL, &res, &timeout, NULL);
1941         if (ret != 0) {
1942                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
1943                 return -1;
1944         }
1945
1946         if (res) {
1947                 *status = 1;
1948         } else {
1949                 *status = 0;
1950         }
1951
1952         return 0;
1953 }
1954
1955 /*
1956    get the list of server ids that are registered on a node
1957 */
1958 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
1959                 TALLOC_CTX *mem_ctx,
1960                 struct timeval timeout, uint32_t destnode, 
1961                 struct ctdb_server_id_list **svid_list)
1962 {
1963         int ret;
1964         TDB_DATA outdata;
1965         int32_t res;
1966
1967         ret = ctdb_control(ctdb, destnode, 0, 
1968                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
1969                            mem_ctx, &outdata, &res, &timeout, NULL);
1970         if (ret != 0 || res != 0) {
1971                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
1972                 return -1;
1973         }
1974
1975         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
1976                     
1977         return 0;
1978 }
1979
1980 /*
1981   set some ctdb flags
1982 */
1983 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
1984 {
1985         ctdb->flags |= flags;
1986 }
1987
1988
1989 /*
1990   return the pnn of this node
1991 */
1992 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
1993 {
1994         return ctdb->pnn;
1995 }
1996
1997
1998 /*
1999   get the uptime of a remote node
2000  */
2001 struct ctdb_client_control_state *
2002 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2003 {
2004         struct ctdb_client_control_state *state;
2005
2006         state = ctdb_control_send(ctdb, destnode, 0, 
2007                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2008                            mem_ctx, NULL);
2009         if (state != NULL && !timeval_is_zero(&timeout)) {
2010                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2011         }
2012
2013         return state;
2014 }
2015
2016 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2017 {
2018         int ret;
2019         int32_t res;
2020         TDB_DATA outdata;
2021
2022         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2023         if (ret != 0 || res != 0) {
2024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2025                 return -1;
2026         }
2027
2028         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2029
2030         return 0;
2031 }
2032
2033 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2034 {
2035         struct ctdb_client_control_state *state;
2036
2037         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2038         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2039 }
2040
2041 /*
2042   send a control to execute the "recovered" event script on a node
2043  */
2044 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2045 {
2046         int ret;
2047         int32_t status;
2048
2049         ret = ctdb_control(ctdb, destnode, 0, 
2050                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2051                            NULL, NULL, &status, &timeout, NULL);
2052         if (ret != 0 || status != 0) {
2053                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2054                 return -1;
2055         }
2056
2057         return 0;
2058 }
2059
2060 /* 
2061   callback for the async helpers used when sending the same control
2062   to multiple nodes in parallell.
2063 */
2064 static void async_callback(struct ctdb_client_control_state *state)
2065 {
2066         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2067         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2068         int ret;
2069         TDB_DATA outdata;
2070         int32_t res;
2071         uint32_t destnode = state->c->hdr.destnode;
2072
2073         /* one more node has responded with recmode data */
2074         data->count--;
2075
2076         /* if we failed to push the db, then return an error and let
2077            the main loop try again.
2078         */
2079         if (state->state != CTDB_CONTROL_DONE) {
2080                 if ( !data->dont_log_errors) {
2081                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2082                 }
2083                 data->fail_count++;
2084                 if (data->fail_callback) {
2085                         data->fail_callback(ctdb, destnode, res, outdata,
2086                                         data->callback_data);
2087                 }
2088                 return;
2089         }
2090         
2091         state->async.fn = NULL;
2092
2093         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2094         if ((ret != 0) || (res != 0)) {
2095                 if ( !data->dont_log_errors) {
2096                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2097                 }
2098                 data->fail_count++;
2099                 if (data->fail_callback) {
2100                         data->fail_callback(ctdb, destnode, res, outdata,
2101                                         data->callback_data);
2102                 }
2103         }
2104         if ((ret == 0) && (data->callback != NULL)) {
2105                 data->callback(ctdb, destnode, res, outdata,
2106                                         data->callback_data);
2107         }
2108 }
2109
2110
2111 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2112 {
2113         /* set up the callback functions */
2114         state->async.fn = async_callback;
2115         state->async.private_data = data;
2116         
2117         /* one more control to wait for to complete */
2118         data->count++;
2119 }
2120
2121
2122 /* wait for up to the maximum number of seconds allowed
2123    or until all nodes we expect a response from has replied
2124 */
2125 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2126 {
2127         while (data->count > 0) {
2128                 event_loop_once(ctdb->ev);
2129         }
2130         if (data->fail_count != 0) {
2131                 if (!data->dont_log_errors) {
2132                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2133                                  data->fail_count));
2134                 }
2135                 return -1;
2136         }
2137         return 0;
2138 }
2139
2140
2141 /* 
2142    perform a simple control on the listed nodes
2143    The control cannot return data
2144  */
2145 int ctdb_client_async_control(struct ctdb_context *ctdb,
2146                                 enum ctdb_controls opcode,
2147                                 uint32_t *nodes,
2148                                 uint64_t srvid,
2149                                 struct timeval timeout,
2150                                 bool dont_log_errors,
2151                                 TDB_DATA data,
2152                                 client_async_callback client_callback,
2153                                 client_async_callback fail_callback,
2154                                 void *callback_data)
2155 {
2156         struct client_async_data *async_data;
2157         struct ctdb_client_control_state *state;
2158         int j, num_nodes;
2159
2160         async_data = talloc_zero(ctdb, struct client_async_data);
2161         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2162         async_data->dont_log_errors = dont_log_errors;
2163         async_data->callback = client_callback;
2164         async_data->fail_callback = fail_callback;
2165         async_data->callback_data = callback_data;
2166         async_data->opcode        = opcode;
2167
2168         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2169
2170         /* loop over all nodes and send an async control to each of them */
2171         for (j=0; j<num_nodes; j++) {
2172                 uint32_t pnn = nodes[j];
2173
2174                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2175                                           0, data, async_data, NULL);
2176                 if (state == NULL) {
2177                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2178                         talloc_free(async_data);
2179                         return -1;
2180                 }
2181                 if (!timeval_is_zero(&timeout)) {
2182                         event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2183                 }
2184                 
2185                 ctdb_client_async_add(async_data, state);
2186         }
2187
2188         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2189                 talloc_free(async_data);
2190                 return -1;
2191         }
2192
2193         talloc_free(async_data);
2194         return 0;
2195 }
2196
2197 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2198                                 struct ctdb_vnn_map *vnn_map,
2199                                 TALLOC_CTX *mem_ctx,
2200                                 bool include_self)
2201 {
2202         int i, j, num_nodes;
2203         uint32_t *nodes;
2204
2205         for (i=num_nodes=0;i<vnn_map->size;i++) {
2206                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2207                         continue;
2208                 }
2209                 num_nodes++;
2210         } 
2211
2212         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2213         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2214
2215         for (i=j=0;i<vnn_map->size;i++) {
2216                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2217                         continue;
2218                 }
2219                 nodes[j++] = vnn_map->map[i];
2220         } 
2221
2222         return nodes;
2223 }
2224
2225 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2226                                 struct ctdb_node_map *node_map,
2227                                 TALLOC_CTX *mem_ctx,
2228                                 bool include_self)
2229 {
2230         int i, j, num_nodes;
2231         uint32_t *nodes;
2232
2233         for (i=num_nodes=0;i<node_map->num;i++) {
2234                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2235                         continue;
2236                 }
2237                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2238                         continue;
2239                 }
2240                 num_nodes++;
2241         } 
2242
2243         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2244         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2245
2246         for (i=j=0;i<node_map->num;i++) {
2247                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2248                         continue;
2249                 }
2250                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2251                         continue;
2252                 }
2253                 nodes[j++] = node_map->nodes[i].pnn;
2254         } 
2255
2256         return nodes;
2257 }
2258
2259 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2260                                 struct ctdb_node_map *node_map,
2261                                 TALLOC_CTX *mem_ctx,
2262                                 uint32_t pnn)
2263 {
2264         int i, j, num_nodes;
2265         uint32_t *nodes;
2266
2267         for (i=num_nodes=0;i<node_map->num;i++) {
2268                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2269                         continue;
2270                 }
2271                 if (node_map->nodes[i].pnn == pnn) {
2272                         continue;
2273                 }
2274                 num_nodes++;
2275         } 
2276
2277         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2278         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2279
2280         for (i=j=0;i<node_map->num;i++) {
2281                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2282                         continue;
2283                 }
2284                 if (node_map->nodes[i].pnn == pnn) {
2285                         continue;
2286                 }
2287                 nodes[j++] = node_map->nodes[i].pnn;
2288         } 
2289
2290         return nodes;
2291 }
2292
2293 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2294                                 struct ctdb_node_map *node_map,
2295                                 TALLOC_CTX *mem_ctx,
2296                                 bool include_self)
2297 {
2298         int i, j, num_nodes;
2299         uint32_t *nodes;
2300
2301         for (i=num_nodes=0;i<node_map->num;i++) {
2302                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2303                         continue;
2304                 }
2305                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2306                         continue;
2307                 }
2308                 num_nodes++;
2309         } 
2310
2311         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2312         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2313
2314         for (i=j=0;i<node_map->num;i++) {
2315                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2316                         continue;
2317                 }
2318                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2319                         continue;
2320                 }
2321                 nodes[j++] = node_map->nodes[i].pnn;
2322         } 
2323
2324         return nodes;
2325 }
2326
2327 /* 
2328   this is used to test if a pnn lock exists and if it exists will return
2329   the number of connections that pnn has reported or -1 if that recovery
2330   daemon is not running.
2331 */
2332 int
2333 ctdb_read_pnn_lock(int fd, int32_t pnn)
2334 {
2335         struct flock lock;
2336         char c;
2337
2338         lock.l_type = F_WRLCK;
2339         lock.l_whence = SEEK_SET;
2340         lock.l_start = pnn;
2341         lock.l_len = 1;
2342         lock.l_pid = 0;
2343
2344         if (fcntl(fd, F_GETLK, &lock) != 0) {
2345                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2346                 return -1;
2347         }
2348
2349         if (lock.l_type == F_UNLCK) {
2350                 return -1;
2351         }
2352
2353         if (pread(fd, &c, 1, pnn) == -1) {
2354                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2355                 return -1;
2356         }
2357
2358         return c;
2359 }
2360
2361 /*
2362   get capabilities of a remote node
2363  */
2364 struct ctdb_client_control_state *
2365 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2366 {
2367         struct ctdb_client_control_state *state;
2368
2369         state = ctdb_control_send(ctdb, destnode, 0, 
2370                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2371                            mem_ctx, NULL);
2372         if (state != NULL && !timeval_is_zero(&timeout)) {
2373                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2374         }
2375
2376         return state;
2377 }
2378
2379 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2380 {
2381         int ret;
2382         int32_t res;
2383         TDB_DATA outdata;
2384
2385         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2386         if ( (ret != 0) || (res != 0) ) {
2387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2388                 return -1;
2389         }
2390
2391         if (capabilities) {
2392                 *capabilities = *((uint32_t *)outdata.dptr);
2393         }
2394
2395         return 0;
2396 }
2397
2398 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2399 {
2400         struct ctdb_client_control_state *state;
2401         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2402         int ret;
2403
2404         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2405         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2406         talloc_free(tmp_ctx);
2407         return ret;
2408 }
2409
2410 /**
2411  * check whether a transaction is active on a given db on a given node
2412  */
2413 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2414                                      uint32_t destnode,
2415                                      uint32_t db_id)
2416 {
2417         int32_t status;
2418         int ret;
2419         TDB_DATA indata;
2420
2421         indata.dptr = (uint8_t *)&db_id;
2422         indata.dsize = sizeof(db_id);
2423
2424         ret = ctdb_control(ctdb, destnode, 0,
2425                            CTDB_CONTROL_TRANS2_ACTIVE,
2426                            0, indata, NULL, NULL, &status,
2427                            NULL, NULL);
2428
2429         if (ret != 0) {
2430                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2431                 return -1;
2432         }
2433
2434         return status;
2435 }
2436
2437
2438 struct ctdb_transaction_handle {
2439         struct ctdb_db_context *ctdb_db;
2440         bool in_replay;
2441         /*
2442          * we store the reads and writes done under a transaction:
2443          * - one list stores both reads and writes (m_all),
2444          * - the other just writes (m_write)
2445          */
2446         struct ctdb_marshall_buffer *m_all;
2447         struct ctdb_marshall_buffer *m_write;
2448 };
2449
2450 /* start a transaction on a database */
2451 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2452 {
2453         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2454         return 0;
2455 }
2456
2457 /* start a transaction on a database */
2458 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2459 {
2460         struct ctdb_record_handle *rh;
2461         TDB_DATA key;
2462         TDB_DATA data;
2463         struct ctdb_ltdb_header header;
2464         TALLOC_CTX *tmp_ctx;
2465         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2466         int ret;
2467         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2468         pid_t pid;
2469         int32_t status;
2470
2471         key.dptr = discard_const(keyname);
2472         key.dsize = strlen(keyname);
2473
2474         if (!ctdb_db->persistent) {
2475                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2476                 return -1;
2477         }
2478
2479 again:
2480         tmp_ctx = talloc_new(h);
2481
2482         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2483         if (rh == NULL) {
2484                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2485                 talloc_free(tmp_ctx);
2486                 return -1;
2487         }
2488
2489         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2490                                               CTDB_CURRENT_NODE,
2491                                               ctdb_db->db_id);
2492         if (status == 1) {
2493                 unsigned long int usec = (1000 + random()) % 100000;
2494                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2495                                     "on db_id[0x%08x]. waiting for %lu "
2496                                     "microseconds\n",
2497                                     ctdb_db->db_id, usec));
2498                 talloc_free(tmp_ctx);
2499                 usleep(usec);
2500                 goto again;
2501         }
2502
2503         /*
2504          * store the pid in the database:
2505          * it is not enough that the node is dmaster...
2506          */
2507         pid = getpid();
2508         data.dptr = (unsigned char *)&pid;
2509         data.dsize = sizeof(pid_t);
2510         rh->header.rsn++;
2511         rh->header.dmaster = ctdb_db->ctdb->pnn;
2512         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2513         if (ret != 0) {
2514                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2515                                   "transaction record\n"));
2516                 talloc_free(tmp_ctx);
2517                 return -1;
2518         }
2519
2520         talloc_free(rh);
2521
2522         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2523         if (ret != 0) {
2524                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2525                 talloc_free(tmp_ctx);
2526                 return -1;
2527         }
2528
2529         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2530         if (ret != 0) {
2531                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2532                                  "lock record inside transaction\n"));
2533                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2534                 talloc_free(tmp_ctx);
2535                 goto again;
2536         }
2537
2538         if (header.dmaster != ctdb_db->ctdb->pnn) {
2539                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2540                                    "transaction lock record\n"));
2541                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2542                 talloc_free(tmp_ctx);
2543                 goto again;
2544         }
2545
2546         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2547                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2548                                     "the transaction lock record\n"));
2549                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2550                 talloc_free(tmp_ctx);
2551                 goto again;
2552         }
2553
2554         talloc_free(tmp_ctx);
2555
2556         return 0;
2557 }
2558
2559
2560 /* start a transaction on a database */
2561 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2562                                                        TALLOC_CTX *mem_ctx)
2563 {
2564         struct ctdb_transaction_handle *h;
2565         int ret;
2566
2567         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2568         if (h == NULL) {
2569                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2570                 return NULL;
2571         }
2572
2573         h->ctdb_db = ctdb_db;
2574
2575         ret = ctdb_transaction_fetch_start(h);
2576         if (ret != 0) {
2577                 talloc_free(h);
2578                 return NULL;
2579         }
2580
2581         talloc_set_destructor(h, ctdb_transaction_destructor);
2582
2583         return h;
2584 }
2585
2586
2587
2588 /*
2589   fetch a record inside a transaction
2590  */
2591 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2592                            TALLOC_CTX *mem_ctx, 
2593                            TDB_DATA key, TDB_DATA *data)
2594 {
2595         struct ctdb_ltdb_header header;
2596         int ret;
2597
2598         ZERO_STRUCT(header);
2599
2600         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2601         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2602                 /* record doesn't exist yet */
2603                 *data = tdb_null;
2604                 ret = 0;
2605         }
2606         
2607         if (ret != 0) {
2608                 return ret;
2609         }
2610
2611         if (!h->in_replay) {
2612                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2613                 if (h->m_all == NULL) {
2614                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2615                         return -1;
2616                 }
2617         }
2618
2619         return 0;
2620 }
2621
2622 /*
2623   stores a record inside a transaction
2624  */
2625 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2626                            TDB_DATA key, TDB_DATA data)
2627 {
2628         TALLOC_CTX *tmp_ctx = talloc_new(h);
2629         struct ctdb_ltdb_header header;
2630         TDB_DATA olddata;
2631         int ret;
2632
2633         ZERO_STRUCT(header);
2634
2635         /* we need the header so we can update the RSN */
2636         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2637         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2638                 /* the record doesn't exist - create one with us as dmaster.
2639                    This is only safe because we are in a transaction and this
2640                    is a persistent database */
2641                 ZERO_STRUCT(header);
2642         } else if (ret != 0) {
2643                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2644                 talloc_free(tmp_ctx);
2645                 return ret;
2646         }
2647
2648         if (data.dsize == olddata.dsize &&
2649             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2650                 /* save writing the same data */
2651                 talloc_free(tmp_ctx);
2652                 return 0;
2653         }
2654
2655         header.dmaster = h->ctdb_db->ctdb->pnn;
2656         header.rsn++;
2657
2658         if (!h->in_replay) {
2659                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2660                 if (h->m_all == NULL) {
2661                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2662                         talloc_free(tmp_ctx);
2663                         return -1;
2664                 }
2665         }               
2666
2667         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2668         if (h->m_write == NULL) {
2669                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2670                 talloc_free(tmp_ctx);
2671                 return -1;
2672         }
2673         
2674         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2675
2676         talloc_free(tmp_ctx);
2677         
2678         return ret;
2679 }
2680
2681 /*
2682   replay a transaction
2683  */
2684 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2685 {
2686         int ret, i;
2687         struct ctdb_rec_data *rec = NULL;
2688
2689         h->in_replay = true;
2690         talloc_free(h->m_write);
2691         h->m_write = NULL;
2692
2693         ret = ctdb_transaction_fetch_start(h);
2694         if (ret != 0) {
2695                 return ret;
2696         }
2697
2698         for (i=0;i<h->m_all->count;i++) {
2699                 TDB_DATA key, data;
2700
2701                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2702                 if (rec == NULL) {
2703                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2704                         goto failed;
2705                 }
2706
2707                 if (rec->reqid == 0) {
2708                         /* its a store */
2709                         if (ctdb_transaction_store(h, key, data) != 0) {
2710                                 goto failed;
2711                         }
2712                 } else {
2713                         TDB_DATA data2;
2714                         TALLOC_CTX *tmp_ctx = talloc_new(h);
2715
2716                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
2717                                 talloc_free(tmp_ctx);
2718                                 goto failed;
2719                         }
2720                         if (data2.dsize != data.dsize ||
2721                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
2722                                 /* the record has changed on us - we have to give up */
2723                                 talloc_free(tmp_ctx);
2724                                 goto failed;
2725                         }
2726                         talloc_free(tmp_ctx);
2727                 }
2728         }
2729         
2730         return 0;
2731
2732 failed:
2733         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2734         return -1;
2735 }
2736
2737
2738 /*
2739   commit a transaction
2740  */
2741 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2742 {
2743         int ret, retries=0;
2744         int32_t status;
2745         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
2746         struct timeval timeout;
2747         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
2748
2749         talloc_set_destructor(h, NULL);
2750
2751         /* our commit strategy is quite complex.
2752
2753            - we first try to commit the changes to all other nodes
2754
2755            - if that works, then we commit locally and we are done
2756
2757            - if a commit on another node fails, then we need to cancel
2758              the transaction, then restart the transaction (thus
2759              opening a window of time for a pending recovery to
2760              complete), then replay the transaction, checking all the
2761              reads and writes (checking that reads give the same data,
2762              and writes succeed). Then we retry the transaction to the
2763              other nodes
2764         */
2765
2766 again:
2767         if (h->m_write == NULL) {
2768                 /* no changes were made */
2769                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2770                 talloc_free(h);
2771                 return 0;
2772         }
2773
2774         /* tell ctdbd to commit to the other nodes */
2775         timeout = timeval_current_ofs(1, 0);
2776         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
2777                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
2778                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
2779                            &timeout, NULL);
2780         if (ret != 0 || status != 0) {
2781                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2782                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
2783                                      ", retrying after 1 second...\n",
2784                                      (retries==0)?"":"retry "));
2785                 sleep(1);
2786
2787                 if (ret != 0) {
2788                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
2789                 } else {
2790                         /* work out what error code we will give if we 
2791                            have to fail the operation */
2792                         switch ((enum ctdb_trans2_commit_error)status) {
2793                         case CTDB_TRANS2_COMMIT_SUCCESS:
2794                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
2795                         case CTDB_TRANS2_COMMIT_TIMEOUT:
2796                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
2797                                 break;
2798                         case CTDB_TRANS2_COMMIT_ALLFAIL:
2799                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
2800                                 break;
2801                         }
2802                 }
2803
2804                 if (++retries == 100) {
2805                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
2806                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
2807                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
2808                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
2809                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
2810                         talloc_free(h);
2811                         return -1;
2812                 }               
2813
2814                 if (ctdb_replay_transaction(h) != 0) {
2815                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
2816                                           "transaction on db 0x%08x, "
2817                                           "failure control =%u\n",
2818                                           h->ctdb_db->db_id,
2819                                           (unsigned)failure_control));
2820                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
2821                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
2822                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
2823                         talloc_free(h);
2824                         return -1;
2825                 }
2826                 goto again;
2827         } else {
2828                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
2829         }
2830
2831         /* do the real commit locally */
2832         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
2833         if (ret != 0) {
2834                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
2835                                   "on db id 0x%08x locally, "
2836                                   "failure_control=%u\n",
2837                                   h->ctdb_db->db_id,
2838                                   (unsigned)failure_control));
2839                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
2840                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
2841                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
2842                 talloc_free(h);
2843                 return ret;
2844         }
2845
2846         /* tell ctdbd that we are finished with our local commit */
2847         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
2848                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
2849                      tdb_null, NULL, NULL, NULL, NULL, NULL);
2850         talloc_free(h);
2851         return 0;
2852 }
2853
2854 /*
2855   recovery daemon ping to main daemon
2856  */
2857 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
2858 {
2859         int ret;
2860         int32_t res;
2861
2862         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
2863                            ctdb, NULL, &res, NULL, NULL);
2864         if (ret != 0 || res != 0) {
2865                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
2866                 return -1;
2867         }
2868
2869         return 0;
2870 }
2871
2872 /* when forking the main daemon and the child process needs to connect back
2873  * to the daemon as a client process, this function can be used to change
2874  * the ctdb context from daemon into client mode
2875  */
2876 int switch_from_server_to_client(struct ctdb_context *ctdb)
2877 {
2878         int ret;
2879
2880         /* shutdown the transport */
2881         if (ctdb->methods) {
2882                 ctdb->methods->shutdown(ctdb);
2883         }
2884
2885         /* get a new event context */
2886         talloc_free(ctdb->ev);
2887         ctdb->ev = event_context_init(ctdb);
2888
2889         close(ctdb->daemon.sd);
2890         ctdb->daemon.sd = -1;
2891
2892         /* initialise ctdb */
2893         ret = ctdb_socket_connect(ctdb);
2894         if (ret != 0) {
2895                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
2896                 return -1;
2897         }
2898
2899          return 0;
2900 }
2901
2902 /*
2903   get the status of running the monitor eventscripts: NULL means never run.
2904  */
2905 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
2906                 struct timeval timeout, uint32_t destnode, 
2907                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
2908                 struct ctdb_scripts_wire **script_status)
2909 {
2910         int ret;
2911         TDB_DATA outdata, indata;
2912         int32_t res;
2913         uint32_t uinttype = type;
2914
2915         indata.dptr = (uint8_t *)&uinttype;
2916         indata.dsize = sizeof(uinttype);
2917
2918         ret = ctdb_control(ctdb, destnode, 0, 
2919                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
2920                            mem_ctx, &outdata, &res, &timeout, NULL);
2921         if (ret != 0 || res != 0) {
2922                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
2923                 return -1;
2924         }
2925
2926         if (outdata.dsize == 0) {
2927                 *script_status = NULL;
2928         } else {
2929                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2930                 talloc_free(outdata.dptr);
2931         }
2932                     
2933         return 0;
2934 }
2935
2936 /*
2937   tell the main daemon how long it took to lock the reclock file
2938  */
2939 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
2940 {
2941         int ret;
2942         int32_t res;
2943         TDB_DATA data;
2944
2945         data.dptr = (uint8_t *)&latency;
2946         data.dsize = sizeof(latency);
2947
2948         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
2949                            ctdb, NULL, &res, NULL, NULL);
2950         if (ret != 0 || res != 0) {
2951                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   get the name of the reclock file
2960  */
2961 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
2962                          uint32_t destnode, TALLOC_CTX *mem_ctx,
2963                          const char **name)
2964 {
2965         int ret;
2966         int32_t res;
2967         TDB_DATA data;
2968
2969         ret = ctdb_control(ctdb, destnode, 0, 
2970                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
2971                            mem_ctx, &data, &res, &timeout, NULL);
2972         if (ret != 0 || res != 0) {
2973                 return -1;
2974         }
2975
2976         if (data.dsize == 0) {
2977                 *name = NULL;
2978         } else {
2979                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
2980         }
2981         talloc_free(data.dptr);
2982
2983         return 0;
2984 }
2985
2986 /*
2987   set the reclock filename for a node
2988  */
2989 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
2990 {
2991         int ret;
2992         TDB_DATA data;
2993         int32_t res;
2994
2995         if (reclock == NULL) {
2996                 data.dsize = 0;
2997                 data.dptr  = NULL;
2998         } else {
2999                 data.dsize = strlen(reclock) + 1;
3000                 data.dptr  = discard_const(reclock);
3001         }
3002
3003         ret = ctdb_control(ctdb, destnode, 0, 
3004                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3005                            NULL, NULL, &res, &timeout, NULL);
3006         if (ret != 0 || res != 0) {
3007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3008                 return -1;
3009         }
3010
3011         return 0;
3012 }
3013
3014 /*
3015   stop a node
3016  */
3017 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3018 {
3019         int ret;
3020         int32_t res;
3021
3022         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3023                            ctdb, NULL, &res, &timeout, NULL);
3024         if (ret != 0 || res != 0) {
3025                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3026                 return -1;
3027         }
3028
3029         return 0;
3030 }
3031
3032 /*
3033   continue a node
3034  */
3035 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3036 {
3037         int ret;
3038
3039         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3040                            ctdb, NULL, NULL, &timeout, NULL);
3041         if (ret != 0) {
3042                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3043                 return -1;
3044         }
3045
3046         return 0;
3047 }
3048
3049 /*
3050   set the natgw state for a node
3051  */
3052 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3053 {
3054         int ret;
3055         TDB_DATA data;
3056         int32_t res;
3057
3058         data.dsize = sizeof(natgwstate);
3059         data.dptr  = (uint8_t *)&natgwstate;
3060
3061         ret = ctdb_control(ctdb, destnode, 0, 
3062                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3063                            NULL, NULL, &res, &timeout, NULL);
3064         if (ret != 0 || res != 0) {
3065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3066                 return -1;
3067         }
3068
3069         return 0;
3070 }
3071
3072 /*
3073   set the lmaster role for a node
3074  */
3075 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3076 {
3077         int ret;
3078         TDB_DATA data;
3079         int32_t res;
3080
3081         data.dsize = sizeof(lmasterrole);
3082         data.dptr  = (uint8_t *)&lmasterrole;
3083
3084         ret = ctdb_control(ctdb, destnode, 0, 
3085                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3086                            NULL, NULL, &res, &timeout, NULL);
3087         if (ret != 0 || res != 0) {
3088                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3089                 return -1;
3090         }
3091
3092         return 0;
3093 }
3094
3095 /*
3096   set the recmaster role for a node
3097  */
3098 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3099 {
3100         int ret;
3101         TDB_DATA data;
3102         int32_t res;
3103
3104         data.dsize = sizeof(recmasterrole);
3105         data.dptr  = (uint8_t *)&recmasterrole;
3106
3107         ret = ctdb_control(ctdb, destnode, 0, 
3108                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3109                            NULL, NULL, &res, &timeout, NULL);
3110         if (ret != 0 || res != 0) {
3111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3112                 return -1;
3113         }
3114
3115         return 0;
3116 }
3117
3118 /* enable an eventscript
3119  */
3120 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3121 {
3122         int ret;
3123         TDB_DATA data;
3124         int32_t res;
3125
3126         data.dsize = strlen(script) + 1;
3127         data.dptr  = discard_const(script);
3128
3129         ret = ctdb_control(ctdb, destnode, 0, 
3130                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3131                            NULL, NULL, &res, &timeout, NULL);
3132         if (ret != 0 || res != 0) {
3133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3134                 return -1;
3135         }
3136
3137         return 0;
3138 }
3139
3140 /* disable an eventscript
3141  */
3142 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3143 {
3144         int ret;
3145         TDB_DATA data;
3146         int32_t res;
3147
3148         data.dsize = strlen(script) + 1;
3149         data.dptr  = discard_const(script);
3150
3151         ret = ctdb_control(ctdb, destnode, 0, 
3152                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3153                            NULL, NULL, &res, &timeout, NULL);
3154         if (ret != 0 || res != 0) {
3155                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3156                 return -1;
3157         }
3158
3159         return 0;
3160 }
3161
3162
3163 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3164 {
3165         int ret;
3166         TDB_DATA data;
3167         int32_t res;
3168
3169         data.dsize = sizeof(*bantime);
3170         data.dptr  = (uint8_t *)bantime;
3171
3172         ret = ctdb_control(ctdb, destnode, 0, 
3173                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3174                            NULL, NULL, &res, &timeout, NULL);
3175         if (ret != 0 || res != 0) {
3176                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3177                 return -1;
3178         }
3179
3180         return 0;
3181 }
3182
3183
3184 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3185 {
3186         int ret;
3187         TDB_DATA outdata;
3188         int32_t res;
3189         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3190
3191         ret = ctdb_control(ctdb, destnode, 0, 
3192                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3193                            tmp_ctx, &outdata, &res, &timeout, NULL);
3194         if (ret != 0 || res != 0) {
3195                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3196                 talloc_free(tmp_ctx);
3197                 return -1;
3198         }
3199
3200         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3201         talloc_free(tmp_ctx);
3202
3203         return 0;
3204 }
3205
3206
3207 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3208 {
3209         int ret;
3210         int32_t res;
3211         TDB_DATA data;
3212         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3213
3214         data.dptr = (uint8_t*)db_prio;
3215         data.dsize = sizeof(*db_prio);
3216
3217         ret = ctdb_control(ctdb, destnode, 0, 
3218                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3219                            tmp_ctx, NULL, &res, &timeout, NULL);
3220         if (ret != 0 || res != 0) {
3221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3222                 talloc_free(tmp_ctx);
3223                 return -1;
3224         }
3225
3226         talloc_free(tmp_ctx);
3227
3228         return 0;
3229 }
3230
3231 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3232 {
3233         int ret;
3234         int32_t res;
3235         TDB_DATA data;
3236         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3237
3238         data.dptr = (uint8_t*)&db_id;
3239         data.dsize = sizeof(db_id);
3240
3241         ret = ctdb_control(ctdb, destnode, 0, 
3242                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3243                            tmp_ctx, NULL, &res, &timeout, NULL);
3244         if (ret != 0 || res < 0) {
3245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3246                 talloc_free(tmp_ctx);
3247                 return -1;
3248         }
3249
3250         if (priority) {
3251                 *priority = res;
3252         }
3253
3254         talloc_free(tmp_ctx);
3255
3256         return 0;
3257 }
3258
3259 /* time out handler for ctdb_control */
3260 void ctdb_control_timeout_func(struct event_context *ev, struct timed_event *te, 
3261         struct timeval t, void *private_data)
3262 {
3263         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
3264
3265         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
3266                          "dstnode:%u\n", state->reqid, state->c->opcode,
3267                          state->c->hdr.destnode));
3268
3269         state->state = CTDB_CONTROL_TIMEOUT;
3270
3271         /* if we had a callback registered for this control, pull the response
3272            and call the callback.
3273         */
3274         if (state->async.fn) {
3275                 event_add_timed(state->ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
3276         }
3277 }
3278