move messaging functions into libctdb
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36 struct ctdb_record_handle {
37         struct ctdb_db_context *ctdb_db;
38         TDB_DATA key;
39         TDB_DATA *data;
40         struct ctdb_ltdb_header header;
41 };
42
43
44 /*
45   make a recv call to the local ctdb daemon - called from client context
46
47   This is called when the program wants to wait for a ctdb_call to complete and get the 
48   results. This call will block unless the call has already completed.
49 */
50 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
51 {
52         if (state == NULL) {
53                 return -1;
54         }
55
56         while (state->state < CTDB_CALL_DONE) {
57                 event_loop_once(state->ctdb_db->ctdb->ev);
58         }
59         if (state->state != CTDB_CALL_DONE) {
60                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
61                 talloc_free(state);
62                 return -1;
63         }
64
65         if (state->call->reply_data.dsize) {
66                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
67                                                       state->call->reply_data.dptr,
68                                                       state->call->reply_data.dsize);
69                 call->reply_data.dsize = state->call->reply_data.dsize;
70         } else {
71                 call->reply_data.dptr = NULL;
72                 call->reply_data.dsize = 0;
73         }
74         call->status = state->call->status;
75         talloc_free(state);
76
77         return 0;
78 }
79
80
81
82
83
84
85
86 /*
87   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
88 */
89 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
90 {
91         struct ctdb_client_call_state *state;
92
93         state = ctdb_call_send(ctdb_db, call);
94         return ctdb_call_recv(state, call);
95 }
96
97
98
99
100
101 /*
102   cancel a ctdb_fetch_lock operation, releasing the lock
103  */
104 static int fetch_lock_destructor(struct ctdb_record_handle *h)
105 {
106         ctdb_ltdb_unlock(h->ctdb_db, h->key);
107         return 0;
108 }
109
110 /*
111   force the migration of a record to this node
112  */
113 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
114 {
115         struct ctdb_call call;
116         ZERO_STRUCT(call);
117         call.call_id = CTDB_NULL_FUNC;
118         call.key = key;
119         call.flags = CTDB_IMMEDIATE_MIGRATION;
120         return ctdb_call(ctdb_db, &call);
121 }
122
123 /*
124   get a lock on a record, and return the records data. Blocks until it gets the lock
125  */
126 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
127                                            TDB_DATA key, TDB_DATA *data)
128 {
129         int ret;
130         struct ctdb_record_handle *h;
131
132         /*
133           procedure is as follows:
134
135           1) get the chain lock. 
136           2) check if we are dmaster
137           3) if we are the dmaster then return handle 
138           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
139              reply from ctdbd
140           5) when we get the reply, goto (1)
141          */
142
143         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
144         if (h == NULL) {
145                 return NULL;
146         }
147
148         h->ctdb_db = ctdb_db;
149         h->key     = key;
150         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
151         if (h->key.dptr == NULL) {
152                 talloc_free(h);
153                 return NULL;
154         }
155         h->data    = data;
156
157         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
158                  (const char *)key.dptr));
159
160 again:
161         /* step 1 - get the chain lock */
162         ret = ctdb_ltdb_lock(ctdb_db, key);
163         if (ret != 0) {
164                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
170
171         talloc_set_destructor(h, fetch_lock_destructor);
172
173         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
174
175         /* when torturing, ensure we test the remote path */
176         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
177             random() % 5 == 0) {
178                 h->header.dmaster = (uint32_t)-1;
179         }
180
181
182         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
183
184         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
185                 ctdb_ltdb_unlock(ctdb_db, key);
186                 ret = ctdb_client_force_migration(ctdb_db, key);
187                 if (ret != 0) {
188                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
189                         talloc_free(h);
190                         return NULL;
191                 }
192                 goto again;
193         }
194
195         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
196         return h;
197 }
198
199 /*
200   store some data to the record that was locked with ctdb_fetch_lock()
201 */
202 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
203 {
204         if (h->ctdb_db->persistent) {
205                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
206                 return -1;
207         }
208
209         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
210 }
211
212 /*
213   non-locking fetch of a record
214  */
215 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
216                TDB_DATA key, TDB_DATA *data)
217 {
218         struct ctdb_call call;
219         int ret;
220
221         call.call_id = CTDB_FETCH_FUNC;
222         call.call_data.dptr = NULL;
223         call.call_data.dsize = 0;
224
225         ret = ctdb_call(ctdb_db, &call);
226
227         if (ret == 0) {
228                 *data = call.reply_data;
229                 talloc_steal(mem_ctx, data->dptr);
230         }
231
232         return ret;
233 }
234
235
236
237
238
239
240
241
242 /*
243   send a ctdb control message
244   timeout specifies how long we should wait for a reply.
245   if timeout is NULL we wait indefinitely
246  */
247 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
248                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
249                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
250                  struct timeval *timeout,
251                  char **errormsg)
252 {
253         struct ctdb_client_control_state *state;
254
255         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
256                         flags, data, mem_ctx,
257                         timeout, errormsg);
258         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
259                         errormsg);
260 }
261
262
263
264
265 /*
266   a process exists call. Returns 0 if process exists, -1 otherwise
267  */
268 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
269 {
270         int ret;
271         TDB_DATA data;
272         int32_t status;
273
274         data.dptr = (uint8_t*)&pid;
275         data.dsize = sizeof(pid);
276
277         ret = ctdb_control(ctdb, destnode, 0, 
278                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
279                            NULL, NULL, &status, NULL, NULL);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
282                 return -1;
283         }
284
285         return status;
286 }
287
288 /*
289   get remote statistics
290  */
291 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
292 {
293         int ret;
294         TDB_DATA data;
295         int32_t res;
296
297         ret = ctdb_control(ctdb, destnode, 0, 
298                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
299                            ctdb, &data, &res, NULL, NULL);
300         if (ret != 0 || res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
302                 return -1;
303         }
304
305         if (data.dsize != sizeof(struct ctdb_statistics)) {
306                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
307                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
308                       return -1;
309         }
310
311         *status = *(struct ctdb_statistics *)data.dptr;
312         talloc_free(data.dptr);
313                         
314         return 0;
315 }
316
317 /*
318   shutdown a remote ctdb node
319  */
320 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
321 {
322         struct ctdb_client_control_state *state;
323
324         state = ctdb_control_send(ctdb, destnode, 0, 
325                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
326                            NULL, &timeout, NULL);
327         if (state == NULL) {
328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
329                 return -1;
330         }
331
332         return 0;
333 }
334
335 /*
336   get vnn map from a remote node
337  */
338 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
339 {
340         int ret;
341         TDB_DATA outdata;
342         int32_t res;
343         struct ctdb_vnn_map_wire *map;
344
345         ret = ctdb_control(ctdb, destnode, 0, 
346                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
347                            mem_ctx, &outdata, &res, &timeout, NULL);
348         if (ret != 0 || res != 0) {
349                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
350                 return -1;
351         }
352         
353         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
354         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
355             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
356                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
357                 return -1;
358         }
359
360         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
361         CTDB_NO_MEMORY(ctdb, *vnnmap);
362         (*vnnmap)->generation = map->generation;
363         (*vnnmap)->size       = map->size;
364         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
365
366         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
367         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
368         talloc_free(outdata.dptr);
369                     
370         return 0;
371 }
372
373
374 /*
375   get the recovery mode of a remote node
376  */
377 struct ctdb_client_control_state *
378 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
379 {
380         return ctdb_control_send(ctdb, destnode, 0, 
381                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
382                            mem_ctx, &timeout, NULL);
383 }
384
385 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
386 {
387         int ret;
388         int32_t res;
389
390         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
391         if (ret != 0) {
392                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
393                 return -1;
394         }
395
396         if (recmode) {
397                 *recmode = (uint32_t)res;
398         }
399
400         return 0;
401 }
402
403 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
404 {
405         struct ctdb_client_control_state *state;
406
407         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
408         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
409 }
410
411
412
413
414 /*
415   set the recovery mode of a remote node
416  */
417 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
418 {
419         int ret;
420         TDB_DATA data;
421         int32_t res;
422
423         data.dsize = sizeof(uint32_t);
424         data.dptr = (unsigned char *)&recmode;
425
426         ret = ctdb_control(ctdb, destnode, 0, 
427                            CTDB_CONTROL_SET_RECMODE, 0, data, 
428                            NULL, NULL, &res, &timeout, NULL);
429         if (ret != 0 || res != 0) {
430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
431                 return -1;
432         }
433
434         return 0;
435 }
436
437
438
439 /*
440   get the recovery master of a remote node
441  */
442 struct ctdb_client_control_state *
443 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
444                         struct timeval timeout, uint32_t destnode)
445 {
446         return ctdb_control_send(ctdb, destnode, 0, 
447                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
448                            mem_ctx, &timeout, NULL);
449 }
450
451 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
452 {
453         int ret;
454         int32_t res;
455
456         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
457         if (ret != 0) {
458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
459                 return -1;
460         }
461
462         if (recmaster) {
463                 *recmaster = (uint32_t)res;
464         }
465
466         return 0;
467 }
468
469 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
470 {
471         struct ctdb_client_control_state *state;
472
473         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
474         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
475 }
476
477
478 /*
479   set the recovery master of a remote node
480  */
481 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
482 {
483         int ret;
484         TDB_DATA data;
485         int32_t res;
486
487         ZERO_STRUCT(data);
488         data.dsize = sizeof(uint32_t);
489         data.dptr = (unsigned char *)&recmaster;
490
491         ret = ctdb_control(ctdb, destnode, 0, 
492                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
493                            NULL, NULL, &res, &timeout, NULL);
494         if (ret != 0 || res != 0) {
495                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
496                 return -1;
497         }
498
499         return 0;
500 }
501
502
503 /*
504   get a list of databases off a remote node
505  */
506 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
507                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
508 {
509         int ret;
510         TDB_DATA outdata;
511         int32_t res;
512
513         ret = ctdb_control(ctdb, destnode, 0, 
514                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
515                            mem_ctx, &outdata, &res, &timeout, NULL);
516         if (ret != 0 || res != 0) {
517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
518                 return -1;
519         }
520
521         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
522         talloc_free(outdata.dptr);
523                     
524         return 0;
525 }
526
527 /*
528   get a list of nodes (vnn and flags ) from a remote node
529  */
530 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
531                 struct timeval timeout, uint32_t destnode, 
532                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
533 {
534         int ret;
535         TDB_DATA outdata;
536         int32_t res;
537
538         ret = ctdb_control(ctdb, destnode, 0, 
539                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
540                            mem_ctx, &outdata, &res, &timeout, NULL);
541         if (ret == 0 && res == -1 && outdata.dsize == 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
543                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
544         }
545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
547                 return -1;
548         }
549
550         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
551         talloc_free(outdata.dptr);
552                     
553         return 0;
554 }
555
556 /*
557   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
558  */
559 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
560                 struct timeval timeout, uint32_t destnode, 
561                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
562 {
563         int ret, i, len;
564         TDB_DATA outdata;
565         struct ctdb_node_mapv4 *nodemapv4;
566         int32_t res;
567
568         ret = ctdb_control(ctdb, destnode, 0, 
569                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
570                            mem_ctx, &outdata, &res, &timeout, NULL);
571         if (ret != 0 || res != 0 || outdata.dsize == 0) {
572                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
573                 return -1;
574         }
575
576         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
577
578         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
579         (*nodemap) = talloc_zero_size(mem_ctx, len);
580         CTDB_NO_MEMORY(ctdb, (*nodemap));
581
582         (*nodemap)->num = nodemapv4->num;
583         for (i=0; i<nodemapv4->num; i++) {
584                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
585                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
586                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
587                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
588         }
589                 
590         talloc_free(outdata.dptr);
591                     
592         return 0;
593 }
594
595 /*
596   drop the transport, reload the nodes file and restart the transport
597  */
598 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
599                     struct timeval timeout, uint32_t destnode)
600 {
601         int ret;
602         int32_t res;
603
604         ret = ctdb_control(ctdb, destnode, 0, 
605                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
606                            NULL, NULL, &res, &timeout, NULL);
607         if (ret != 0 || res != 0) {
608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
609                 return -1;
610         }
611
612         return 0;
613 }
614
615
616 /*
617   set vnn map on a node
618  */
619 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
620                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
621 {
622         int ret;
623         TDB_DATA data;
624         int32_t res;
625         struct ctdb_vnn_map_wire *map;
626         size_t len;
627
628         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
629         map = talloc_size(mem_ctx, len);
630         CTDB_NO_MEMORY(ctdb, map);
631
632         map->generation = vnnmap->generation;
633         map->size = vnnmap->size;
634         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
635         
636         data.dsize = len;
637         data.dptr  = (uint8_t *)map;
638
639         ret = ctdb_control(ctdb, destnode, 0, 
640                            CTDB_CONTROL_SETVNNMAP, 0, data, 
641                            NULL, NULL, &res, &timeout, NULL);
642         if (ret != 0 || res != 0) {
643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
644                 return -1;
645         }
646
647         talloc_free(map);
648
649         return 0;
650 }
651
652
653 /*
654   async send for pull database
655  */
656 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
657         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
658         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
659 {
660         TDB_DATA indata;
661         struct ctdb_control_pulldb *pull;
662         struct ctdb_client_control_state *state;
663
664         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
665         CTDB_NO_MEMORY_NULL(ctdb, pull);
666
667         pull->db_id   = dbid;
668         pull->lmaster = lmaster;
669
670         indata.dsize = sizeof(struct ctdb_control_pulldb);
671         indata.dptr  = (unsigned char *)pull;
672
673         state = ctdb_control_send(ctdb, destnode, 0, 
674                                   CTDB_CONTROL_PULL_DB, 0, indata, 
675                                   mem_ctx, &timeout, NULL);
676         talloc_free(pull);
677
678         return state;
679 }
680
681 /*
682   async recv for pull database
683  */
684 int ctdb_ctrl_pulldb_recv(
685         struct ctdb_context *ctdb, 
686         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
687         TDB_DATA *outdata)
688 {
689         int ret;
690         int32_t res;
691
692         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
693         if ( (ret != 0) || (res != 0) ){
694                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
695                 return -1;
696         }
697
698         return 0;
699 }
700
701 /*
702   pull all keys and records for a specific database on a node
703  */
704 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
705                 uint32_t dbid, uint32_t lmaster, 
706                 TALLOC_CTX *mem_ctx, struct timeval timeout,
707                 TDB_DATA *outdata)
708 {
709         struct ctdb_client_control_state *state;
710
711         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
712                                       timeout);
713         
714         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
715 }
716
717
718 /*
719   change dmaster for all keys in the database to the new value
720  */
721 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
722                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
723 {
724         int ret;
725         TDB_DATA indata;
726         int32_t res;
727
728         indata.dsize = 2*sizeof(uint32_t);
729         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
730
731         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
732         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
733
734         ret = ctdb_control(ctdb, destnode, 0, 
735                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
736                            NULL, NULL, &res, &timeout, NULL);
737         if (ret != 0 || res != 0) {
738                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
739                 return -1;
740         }
741
742         return 0;
743 }
744
745 /*
746   ping a node, return number of clients connected
747  */
748 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
749 {
750         int ret;
751         int32_t res;
752
753         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
754                            tdb_null, NULL, NULL, &res, NULL, NULL);
755         if (ret != 0) {
756                 return -1;
757         }
758         return res;
759 }
760
761 /*
762   find the real path to a ltdb 
763  */
764 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
765                    const char **path)
766 {
767         int ret;
768         int32_t res;
769         TDB_DATA data;
770
771         data.dptr = (uint8_t *)&dbid;
772         data.dsize = sizeof(dbid);
773
774         ret = ctdb_control(ctdb, destnode, 0, 
775                            CTDB_CONTROL_GETDBPATH, 0, data, 
776                            mem_ctx, &data, &res, &timeout, NULL);
777         if (ret != 0 || res != 0) {
778                 return -1;
779         }
780
781         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
782         if ((*path) == NULL) {
783                 return -1;
784         }
785
786         talloc_free(data.dptr);
787
788         return 0;
789 }
790
791 /*
792   find the name of a db 
793  */
794 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
795                    const char **name)
796 {
797         int ret;
798         int32_t res;
799         TDB_DATA data;
800
801         data.dptr = (uint8_t *)&dbid;
802         data.dsize = sizeof(dbid);
803
804         ret = ctdb_control(ctdb, destnode, 0, 
805                            CTDB_CONTROL_GET_DBNAME, 0, data, 
806                            mem_ctx, &data, &res, &timeout, NULL);
807         if (ret != 0 || res != 0) {
808                 return -1;
809         }
810
811         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
812         if ((*name) == NULL) {
813                 return -1;
814         }
815
816         talloc_free(data.dptr);
817
818         return 0;
819 }
820
821 /*
822   get the health status of a db
823  */
824 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
825                           struct timeval timeout,
826                           uint32_t destnode,
827                           uint32_t dbid, TALLOC_CTX *mem_ctx,
828                           const char **reason)
829 {
830         int ret;
831         int32_t res;
832         TDB_DATA data;
833
834         data.dptr = (uint8_t *)&dbid;
835         data.dsize = sizeof(dbid);
836
837         ret = ctdb_control(ctdb, destnode, 0,
838                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
839                            mem_ctx, &data, &res, &timeout, NULL);
840         if (ret != 0 || res != 0) {
841                 return -1;
842         }
843
844         if (data.dsize == 0) {
845                 (*reason) = NULL;
846                 return 0;
847         }
848
849         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
850         if ((*reason) == NULL) {
851                 return -1;
852         }
853
854         talloc_free(data.dptr);
855
856         return 0;
857 }
858
859 /*
860   create a database
861  */
862 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
863                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
864 {
865         int ret;
866         int32_t res;
867         TDB_DATA data;
868
869         data.dptr = discard_const(name);
870         data.dsize = strlen(name)+1;
871
872         ret = ctdb_control(ctdb, destnode, 0, 
873                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
874                            0, data, 
875                            mem_ctx, &data, &res, &timeout, NULL);
876
877         if (ret != 0 || res != 0) {
878                 return -1;
879         }
880
881         return 0;
882 }
883
884 /*
885   get debug level on a node
886  */
887 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
888 {
889         int ret;
890         int32_t res;
891         TDB_DATA data;
892
893         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
894                            ctdb, &data, &res, NULL, NULL);
895         if (ret != 0 || res != 0) {
896                 return -1;
897         }
898         if (data.dsize != sizeof(int32_t)) {
899                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
900                          (unsigned)data.dsize));
901                 return -1;
902         }
903         *level = *(int32_t *)data.dptr;
904         talloc_free(data.dptr);
905         return 0;
906 }
907
908 /*
909   set debug level on a node
910  */
911 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
912 {
913         int ret;
914         int32_t res;
915         TDB_DATA data;
916
917         data.dptr = (uint8_t *)&level;
918         data.dsize = sizeof(level);
919
920         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
921                            NULL, NULL, &res, NULL, NULL);
922         if (ret != 0 || res != 0) {
923                 return -1;
924         }
925         return 0;
926 }
927
928
929 /*
930   get a list of connected nodes
931  */
932 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
933                                 struct timeval timeout,
934                                 TALLOC_CTX *mem_ctx,
935                                 uint32_t *num_nodes)
936 {
937         struct ctdb_node_map *map=NULL;
938         int ret, i;
939         uint32_t *nodes;
940
941         *num_nodes = 0;
942
943         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
944         if (ret != 0) {
945                 return NULL;
946         }
947
948         nodes = talloc_array(mem_ctx, uint32_t, map->num);
949         if (nodes == NULL) {
950                 return NULL;
951         }
952
953         for (i=0;i<map->num;i++) {
954                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
955                         nodes[*num_nodes] = map->nodes[i].pnn;
956                         (*num_nodes)++;
957                 }
958         }
959
960         return nodes;
961 }
962
963
964 /*
965   reset remote status
966  */
967 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
968 {
969         int ret;
970         int32_t res;
971
972         ret = ctdb_control(ctdb, destnode, 0, 
973                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
974                            NULL, NULL, &res, NULL, NULL);
975         if (ret != 0 || res != 0) {
976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
977                 return -1;
978         }
979         return 0;
980 }
981
982 /*
983   this is the dummy null procedure that all databases support
984 */
985 static int ctdb_null_func(struct ctdb_call_info *call)
986 {
987         return 0;
988 }
989
990 /*
991   this is a plain fetch procedure that all databases support
992 */
993 static int ctdb_fetch_func(struct ctdb_call_info *call)
994 {
995         call->reply_data = &call->record_data;
996         return 0;
997 }
998
999 /*
1000   attach to a specific database - client call
1001 */
1002 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1003 {
1004         struct ctdb_db_context *ctdb_db;
1005         TDB_DATA data;
1006         int ret;
1007         int32_t res;
1008
1009         ctdb_db = ctdb_db_handle(ctdb, name);
1010         if (ctdb_db) {
1011                 return ctdb_db;
1012         }
1013
1014         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1015         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1016
1017         ctdb_db->ctdb = ctdb;
1018         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1019         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1020
1021         data.dptr = discard_const(name);
1022         data.dsize = strlen(name)+1;
1023
1024         /* tell ctdb daemon to attach */
1025         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1026                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1027                            0, data, ctdb_db, &data, &res, NULL, NULL);
1028         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1029                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1030                 talloc_free(ctdb_db);
1031                 return NULL;
1032         }
1033         
1034         ctdb_db->db_id = *(uint32_t *)data.dptr;
1035         talloc_free(data.dptr);
1036
1037         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1038         if (ret != 0) {
1039                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1040                 talloc_free(ctdb_db);
1041                 return NULL;
1042         }
1043
1044         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1045         if (ctdb->valgrinding) {
1046                 tdb_flags |= TDB_NOMMAP;
1047         }
1048         tdb_flags |= TDB_DISALLOW_NESTING;
1049
1050         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1051         if (ctdb_db->ltdb == NULL) {
1052                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1053                 talloc_free(ctdb_db);
1054                 return NULL;
1055         }
1056
1057         ctdb_db->persistent = persistent;
1058
1059         DLIST_ADD(ctdb->db_list, ctdb_db);
1060
1061         /* add well known functions */
1062         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1063         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1064
1065         return ctdb_db;
1066 }
1067
1068
1069 /*
1070   setup a call for a database
1071  */
1072 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1073 {
1074         struct ctdb_registered_call *call;
1075
1076 #if 0
1077         TDB_DATA data;
1078         int32_t status;
1079         struct ctdb_control_set_call c;
1080         int ret;
1081
1082         /* this is no longer valid with the separate daemon architecture */
1083         c.db_id = ctdb_db->db_id;
1084         c.fn    = fn;
1085         c.id    = id;
1086
1087         data.dptr = (uint8_t *)&c;
1088         data.dsize = sizeof(c);
1089
1090         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1091                            data, NULL, NULL, &status, NULL, NULL);
1092         if (ret != 0 || status != 0) {
1093                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1094                 return -1;
1095         }
1096 #endif
1097
1098         /* also register locally */
1099         call = talloc(ctdb_db, struct ctdb_registered_call);
1100         call->fn = fn;
1101         call->id = id;
1102
1103         DLIST_ADD(ctdb_db->calls, call);        
1104         return 0;
1105 }
1106
1107
1108 struct traverse_state {
1109         bool done;
1110         uint32_t count;
1111         ctdb_traverse_func fn;
1112         void *private_data;
1113 };
1114
1115 /*
1116   called on each key during a ctdb_traverse
1117  */
1118 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1119 {
1120         struct traverse_state *state = (struct traverse_state *)p;
1121         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1122         TDB_DATA key;
1123
1124         if (data.dsize < sizeof(uint32_t) ||
1125             d->length != data.dsize) {
1126                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1127                 state->done = True;
1128                 return;
1129         }
1130
1131         key.dsize = d->keylen;
1132         key.dptr  = &d->data[0];
1133         data.dsize = d->datalen;
1134         data.dptr = &d->data[d->keylen];
1135
1136         if (key.dsize == 0 && data.dsize == 0) {
1137                 /* end of traverse */
1138                 state->done = True;
1139                 return;
1140         }
1141
1142         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1143                 /* empty records are deleted records in ctdb */
1144                 return;
1145         }
1146
1147         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1148                 state->done = True;
1149         }
1150
1151         state->count++;
1152 }
1153
1154
1155 /*
1156   start a cluster wide traverse, calling the supplied fn on each record
1157   return the number of records traversed, or -1 on error
1158  */
1159 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1160 {
1161         TDB_DATA data;
1162         struct ctdb_traverse_start t;
1163         int32_t status;
1164         int ret;
1165         uint64_t srvid = (getpid() | 0xFLL<<60);
1166         struct traverse_state state;
1167
1168         state.done = False;
1169         state.count = 0;
1170         state.private_data = private_data;
1171         state.fn = fn;
1172
1173         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1174         if (ret != 0) {
1175                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1176                 return -1;
1177         }
1178
1179         t.db_id = ctdb_db->db_id;
1180         t.srvid = srvid;
1181         t.reqid = 0;
1182
1183         data.dptr = (uint8_t *)&t;
1184         data.dsize = sizeof(t);
1185
1186         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1187                            data, NULL, NULL, &status, NULL, NULL);
1188         if (ret != 0 || status != 0) {
1189                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1190                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1191                 return -1;
1192         }
1193
1194         while (!state.done) {
1195                 event_loop_once(ctdb_db->ctdb->ev);
1196         }
1197
1198         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1201                 return -1;
1202         }
1203
1204         return state.count;
1205 }
1206
1207 #define ISASCII(x) ((x>31)&&(x<128))
1208 /*
1209   called on each key during a catdb
1210  */
1211 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1212 {
1213         int i;
1214         FILE *f = (FILE *)p;
1215         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1216
1217         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1218         for (i=0;i<key.dsize;i++) {
1219                 if (ISASCII(key.dptr[i])) {
1220                         fprintf(f, "%c", key.dptr[i]);
1221                 } else {
1222                         fprintf(f, "\\%02X", key.dptr[i]);
1223                 }
1224         }
1225         fprintf(f, "\"\n");
1226
1227         fprintf(f, "dmaster: %u\n", h->dmaster);
1228         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1229
1230         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1231         for (i=sizeof(*h);i<data.dsize;i++) {
1232                 if (ISASCII(data.dptr[i])) {
1233                         fprintf(f, "%c", data.dptr[i]);
1234                 } else {
1235                         fprintf(f, "\\%02X", data.dptr[i]);
1236                 }
1237         }
1238         fprintf(f, "\"\n");
1239
1240         fprintf(f, "\n");
1241
1242         return 0;
1243 }
1244
1245 /*
1246   convenience function to list all keys to stdout
1247  */
1248 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1249 {
1250         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1251 }
1252
1253 /*
1254   get the pid of a ctdb daemon
1255  */
1256 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1257 {
1258         int ret;
1259         int32_t res;
1260
1261         ret = ctdb_control(ctdb, destnode, 0, 
1262                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1263                            NULL, NULL, &res, &timeout, NULL);
1264         if (ret != 0) {
1265                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1266                 return -1;
1267         }
1268
1269         *pid = res;
1270
1271         return 0;
1272 }
1273
1274
1275 /*
1276   async freeze send control
1277  */
1278 struct ctdb_client_control_state *
1279 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1280 {
1281         return ctdb_control_send(ctdb, destnode, priority, 
1282                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1283                            mem_ctx, &timeout, NULL);
1284 }
1285
1286 /* 
1287    async freeze recv control
1288 */
1289 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1290 {
1291         int ret;
1292         int32_t res;
1293
1294         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1295         if ( (ret != 0) || (res != 0) ){
1296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1297                 return -1;
1298         }
1299
1300         return 0;
1301 }
1302
1303 /*
1304   freeze databases of a certain priority
1305  */
1306 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1307 {
1308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1309         struct ctdb_client_control_state *state;
1310         int ret;
1311
1312         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1313         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1314         talloc_free(tmp_ctx);
1315
1316         return ret;
1317 }
1318
1319 /* Freeze all databases */
1320 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1321 {
1322         int i;
1323
1324         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1325                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1326                         return -1;
1327                 }
1328         }
1329         return 0;
1330 }
1331
1332 /*
1333   thaw databases of a certain priority
1334  */
1335 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1336 {
1337         int ret;
1338         int32_t res;
1339
1340         ret = ctdb_control(ctdb, destnode, priority, 
1341                            CTDB_CONTROL_THAW, 0, tdb_null, 
1342                            NULL, NULL, &res, &timeout, NULL);
1343         if (ret != 0 || res != 0) {
1344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1345                 return -1;
1346         }
1347
1348         return 0;
1349 }
1350
1351 /* thaw all databases */
1352 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1355 }
1356
1357 /*
1358   get pnn of a node, or -1
1359  */
1360 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1361 {
1362         int ret;
1363         int32_t res;
1364
1365         ret = ctdb_control(ctdb, destnode, 0, 
1366                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1367                            NULL, NULL, &res, &timeout, NULL);
1368         if (ret != 0) {
1369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1370                 return -1;
1371         }
1372
1373         return res;
1374 }
1375
1376 /*
1377   get the monitoring mode of a remote node
1378  */
1379 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1380 {
1381         int ret;
1382         int32_t res;
1383
1384         ret = ctdb_control(ctdb, destnode, 0, 
1385                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1386                            NULL, NULL, &res, &timeout, NULL);
1387         if (ret != 0) {
1388                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1389                 return -1;
1390         }
1391
1392         *monmode = res;
1393
1394         return 0;
1395 }
1396
1397
1398 /*
1399  set the monitoring mode of a remote node to active
1400  */
1401 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1402 {
1403         int ret;
1404         
1405
1406         ret = ctdb_control(ctdb, destnode, 0, 
1407                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1408                            NULL, NULL,NULL, &timeout, NULL);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1411                 return -1;
1412         }
1413
1414         
1415
1416         return 0;
1417 }
1418
1419 /*
1420   set the monitoring mode of a remote node to disable
1421  */
1422 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1423 {
1424         int ret;
1425         
1426
1427         ret = ctdb_control(ctdb, destnode, 0, 
1428                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1429                            NULL, NULL, NULL, &timeout, NULL);
1430         if (ret != 0) {
1431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1432                 return -1;
1433         }
1434
1435         
1436
1437         return 0;
1438 }
1439
1440
1441
1442 /* 
1443   sent to a node to make it take over an ip address
1444 */
1445 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1446                           uint32_t destnode, struct ctdb_public_ip *ip)
1447 {
1448         TDB_DATA data;
1449         struct ctdb_public_ipv4 ipv4;
1450         int ret;
1451         int32_t res;
1452
1453         if (ip->addr.sa.sa_family == AF_INET) {
1454                 ipv4.pnn = ip->pnn;
1455                 ipv4.sin = ip->addr.ip;
1456
1457                 data.dsize = sizeof(ipv4);
1458                 data.dptr  = (uint8_t *)&ipv4;
1459
1460                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1461                            NULL, &res, &timeout, NULL);
1462         } else {
1463                 data.dsize = sizeof(*ip);
1464                 data.dptr  = (uint8_t *)ip;
1465
1466                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1467                            NULL, &res, &timeout, NULL);
1468         }
1469
1470         if (ret != 0 || res != 0) {
1471                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1472                 return -1;
1473         }
1474
1475         return 0;       
1476 }
1477
1478
1479 /* 
1480   sent to a node to make it release an ip address
1481 */
1482 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1483                          uint32_t destnode, struct ctdb_public_ip *ip)
1484 {
1485         TDB_DATA data;
1486         struct ctdb_public_ipv4 ipv4;
1487         int ret;
1488         int32_t res;
1489
1490         if (ip->addr.sa.sa_family == AF_INET) {
1491                 ipv4.pnn = ip->pnn;
1492                 ipv4.sin = ip->addr.ip;
1493
1494                 data.dsize = sizeof(ipv4);
1495                 data.dptr  = (uint8_t *)&ipv4;
1496
1497                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1498                                    NULL, &res, &timeout, NULL);
1499         } else {
1500                 data.dsize = sizeof(*ip);
1501                 data.dptr  = (uint8_t *)ip;
1502
1503                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1504                                    NULL, &res, &timeout, NULL);
1505         }
1506
1507         if (ret != 0 || res != 0) {
1508                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1509                 return -1;
1510         }
1511
1512         return 0;       
1513 }
1514
1515
1516 /*
1517   get a tunable
1518  */
1519 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1520                           struct timeval timeout, 
1521                           uint32_t destnode,
1522                           const char *name, uint32_t *value)
1523 {
1524         struct ctdb_control_get_tunable *t;
1525         TDB_DATA data, outdata;
1526         int32_t res;
1527         int ret;
1528
1529         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1530         data.dptr  = talloc_size(ctdb, data.dsize);
1531         CTDB_NO_MEMORY(ctdb, data.dptr);
1532
1533         t = (struct ctdb_control_get_tunable *)data.dptr;
1534         t->length = strlen(name)+1;
1535         memcpy(t->name, name, t->length);
1536
1537         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1538                            &outdata, &res, &timeout, NULL);
1539         talloc_free(data.dptr);
1540         if (ret != 0 || res != 0) {
1541                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1542                 return -1;
1543         }
1544
1545         if (outdata.dsize != sizeof(uint32_t)) {
1546                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1547                 talloc_free(outdata.dptr);
1548                 return -1;
1549         }
1550         
1551         *value = *(uint32_t *)outdata.dptr;
1552         talloc_free(outdata.dptr);
1553
1554         return 0;
1555 }
1556
1557 /*
1558   set a tunable
1559  */
1560 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1561                           struct timeval timeout, 
1562                           uint32_t destnode,
1563                           const char *name, uint32_t value)
1564 {
1565         struct ctdb_control_set_tunable *t;
1566         TDB_DATA data;
1567         int32_t res;
1568         int ret;
1569
1570         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1571         data.dptr  = talloc_size(ctdb, data.dsize);
1572         CTDB_NO_MEMORY(ctdb, data.dptr);
1573
1574         t = (struct ctdb_control_set_tunable *)data.dptr;
1575         t->length = strlen(name)+1;
1576         memcpy(t->name, name, t->length);
1577         t->value = value;
1578
1579         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1580                            NULL, &res, &timeout, NULL);
1581         talloc_free(data.dptr);
1582         if (ret != 0 || res != 0) {
1583                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1584                 return -1;
1585         }
1586
1587         return 0;
1588 }
1589
1590 /*
1591   list tunables
1592  */
1593 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1594                             struct timeval timeout, 
1595                             uint32_t destnode,
1596                             TALLOC_CTX *mem_ctx,
1597                             const char ***list, uint32_t *count)
1598 {
1599         TDB_DATA outdata;
1600         int32_t res;
1601         int ret;
1602         struct ctdb_control_list_tunable *t;
1603         char *p, *s, *ptr;
1604
1605         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1606                            mem_ctx, &outdata, &res, &timeout, NULL);
1607         if (ret != 0 || res != 0) {
1608                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1609                 return -1;
1610         }
1611
1612         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1613         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1614             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1615                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1616                 talloc_free(outdata.dptr);
1617                 return -1;              
1618         }
1619         
1620         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1621         CTDB_NO_MEMORY(ctdb, p);
1622
1623         talloc_free(outdata.dptr);
1624         
1625         (*list) = NULL;
1626         (*count) = 0;
1627
1628         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1629                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1630                 CTDB_NO_MEMORY(ctdb, *list);
1631                 (*list)[*count] = talloc_strdup(*list, s);
1632                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1633                 (*count)++;
1634         }
1635
1636         talloc_free(p);
1637
1638         return 0;
1639 }
1640
1641
1642 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1643                                    struct timeval timeout, uint32_t destnode,
1644                                    TALLOC_CTX *mem_ctx,
1645                                    uint32_t flags,
1646                                    struct ctdb_all_public_ips **ips)
1647 {
1648         int ret;
1649         TDB_DATA outdata;
1650         int32_t res;
1651
1652         ret = ctdb_control(ctdb, destnode, 0, 
1653                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1654                            mem_ctx, &outdata, &res, &timeout, NULL);
1655         if (ret == 0 && res == -1) {
1656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1657                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1658         }
1659         if (ret != 0 || res != 0) {
1660           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1661                 return -1;
1662         }
1663
1664         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1665         talloc_free(outdata.dptr);
1666                     
1667         return 0;
1668 }
1669
1670 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1671                              struct timeval timeout, uint32_t destnode,
1672                              TALLOC_CTX *mem_ctx,
1673                              struct ctdb_all_public_ips **ips)
1674 {
1675         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1676                                               destnode, mem_ctx,
1677                                               0, ips);
1678 }
1679
1680 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1681                         struct timeval timeout, uint32_t destnode, 
1682                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1683 {
1684         int ret, i, len;
1685         TDB_DATA outdata;
1686         int32_t res;
1687         struct ctdb_all_public_ipsv4 *ipsv4;
1688
1689         ret = ctdb_control(ctdb, destnode, 0, 
1690                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1691                            mem_ctx, &outdata, &res, &timeout, NULL);
1692         if (ret != 0 || res != 0) {
1693                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1694                 return -1;
1695         }
1696
1697         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1698         len = offsetof(struct ctdb_all_public_ips, ips) +
1699                 ipsv4->num*sizeof(struct ctdb_public_ip);
1700         *ips = talloc_zero_size(mem_ctx, len);
1701         CTDB_NO_MEMORY(ctdb, *ips);
1702         (*ips)->num = ipsv4->num;
1703         for (i=0; i<ipsv4->num; i++) {
1704                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1705                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1706         }
1707
1708         talloc_free(outdata.dptr);
1709                     
1710         return 0;
1711 }
1712
1713 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1714                                  struct timeval timeout, uint32_t destnode,
1715                                  TALLOC_CTX *mem_ctx,
1716                                  const ctdb_sock_addr *addr,
1717                                  struct ctdb_control_public_ip_info **_info)
1718 {
1719         int ret;
1720         TDB_DATA indata;
1721         TDB_DATA outdata;
1722         int32_t res;
1723         struct ctdb_control_public_ip_info *info;
1724         uint32_t len;
1725         uint32_t i;
1726
1727         indata.dptr = discard_const_p(uint8_t, addr);
1728         indata.dsize = sizeof(*addr);
1729
1730         ret = ctdb_control(ctdb, destnode, 0,
1731                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1732                            mem_ctx, &outdata, &res, &timeout, NULL);
1733         if (ret != 0 || res != 0) {
1734                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1735                                 "failed ret:%d res:%d\n",
1736                                 ret, res));
1737                 return -1;
1738         }
1739
1740         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1741         if (len > outdata.dsize) {
1742                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1743                                 "returned invalid data with size %u > %u\n",
1744                                 (unsigned int)outdata.dsize,
1745                                 (unsigned int)len));
1746                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1747                 return -1;
1748         }
1749
1750         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1751         len += info->num*sizeof(struct ctdb_control_iface_info);
1752
1753         if (len > outdata.dsize) {
1754                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1755                                 "returned invalid data with size %u > %u\n",
1756                                 (unsigned int)outdata.dsize,
1757                                 (unsigned int)len));
1758                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1759                 return -1;
1760         }
1761
1762         /* make sure we null terminate the returned strings */
1763         for (i=0; i < info->num; i++) {
1764                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1765         }
1766
1767         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1768                                                                 outdata.dptr,
1769                                                                 outdata.dsize);
1770         talloc_free(outdata.dptr);
1771         if (*_info == NULL) {
1772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1773                                 "talloc_memdup size %u failed\n",
1774                                 (unsigned int)outdata.dsize));
1775                 return -1;
1776         }
1777
1778         return 0;
1779 }
1780
1781 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1782                          struct timeval timeout, uint32_t destnode,
1783                          TALLOC_CTX *mem_ctx,
1784                          struct ctdb_control_get_ifaces **_ifaces)
1785 {
1786         int ret;
1787         TDB_DATA outdata;
1788         int32_t res;
1789         struct ctdb_control_get_ifaces *ifaces;
1790         uint32_t len;
1791         uint32_t i;
1792
1793         ret = ctdb_control(ctdb, destnode, 0,
1794                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1795                            mem_ctx, &outdata, &res, &timeout, NULL);
1796         if (ret != 0 || res != 0) {
1797                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1798                                 "failed ret:%d res:%d\n",
1799                                 ret, res));
1800                 return -1;
1801         }
1802
1803         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1804         if (len > outdata.dsize) {
1805                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1806                                 "returned invalid data with size %u > %u\n",
1807                                 (unsigned int)outdata.dsize,
1808                                 (unsigned int)len));
1809                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1810                 return -1;
1811         }
1812
1813         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1814         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1815
1816         if (len > outdata.dsize) {
1817                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1818                                 "returned invalid data with size %u > %u\n",
1819                                 (unsigned int)outdata.dsize,
1820                                 (unsigned int)len));
1821                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1822                 return -1;
1823         }
1824
1825         /* make sure we null terminate the returned strings */
1826         for (i=0; i < ifaces->num; i++) {
1827                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1828         }
1829
1830         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1831                                                                   outdata.dptr,
1832                                                                   outdata.dsize);
1833         talloc_free(outdata.dptr);
1834         if (*_ifaces == NULL) {
1835                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1836                                 "talloc_memdup size %u failed\n",
1837                                 (unsigned int)outdata.dsize));
1838                 return -1;
1839         }
1840
1841         return 0;
1842 }
1843
1844 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1845                              struct timeval timeout, uint32_t destnode,
1846                              TALLOC_CTX *mem_ctx,
1847                              const struct ctdb_control_iface_info *info)
1848 {
1849         int ret;
1850         TDB_DATA indata;
1851         int32_t res;
1852
1853         indata.dptr = discard_const_p(uint8_t, info);
1854         indata.dsize = sizeof(*info);
1855
1856         ret = ctdb_control(ctdb, destnode, 0,
1857                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1858                            mem_ctx, NULL, &res, &timeout, NULL);
1859         if (ret != 0 || res != 0) {
1860                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1861                                 "failed ret:%d res:%d\n",
1862                                 ret, res));
1863                 return -1;
1864         }
1865
1866         return 0;
1867 }
1868
1869 /*
1870   set/clear the permanent disabled bit on a remote node
1871  */
1872 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1873                        uint32_t set, uint32_t clear)
1874 {
1875         int ret;
1876         TDB_DATA data;
1877         struct ctdb_node_map *nodemap=NULL;
1878         struct ctdb_node_flag_change c;
1879         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1880         uint32_t recmaster;
1881         uint32_t *nodes;
1882
1883
1884         /* find the recovery master */
1885         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1886         if (ret != 0) {
1887                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1888                 talloc_free(tmp_ctx);
1889                 return ret;
1890         }
1891
1892
1893         /* read the node flags from the recmaster */
1894         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1897                 talloc_free(tmp_ctx);
1898                 return -1;
1899         }
1900         if (destnode >= nodemap->num) {
1901                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1902                 talloc_free(tmp_ctx);
1903                 return -1;
1904         }
1905
1906         c.pnn       = destnode;
1907         c.old_flags = nodemap->nodes[destnode].flags;
1908         c.new_flags = c.old_flags;
1909         c.new_flags |= set;
1910         c.new_flags &= ~clear;
1911
1912         data.dsize = sizeof(c);
1913         data.dptr = (unsigned char *)&c;
1914
1915         /* send the flags update to all connected nodes */
1916         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1917
1918         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1919                                         nodes, 0,
1920                                         timeout, false, data,
1921                                         NULL, NULL,
1922                                         NULL) != 0) {
1923                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1924
1925                 talloc_free(tmp_ctx);
1926                 return -1;
1927         }
1928
1929         talloc_free(tmp_ctx);
1930         return 0;
1931 }
1932
1933
1934 /*
1935   get all tunables
1936  */
1937 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1938                                struct timeval timeout, 
1939                                uint32_t destnode,
1940                                struct ctdb_tunable *tunables)
1941 {
1942         TDB_DATA outdata;
1943         int ret;
1944         int32_t res;
1945
1946         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1947                            &outdata, &res, &timeout, NULL);
1948         if (ret != 0 || res != 0) {
1949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1950                 return -1;
1951         }
1952
1953         if (outdata.dsize != sizeof(*tunables)) {
1954                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1955                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1956                 return -1;              
1957         }
1958
1959         *tunables = *(struct ctdb_tunable *)outdata.dptr;
1960         talloc_free(outdata.dptr);
1961         return 0;
1962 }
1963
1964 /*
1965   add a public address to a node
1966  */
1967 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
1968                       struct timeval timeout, 
1969                       uint32_t destnode,
1970                       struct ctdb_control_ip_iface *pub)
1971 {
1972         TDB_DATA data;
1973         int32_t res;
1974         int ret;
1975
1976         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
1977         data.dptr  = (unsigned char *)pub;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
1980                            NULL, &res, &timeout, NULL);
1981         if (ret != 0 || res != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
1983                 return -1;
1984         }
1985
1986         return 0;
1987 }
1988
1989 /*
1990   delete a public address from a node
1991  */
1992 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
1993                       struct timeval timeout, 
1994                       uint32_t destnode,
1995                       struct ctdb_control_ip_iface *pub)
1996 {
1997         TDB_DATA data;
1998         int32_t res;
1999         int ret;
2000
2001         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2002         data.dptr  = (unsigned char *)pub;
2003
2004         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2005                            NULL, &res, &timeout, NULL);
2006         if (ret != 0 || res != 0) {
2007                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2008                 return -1;
2009         }
2010
2011         return 0;
2012 }
2013
2014 /*
2015   kill a tcp connection
2016  */
2017 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2018                       struct timeval timeout, 
2019                       uint32_t destnode,
2020                       struct ctdb_control_killtcp *killtcp)
2021 {
2022         TDB_DATA data;
2023         int32_t res;
2024         int ret;
2025
2026         data.dsize = sizeof(struct ctdb_control_killtcp);
2027         data.dptr  = (unsigned char *)killtcp;
2028
2029         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2030                            NULL, &res, &timeout, NULL);
2031         if (ret != 0 || res != 0) {
2032                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2033                 return -1;
2034         }
2035
2036         return 0;
2037 }
2038
2039 /*
2040   send a gratious arp
2041  */
2042 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2043                       struct timeval timeout, 
2044                       uint32_t destnode,
2045                       ctdb_sock_addr *addr,
2046                       const char *ifname)
2047 {
2048         TDB_DATA data;
2049         int32_t res;
2050         int ret, len;
2051         struct ctdb_control_gratious_arp *gratious_arp;
2052         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2053
2054
2055         len = strlen(ifname)+1;
2056         gratious_arp = talloc_size(tmp_ctx, 
2057                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2058         CTDB_NO_MEMORY(ctdb, gratious_arp);
2059
2060         gratious_arp->addr = *addr;
2061         gratious_arp->len = len;
2062         memcpy(&gratious_arp->iface[0], ifname, len);
2063
2064
2065         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2066         data.dptr  = (unsigned char *)gratious_arp;
2067
2068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2069                            NULL, &res, &timeout, NULL);
2070         if (ret != 0 || res != 0) {
2071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2072                 talloc_free(tmp_ctx);
2073                 return -1;
2074         }
2075
2076         talloc_free(tmp_ctx);
2077         return 0;
2078 }
2079
2080 /*
2081   get a list of all tcp tickles that a node knows about for a particular vnn
2082  */
2083 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2084                               struct timeval timeout, uint32_t destnode, 
2085                               TALLOC_CTX *mem_ctx, 
2086                               ctdb_sock_addr *addr,
2087                               struct ctdb_control_tcp_tickle_list **list)
2088 {
2089         int ret;
2090         TDB_DATA data, outdata;
2091         int32_t status;
2092
2093         data.dptr = (uint8_t*)addr;
2094         data.dsize = sizeof(ctdb_sock_addr);
2095
2096         ret = ctdb_control(ctdb, destnode, 0, 
2097                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2098                            mem_ctx, &outdata, &status, NULL, NULL);
2099         if (ret != 0 || status != 0) {
2100                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2101                 return -1;
2102         }
2103
2104         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2105
2106         return status;
2107 }
2108
2109 /*
2110   register a server id
2111  */
2112 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2113                       struct timeval timeout, 
2114                       struct ctdb_server_id *id)
2115 {
2116         TDB_DATA data;
2117         int32_t res;
2118         int ret;
2119
2120         data.dsize = sizeof(struct ctdb_server_id);
2121         data.dptr  = (unsigned char *)id;
2122
2123         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2124                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2125                         0, data, NULL,
2126                         NULL, &res, &timeout, NULL);
2127         if (ret != 0 || res != 0) {
2128                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2129                 return -1;
2130         }
2131
2132         return 0;
2133 }
2134
2135 /*
2136   unregister a server id
2137  */
2138 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2139                       struct timeval timeout, 
2140                       struct ctdb_server_id *id)
2141 {
2142         TDB_DATA data;
2143         int32_t res;
2144         int ret;
2145
2146         data.dsize = sizeof(struct ctdb_server_id);
2147         data.dptr  = (unsigned char *)id;
2148
2149         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2150                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2151                         0, data, NULL,
2152                         NULL, &res, &timeout, NULL);
2153         if (ret != 0 || res != 0) {
2154                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2155                 return -1;
2156         }
2157
2158         return 0;
2159 }
2160
2161
2162 /*
2163   check if a server id exists
2164
2165   if a server id does exist, return *status == 1, otherwise *status == 0
2166  */
2167 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2168                       struct timeval timeout, 
2169                       uint32_t destnode,
2170                       struct ctdb_server_id *id,
2171                       uint32_t *status)
2172 {
2173         TDB_DATA data;
2174         int32_t res;
2175         int ret;
2176
2177         data.dsize = sizeof(struct ctdb_server_id);
2178         data.dptr  = (unsigned char *)id;
2179
2180         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2181                         0, data, NULL,
2182                         NULL, &res, &timeout, NULL);
2183         if (ret != 0) {
2184                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2185                 return -1;
2186         }
2187
2188         if (res) {
2189                 *status = 1;
2190         } else {
2191                 *status = 0;
2192         }
2193
2194         return 0;
2195 }
2196
2197 /*
2198    get the list of server ids that are registered on a node
2199 */
2200 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2201                 TALLOC_CTX *mem_ctx,
2202                 struct timeval timeout, uint32_t destnode, 
2203                 struct ctdb_server_id_list **svid_list)
2204 {
2205         int ret;
2206         TDB_DATA outdata;
2207         int32_t res;
2208
2209         ret = ctdb_control(ctdb, destnode, 0, 
2210                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2211                            mem_ctx, &outdata, &res, &timeout, NULL);
2212         if (ret != 0 || res != 0) {
2213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2214                 return -1;
2215         }
2216
2217         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2218                     
2219         return 0;
2220 }
2221
2222 /*
2223   initialise the ctdb daemon for client applications
2224
2225   NOTE: In current code the daemon does not fork. This is for testing purposes only
2226   and to simplify the code.
2227 */
2228 struct ctdb_context *ctdb_init(struct event_context *ev)
2229 {
2230         int ret;
2231         struct ctdb_context *ctdb;
2232
2233         ctdb = talloc_zero(ev, struct ctdb_context);
2234         if (ctdb == NULL) {
2235                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2236                 return NULL;
2237         }
2238         ctdb->ev  = ev;
2239         ctdb->idr = idr_init(ctdb);
2240         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2241
2242         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2243         if (ret != 0) {
2244                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2245                 talloc_free(ctdb);
2246                 return NULL;
2247         }
2248
2249         return ctdb;
2250 }
2251
2252
2253 /*
2254   set some ctdb flags
2255 */
2256 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2257 {
2258         ctdb->flags |= flags;
2259 }
2260
2261
2262 /*
2263   return the pnn of this node
2264 */
2265 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2266 {
2267         return ctdb->pnn;
2268 }
2269
2270
2271 /*
2272   get the uptime of a remote node
2273  */
2274 struct ctdb_client_control_state *
2275 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2276 {
2277         return ctdb_control_send(ctdb, destnode, 0, 
2278                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2279                            mem_ctx, &timeout, NULL);
2280 }
2281
2282 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2283 {
2284         int ret;
2285         int32_t res;
2286         TDB_DATA outdata;
2287
2288         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2289         if (ret != 0 || res != 0) {
2290                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2291                 return -1;
2292         }
2293
2294         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2295
2296         return 0;
2297 }
2298
2299 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2300 {
2301         struct ctdb_client_control_state *state;
2302
2303         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2304         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2305 }
2306
2307 /*
2308   send a control to execute the "recovered" event script on a node
2309  */
2310 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2311 {
2312         int ret;
2313         int32_t status;
2314
2315         ret = ctdb_control(ctdb, destnode, 0, 
2316                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2317                            NULL, NULL, &status, &timeout, NULL);
2318         if (ret != 0 || status != 0) {
2319                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2320                 return -1;
2321         }
2322
2323         return 0;
2324 }
2325
2326 /* 
2327   callback for the async helpers used when sending the same control
2328   to multiple nodes in parallell.
2329 */
2330 static void async_callback(struct ctdb_client_control_state *state)
2331 {
2332         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2333         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2334         int ret;
2335         TDB_DATA outdata;
2336         int32_t res;
2337         uint32_t destnode = state->c->hdr.destnode;
2338
2339         /* one more node has responded with recmode data */
2340         data->count--;
2341
2342         /* if we failed to push the db, then return an error and let
2343            the main loop try again.
2344         */
2345         if (state->state != CTDB_CONTROL_DONE) {
2346                 if ( !data->dont_log_errors) {
2347                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2348                 }
2349                 data->fail_count++;
2350                 if (data->fail_callback) {
2351                         data->fail_callback(ctdb, destnode, res, outdata,
2352                                         data->callback_data);
2353                 }
2354                 return;
2355         }
2356         
2357         state->async.fn = NULL;
2358
2359         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2360         if ((ret != 0) || (res != 0)) {
2361                 if ( !data->dont_log_errors) {
2362                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2363                 }
2364                 data->fail_count++;
2365                 if (data->fail_callback) {
2366                         data->fail_callback(ctdb, destnode, res, outdata,
2367                                         data->callback_data);
2368                 }
2369         }
2370         if ((ret == 0) && (data->callback != NULL)) {
2371                 data->callback(ctdb, destnode, res, outdata,
2372                                         data->callback_data);
2373         }
2374 }
2375
2376
2377 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2378 {
2379         /* set up the callback functions */
2380         state->async.fn = async_callback;
2381         state->async.private_data = data;
2382         
2383         /* one more control to wait for to complete */
2384         data->count++;
2385 }
2386
2387
2388 /* wait for up to the maximum number of seconds allowed
2389    or until all nodes we expect a response from has replied
2390 */
2391 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2392 {
2393         while (data->count > 0) {
2394                 event_loop_once(ctdb->ev);
2395         }
2396         if (data->fail_count != 0) {
2397                 if (!data->dont_log_errors) {
2398                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2399                                  data->fail_count));
2400                 }
2401                 return -1;
2402         }
2403         return 0;
2404 }
2405
2406
2407 /* 
2408    perform a simple control on the listed nodes
2409    The control cannot return data
2410  */
2411 int ctdb_client_async_control(struct ctdb_context *ctdb,
2412                                 enum ctdb_controls opcode,
2413                                 uint32_t *nodes,
2414                                 uint64_t srvid,
2415                                 struct timeval timeout,
2416                                 bool dont_log_errors,
2417                                 TDB_DATA data,
2418                                 client_async_callback client_callback,
2419                                 client_async_callback fail_callback,
2420                                 void *callback_data)
2421 {
2422         struct client_async_data *async_data;
2423         struct ctdb_client_control_state *state;
2424         int j, num_nodes;
2425
2426         async_data = talloc_zero(ctdb, struct client_async_data);
2427         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2428         async_data->dont_log_errors = dont_log_errors;
2429         async_data->callback = client_callback;
2430         async_data->fail_callback = fail_callback;
2431         async_data->callback_data = callback_data;
2432         async_data->opcode        = opcode;
2433
2434         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2435
2436         /* loop over all nodes and send an async control to each of them */
2437         for (j=0; j<num_nodes; j++) {
2438                 uint32_t pnn = nodes[j];
2439
2440                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2441                                           0, data, async_data, &timeout, NULL);
2442                 if (state == NULL) {
2443                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2444                         talloc_free(async_data);
2445                         return -1;
2446                 }
2447                 
2448                 ctdb_client_async_add(async_data, state);
2449         }
2450
2451         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2452                 talloc_free(async_data);
2453                 return -1;
2454         }
2455
2456         talloc_free(async_data);
2457         return 0;
2458 }
2459
2460 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2461                                 struct ctdb_vnn_map *vnn_map,
2462                                 TALLOC_CTX *mem_ctx,
2463                                 bool include_self)
2464 {
2465         int i, j, num_nodes;
2466         uint32_t *nodes;
2467
2468         for (i=num_nodes=0;i<vnn_map->size;i++) {
2469                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2470                         continue;
2471                 }
2472                 num_nodes++;
2473         } 
2474
2475         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2476         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2477
2478         for (i=j=0;i<vnn_map->size;i++) {
2479                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2480                         continue;
2481                 }
2482                 nodes[j++] = vnn_map->map[i];
2483         } 
2484
2485         return nodes;
2486 }
2487
2488 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2489                                 struct ctdb_node_map *node_map,
2490                                 TALLOC_CTX *mem_ctx,
2491                                 bool include_self)
2492 {
2493         int i, j, num_nodes;
2494         uint32_t *nodes;
2495
2496         for (i=num_nodes=0;i<node_map->num;i++) {
2497                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2498                         continue;
2499                 }
2500                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2501                         continue;
2502                 }
2503                 num_nodes++;
2504         } 
2505
2506         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2507         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2508
2509         for (i=j=0;i<node_map->num;i++) {
2510                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2511                         continue;
2512                 }
2513                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2514                         continue;
2515                 }
2516                 nodes[j++] = node_map->nodes[i].pnn;
2517         } 
2518
2519         return nodes;
2520 }
2521
2522 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2523                                 struct ctdb_node_map *node_map,
2524                                 TALLOC_CTX *mem_ctx,
2525                                 uint32_t pnn)
2526 {
2527         int i, j, num_nodes;
2528         uint32_t *nodes;
2529
2530         for (i=num_nodes=0;i<node_map->num;i++) {
2531                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2532                         continue;
2533                 }
2534                 if (node_map->nodes[i].pnn == pnn) {
2535                         continue;
2536                 }
2537                 num_nodes++;
2538         } 
2539
2540         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2541         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2542
2543         for (i=j=0;i<node_map->num;i++) {
2544                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2545                         continue;
2546                 }
2547                 if (node_map->nodes[i].pnn == pnn) {
2548                         continue;
2549                 }
2550                 nodes[j++] = node_map->nodes[i].pnn;
2551         } 
2552
2553         return nodes;
2554 }
2555
2556 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2557                                 struct ctdb_node_map *node_map,
2558                                 TALLOC_CTX *mem_ctx,
2559                                 bool include_self)
2560 {
2561         int i, j, num_nodes;
2562         uint32_t *nodes;
2563
2564         for (i=num_nodes=0;i<node_map->num;i++) {
2565                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2566                         continue;
2567                 }
2568                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2569                         continue;
2570                 }
2571                 num_nodes++;
2572         } 
2573
2574         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2575         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2576
2577         for (i=j=0;i<node_map->num;i++) {
2578                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2579                         continue;
2580                 }
2581                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2582                         continue;
2583                 }
2584                 nodes[j++] = node_map->nodes[i].pnn;
2585         } 
2586
2587         return nodes;
2588 }
2589
2590 /* 
2591   this is used to test if a pnn lock exists and if it exists will return
2592   the number of connections that pnn has reported or -1 if that recovery
2593   daemon is not running.
2594 */
2595 int
2596 ctdb_read_pnn_lock(int fd, int32_t pnn)
2597 {
2598         struct flock lock;
2599         char c;
2600
2601         lock.l_type = F_WRLCK;
2602         lock.l_whence = SEEK_SET;
2603         lock.l_start = pnn;
2604         lock.l_len = 1;
2605         lock.l_pid = 0;
2606
2607         if (fcntl(fd, F_GETLK, &lock) != 0) {
2608                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2609                 return -1;
2610         }
2611
2612         if (lock.l_type == F_UNLCK) {
2613                 return -1;
2614         }
2615
2616         if (pread(fd, &c, 1, pnn) == -1) {
2617                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2618                 return -1;
2619         }
2620
2621         return c;
2622 }
2623
2624 /*
2625   get capabilities of a remote node
2626  */
2627 struct ctdb_client_control_state *
2628 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2629 {
2630         return ctdb_control_send(ctdb, destnode, 0, 
2631                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2632                            mem_ctx, &timeout, NULL);
2633 }
2634
2635 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2636 {
2637         int ret;
2638         int32_t res;
2639         TDB_DATA outdata;
2640
2641         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2642         if ( (ret != 0) || (res != 0) ) {
2643                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2644                 return -1;
2645         }
2646
2647         if (capabilities) {
2648                 *capabilities = *((uint32_t *)outdata.dptr);
2649         }
2650
2651         return 0;
2652 }
2653
2654 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2655 {
2656         struct ctdb_client_control_state *state;
2657         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2658         int ret;
2659
2660         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2661         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2662         talloc_free(tmp_ctx);
2663         return ret;
2664 }
2665
2666 /**
2667  * check whether a transaction is active on a given db on a given node
2668  */
2669 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2670                                      uint32_t destnode,
2671                                      uint32_t db_id)
2672 {
2673         int32_t status;
2674         int ret;
2675         TDB_DATA indata;
2676
2677         indata.dptr = (uint8_t *)&db_id;
2678         indata.dsize = sizeof(db_id);
2679
2680         ret = ctdb_control(ctdb, destnode, 0,
2681                            CTDB_CONTROL_TRANS2_ACTIVE,
2682                            0, indata, NULL, NULL, &status,
2683                            NULL, NULL);
2684
2685         if (ret != 0) {
2686                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2687                 return -1;
2688         }
2689
2690         return status;
2691 }
2692
2693
2694 struct ctdb_transaction_handle {
2695         struct ctdb_db_context *ctdb_db;
2696         bool in_replay;
2697         /*
2698          * we store the reads and writes done under a transaction:
2699          * - one list stores both reads and writes (m_all),
2700          * - the other just writes (m_write)
2701          */
2702         struct ctdb_marshall_buffer *m_all;
2703         struct ctdb_marshall_buffer *m_write;
2704 };
2705
2706 /* start a transaction on a database */
2707 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2708 {
2709         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2710         return 0;
2711 }
2712
2713 /* start a transaction on a database */
2714 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2715 {
2716         struct ctdb_record_handle *rh;
2717         TDB_DATA key;
2718         TDB_DATA data;
2719         struct ctdb_ltdb_header header;
2720         TALLOC_CTX *tmp_ctx;
2721         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2722         int ret;
2723         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2724         pid_t pid;
2725         int32_t status;
2726
2727         key.dptr = discard_const(keyname);
2728         key.dsize = strlen(keyname);
2729
2730         if (!ctdb_db->persistent) {
2731                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2732                 return -1;
2733         }
2734
2735 again:
2736         tmp_ctx = talloc_new(h);
2737
2738         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2739         if (rh == NULL) {
2740                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2741                 talloc_free(tmp_ctx);
2742                 return -1;
2743         }
2744
2745         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2746                                               CTDB_CURRENT_NODE,
2747                                               ctdb_db->db_id);
2748         if (status == 1) {
2749                 unsigned long int usec = (1000 + random()) % 100000;
2750                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2751                                     "on db_id[0x%08x]. waiting for %lu "
2752                                     "microseconds\n",
2753                                     ctdb_db->db_id, usec));
2754                 talloc_free(tmp_ctx);
2755                 usleep(usec);
2756                 goto again;
2757         }
2758
2759         /*
2760          * store the pid in the database:
2761          * it is not enough that the node is dmaster...
2762          */
2763         pid = getpid();
2764         data.dptr = (unsigned char *)&pid;
2765         data.dsize = sizeof(pid_t);
2766         rh->header.rsn++;
2767         rh->header.dmaster = ctdb_db->ctdb->pnn;
2768         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2769         if (ret != 0) {
2770                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2771                                   "transaction record\n"));
2772                 talloc_free(tmp_ctx);
2773                 return -1;
2774         }
2775
2776         talloc_free(rh);
2777
2778         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2779         if (ret != 0) {
2780                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2781                 talloc_free(tmp_ctx);
2782                 return -1;
2783         }
2784
2785         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2786         if (ret != 0) {
2787                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2788                                  "lock record inside transaction\n"));
2789                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2790                 talloc_free(tmp_ctx);
2791                 goto again;
2792         }
2793
2794         if (header.dmaster != ctdb_db->ctdb->pnn) {
2795                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2796                                    "transaction lock record\n"));
2797                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2798                 talloc_free(tmp_ctx);
2799                 goto again;
2800         }
2801
2802         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2803                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2804                                     "the transaction lock record\n"));
2805                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2806                 talloc_free(tmp_ctx);
2807                 goto again;
2808         }
2809
2810         talloc_free(tmp_ctx);
2811
2812         return 0;
2813 }
2814
2815
2816 /* start a transaction on a database */
2817 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2818                                                        TALLOC_CTX *mem_ctx)
2819 {
2820         struct ctdb_transaction_handle *h;
2821         int ret;
2822
2823         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2824         if (h == NULL) {
2825                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2826                 return NULL;
2827         }
2828
2829         h->ctdb_db = ctdb_db;
2830
2831         ret = ctdb_transaction_fetch_start(h);
2832         if (ret != 0) {
2833                 talloc_free(h);
2834                 return NULL;
2835         }
2836
2837         talloc_set_destructor(h, ctdb_transaction_destructor);
2838
2839         return h;
2840 }
2841
2842
2843
2844 /*
2845   fetch a record inside a transaction
2846  */
2847 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2848                            TALLOC_CTX *mem_ctx, 
2849                            TDB_DATA key, TDB_DATA *data)
2850 {
2851         struct ctdb_ltdb_header header;
2852         int ret;
2853
2854         ZERO_STRUCT(header);
2855
2856         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2857         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2858                 /* record doesn't exist yet */
2859                 *data = tdb_null;
2860                 ret = 0;
2861         }
2862         
2863         if (ret != 0) {
2864                 return ret;
2865         }
2866
2867         if (!h->in_replay) {
2868                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2869                 if (h->m_all == NULL) {
2870                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2871                         return -1;
2872                 }
2873         }
2874
2875         return 0;
2876 }
2877
2878 /*
2879   stores a record inside a transaction
2880  */
2881 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2882                            TDB_DATA key, TDB_DATA data)
2883 {
2884         TALLOC_CTX *tmp_ctx = talloc_new(h);
2885         struct ctdb_ltdb_header header;
2886         TDB_DATA olddata;
2887         int ret;
2888
2889         ZERO_STRUCT(header);
2890
2891         /* we need the header so we can update the RSN */
2892         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2893         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2894                 /* the record doesn't exist - create one with us as dmaster.
2895                    This is only safe because we are in a transaction and this
2896                    is a persistent database */
2897                 ZERO_STRUCT(header);
2898         } else if (ret != 0) {
2899                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2900                 talloc_free(tmp_ctx);
2901                 return ret;
2902         }
2903
2904         if (data.dsize == olddata.dsize &&
2905             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2906                 /* save writing the same data */
2907                 talloc_free(tmp_ctx);
2908                 return 0;
2909         }
2910
2911         header.dmaster = h->ctdb_db->ctdb->pnn;
2912         header.rsn++;
2913
2914         if (!h->in_replay) {
2915                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2916                 if (h->m_all == NULL) {
2917                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2918                         talloc_free(tmp_ctx);
2919                         return -1;
2920                 }
2921         }               
2922
2923         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2924         if (h->m_write == NULL) {
2925                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2926                 talloc_free(tmp_ctx);
2927                 return -1;
2928         }
2929         
2930         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2931
2932         talloc_free(tmp_ctx);
2933         
2934         return ret;
2935 }
2936
2937 /*
2938   replay a transaction
2939  */
2940 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2941 {
2942         int ret, i;
2943         struct ctdb_rec_data *rec = NULL;
2944
2945         h->in_replay = true;
2946         talloc_free(h->m_write);
2947         h->m_write = NULL;
2948
2949         ret = ctdb_transaction_fetch_start(h);
2950         if (ret != 0) {
2951                 return ret;
2952         }
2953
2954         for (i=0;i<h->m_all->count;i++) {
2955                 TDB_DATA key, data;
2956
2957                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2958                 if (rec == NULL) {
2959                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2960                         goto failed;
2961                 }
2962
2963                 if (rec->reqid == 0) {
2964                         /* its a store */
2965                         if (ctdb_transaction_store(h, key, data) != 0) {
2966                                 goto failed;
2967                         }
2968                 } else {
2969                         TDB_DATA data2;
2970                         TALLOC_CTX *tmp_ctx = talloc_new(h);
2971
2972                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
2973                                 talloc_free(tmp_ctx);
2974                                 goto failed;
2975                         }
2976                         if (data2.dsize != data.dsize ||
2977                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
2978                                 /* the record has changed on us - we have to give up */
2979                                 talloc_free(tmp_ctx);
2980                                 goto failed;
2981                         }
2982                         talloc_free(tmp_ctx);
2983                 }
2984         }
2985         
2986         return 0;
2987
2988 failed:
2989         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2990         return -1;
2991 }
2992
2993
2994 /*
2995   commit a transaction
2996  */
2997 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2998 {
2999         int ret, retries=0;
3000         int32_t status;
3001         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3002         struct timeval timeout;
3003         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3004
3005         talloc_set_destructor(h, NULL);
3006
3007         /* our commit strategy is quite complex.
3008
3009            - we first try to commit the changes to all other nodes
3010
3011            - if that works, then we commit locally and we are done
3012
3013            - if a commit on another node fails, then we need to cancel
3014              the transaction, then restart the transaction (thus
3015              opening a window of time for a pending recovery to
3016              complete), then replay the transaction, checking all the
3017              reads and writes (checking that reads give the same data,
3018              and writes succeed). Then we retry the transaction to the
3019              other nodes
3020         */
3021
3022 again:
3023         if (h->m_write == NULL) {
3024                 /* no changes were made */
3025                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3026                 talloc_free(h);
3027                 return 0;
3028         }
3029
3030         /* tell ctdbd to commit to the other nodes */
3031         timeout = timeval_current_ofs(1, 0);
3032         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3033                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3034                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3035                            &timeout, NULL);
3036         if (ret != 0 || status != 0) {
3037                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3038                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3039                                      ", retrying after 1 second...\n",
3040                                      (retries==0)?"":"retry "));
3041                 sleep(1);
3042
3043                 if (ret != 0) {
3044                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3045                 } else {
3046                         /* work out what error code we will give if we 
3047                            have to fail the operation */
3048                         switch ((enum ctdb_trans2_commit_error)status) {
3049                         case CTDB_TRANS2_COMMIT_SUCCESS:
3050                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3051                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3052                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3053                                 break;
3054                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3055                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3056                                 break;
3057                         }
3058                 }
3059
3060                 if (++retries == 100) {
3061                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3062                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3063                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3064                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3065                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3066                         talloc_free(h);
3067                         return -1;
3068                 }               
3069
3070                 if (ctdb_replay_transaction(h) != 0) {
3071                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3072                                           "transaction on db 0x%08x, "
3073                                           "failure control =%u\n",
3074                                           h->ctdb_db->db_id,
3075                                           (unsigned)failure_control));
3076                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3077                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3078                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3079                         talloc_free(h);
3080                         return -1;
3081                 }
3082                 goto again;
3083         } else {
3084                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3085         }
3086
3087         /* do the real commit locally */
3088         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3089         if (ret != 0) {
3090                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3091                                   "on db id 0x%08x locally, "
3092                                   "failure_control=%u\n",
3093                                   h->ctdb_db->db_id,
3094                                   (unsigned)failure_control));
3095                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3096                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3097                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3098                 talloc_free(h);
3099                 return ret;
3100         }
3101
3102         /* tell ctdbd that we are finished with our local commit */
3103         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3104                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3105                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3106         talloc_free(h);
3107         return 0;
3108 }
3109
3110 /*
3111   recovery daemon ping to main daemon
3112  */
3113 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3114 {
3115         int ret;
3116         int32_t res;
3117
3118         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3119                            ctdb, NULL, &res, NULL, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128 /* when forking the main daemon and the child process needs to connect back
3129  * to the daemon as a client process, this function can be used to change
3130  * the ctdb context from daemon into client mode
3131  */
3132 int switch_from_server_to_client(struct ctdb_context *ctdb)
3133 {
3134         int ret;
3135
3136         /* shutdown the transport */
3137         if (ctdb->methods) {
3138                 ctdb->methods->shutdown(ctdb);
3139         }
3140
3141         /* get a new event context */
3142         talloc_free(ctdb->ev);
3143         ctdb->ev = event_context_init(ctdb);
3144
3145         close(ctdb->daemon.sd);
3146         ctdb->daemon.sd = -1;
3147
3148         /* initialise ctdb */
3149         ret = ctdb_socket_connect(ctdb);
3150         if (ret != 0) {
3151                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3152                 return -1;
3153         }
3154
3155          return 0;
3156 }
3157
3158 /*
3159   get the status of running the monitor eventscripts: NULL means never run.
3160  */
3161 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3162                 struct timeval timeout, uint32_t destnode, 
3163                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3164                 struct ctdb_scripts_wire **script_status)
3165 {
3166         int ret;
3167         TDB_DATA outdata, indata;
3168         int32_t res;
3169         uint32_t uinttype = type;
3170
3171         indata.dptr = (uint8_t *)&uinttype;
3172         indata.dsize = sizeof(uinttype);
3173
3174         ret = ctdb_control(ctdb, destnode, 0, 
3175                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3176                            mem_ctx, &outdata, &res, &timeout, NULL);
3177         if (ret != 0 || res != 0) {
3178                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3179                 return -1;
3180         }
3181
3182         if (outdata.dsize == 0) {
3183                 *script_status = NULL;
3184         } else {
3185                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3186                 talloc_free(outdata.dptr);
3187         }
3188                     
3189         return 0;
3190 }
3191
3192 /*
3193   tell the main daemon how long it took to lock the reclock file
3194  */
3195 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3196 {
3197         int ret;
3198         int32_t res;
3199         TDB_DATA data;
3200
3201         data.dptr = (uint8_t *)&latency;
3202         data.dsize = sizeof(latency);
3203
3204         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3205                            ctdb, NULL, &res, NULL, NULL);
3206         if (ret != 0 || res != 0) {
3207                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3208                 return -1;
3209         }
3210
3211         return 0;
3212 }
3213
3214 /*
3215   get the name of the reclock file
3216  */
3217 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3218                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3219                          const char **name)
3220 {
3221         int ret;
3222         int32_t res;
3223         TDB_DATA data;
3224
3225         ret = ctdb_control(ctdb, destnode, 0, 
3226                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3227                            mem_ctx, &data, &res, &timeout, NULL);
3228         if (ret != 0 || res != 0) {
3229                 return -1;
3230         }
3231
3232         if (data.dsize == 0) {
3233                 *name = NULL;
3234         } else {
3235                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3236         }
3237         talloc_free(data.dptr);
3238
3239         return 0;
3240 }
3241
3242 /*
3243   set the reclock filename for a node
3244  */
3245 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3246 {
3247         int ret;
3248         TDB_DATA data;
3249         int32_t res;
3250
3251         if (reclock == NULL) {
3252                 data.dsize = 0;
3253                 data.dptr  = NULL;
3254         } else {
3255                 data.dsize = strlen(reclock) + 1;
3256                 data.dptr  = discard_const(reclock);
3257         }
3258
3259         ret = ctdb_control(ctdb, destnode, 0, 
3260                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3261                            NULL, NULL, &res, &timeout, NULL);
3262         if (ret != 0 || res != 0) {
3263                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3264                 return -1;
3265         }
3266
3267         return 0;
3268 }
3269
3270 /*
3271   stop a node
3272  */
3273 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3274 {
3275         int ret;
3276         int32_t res;
3277
3278         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3279                            ctdb, NULL, &res, &timeout, NULL);
3280         if (ret != 0 || res != 0) {
3281                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3282                 return -1;
3283         }
3284
3285         return 0;
3286 }
3287
3288 /*
3289   continue a node
3290  */
3291 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3292 {
3293         int ret;
3294
3295         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3296                            ctdb, NULL, NULL, &timeout, NULL);
3297         if (ret != 0) {
3298                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3299                 return -1;
3300         }
3301
3302         return 0;
3303 }
3304
3305 /*
3306   set the natgw state for a node
3307  */
3308 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3309 {
3310         int ret;
3311         TDB_DATA data;
3312         int32_t res;
3313
3314         data.dsize = sizeof(natgwstate);
3315         data.dptr  = (uint8_t *)&natgwstate;
3316
3317         ret = ctdb_control(ctdb, destnode, 0, 
3318                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3319                            NULL, NULL, &res, &timeout, NULL);
3320         if (ret != 0 || res != 0) {
3321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3322                 return -1;
3323         }
3324
3325         return 0;
3326 }
3327
3328 /*
3329   set the lmaster role for a node
3330  */
3331 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3332 {
3333         int ret;
3334         TDB_DATA data;
3335         int32_t res;
3336
3337         data.dsize = sizeof(lmasterrole);
3338         data.dptr  = (uint8_t *)&lmasterrole;
3339
3340         ret = ctdb_control(ctdb, destnode, 0, 
3341                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3342                            NULL, NULL, &res, &timeout, NULL);
3343         if (ret != 0 || res != 0) {
3344                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3345                 return -1;
3346         }
3347
3348         return 0;
3349 }
3350
3351 /*
3352   set the recmaster role for a node
3353  */
3354 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3355 {
3356         int ret;
3357         TDB_DATA data;
3358         int32_t res;
3359
3360         data.dsize = sizeof(recmasterrole);
3361         data.dptr  = (uint8_t *)&recmasterrole;
3362
3363         ret = ctdb_control(ctdb, destnode, 0, 
3364                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3365                            NULL, NULL, &res, &timeout, NULL);
3366         if (ret != 0 || res != 0) {
3367                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3368                 return -1;
3369         }
3370
3371         return 0;
3372 }
3373
3374 /* enable an eventscript
3375  */
3376 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3377 {
3378         int ret;
3379         TDB_DATA data;
3380         int32_t res;
3381
3382         data.dsize = strlen(script) + 1;
3383         data.dptr  = discard_const(script);
3384
3385         ret = ctdb_control(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3387                            NULL, NULL, &res, &timeout, NULL);
3388         if (ret != 0 || res != 0) {
3389                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3390                 return -1;
3391         }
3392
3393         return 0;
3394 }
3395
3396 /* disable an eventscript
3397  */
3398 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3399 {
3400         int ret;
3401         TDB_DATA data;
3402         int32_t res;
3403
3404         data.dsize = strlen(script) + 1;
3405         data.dptr  = discard_const(script);
3406
3407         ret = ctdb_control(ctdb, destnode, 0, 
3408                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3409                            NULL, NULL, &res, &timeout, NULL);
3410         if (ret != 0 || res != 0) {
3411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3412                 return -1;
3413         }
3414
3415         return 0;
3416 }
3417
3418
3419 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3420 {
3421         int ret;
3422         TDB_DATA data;
3423         int32_t res;
3424
3425         data.dsize = sizeof(*bantime);
3426         data.dptr  = (uint8_t *)bantime;
3427
3428         ret = ctdb_control(ctdb, destnode, 0, 
3429                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3430                            NULL, NULL, &res, &timeout, NULL);
3431         if (ret != 0 || res != 0) {
3432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3433                 return -1;
3434         }
3435
3436         return 0;
3437 }
3438
3439
3440 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3441 {
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res;
3445         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3446
3447         ret = ctdb_control(ctdb, destnode, 0, 
3448                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3449                            tmp_ctx, &outdata, &res, &timeout, NULL);
3450         if (ret != 0 || res != 0) {
3451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3452                 talloc_free(tmp_ctx);
3453                 return -1;
3454         }
3455
3456         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3457         talloc_free(tmp_ctx);
3458
3459         return 0;
3460 }
3461
3462
3463 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3464 {
3465         int ret;
3466         int32_t res;
3467         TDB_DATA data;
3468         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3469
3470         data.dptr = (uint8_t*)db_prio;
3471         data.dsize = sizeof(*db_prio);
3472
3473         ret = ctdb_control(ctdb, destnode, 0, 
3474                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3475                            tmp_ctx, NULL, &res, &timeout, NULL);
3476         if (ret != 0 || res != 0) {
3477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3478                 talloc_free(tmp_ctx);
3479                 return -1;
3480         }
3481
3482         talloc_free(tmp_ctx);
3483
3484         return 0;
3485 }
3486
3487 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3488 {
3489         int ret;
3490         int32_t res;
3491         TDB_DATA data;
3492         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3493
3494         data.dptr = (uint8_t*)&db_id;
3495         data.dsize = sizeof(db_id);
3496
3497         ret = ctdb_control(ctdb, destnode, 0, 
3498                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3499                            tmp_ctx, NULL, &res, &timeout, NULL);
3500         if (ret != 0 || res < 0) {
3501                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3502                 talloc_free(tmp_ctx);
3503                 return -1;
3504         }
3505
3506         if (priority) {
3507                 *priority = res;
3508         }
3509
3510         talloc_free(tmp_ctx);
3511
3512         return 0;
3513 }