example libctdb.a and test program libctdb/tst.c
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33
34
35 struct ctdb_record_handle {
36         struct ctdb_db_context *ctdb_db;
37         TDB_DATA key;
38         TDB_DATA *data;
39         struct ctdb_ltdb_header header;
40 };
41
42
43 /*
44   make a recv call to the local ctdb daemon - called from client context
45
46   This is called when the program wants to wait for a ctdb_call to complete and get the 
47   results. This call will block unless the call has already completed.
48 */
49 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
50 {
51         if (state == NULL) {
52                 return -1;
53         }
54
55         while (state->state < CTDB_CALL_DONE) {
56                 event_loop_once(state->ctdb_db->ctdb->ev);
57         }
58         if (state->state != CTDB_CALL_DONE) {
59                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
60                 talloc_free(state);
61                 return -1;
62         }
63
64         if (state->call->reply_data.dsize) {
65                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
66                                                       state->call->reply_data.dptr,
67                                                       state->call->reply_data.dsize);
68                 call->reply_data.dsize = state->call->reply_data.dsize;
69         } else {
70                 call->reply_data.dptr = NULL;
71                 call->reply_data.dsize = 0;
72         }
73         call->status = state->call->status;
74         talloc_free(state);
75
76         return 0;
77 }
78
79
80
81
82
83
84
85 /*
86   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
87 */
88 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
89 {
90         struct ctdb_client_call_state *state;
91
92         state = ctdb_call_send(ctdb_db, call);
93         return ctdb_call_recv(state, call);
94 }
95
96
97 /*
98   tell the daemon what messaging srvid we will use, and register the message
99   handler function in the client
100 */
101 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
102                              ctdb_message_fn_t handler,
103                              void *private_data)
104                                     
105 {
106         int res;
107         int32_t status;
108         
109         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
110                            tdb_null, NULL, NULL, &status, NULL, NULL);
111         if (res != 0 || status != 0) {
112                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
113                 return -1;
114         }
115
116         /* also need to register the handler with our own ctdb structure */
117         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
118 }
119
120 /*
121   tell the daemon we no longer want a srvid
122 */
123 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
124 {
125         int res;
126         int32_t status;
127         
128         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
129                            tdb_null, NULL, NULL, &status, NULL, NULL);
130         if (res != 0 || status != 0) {
131                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
132                 return -1;
133         }
134
135         /* also need to register the handler with our own ctdb structure */
136         ctdb_deregister_message_handler(ctdb, srvid, private_data);
137         return 0;
138 }
139
140
141
142
143 /*
144   cancel a ctdb_fetch_lock operation, releasing the lock
145  */
146 static int fetch_lock_destructor(struct ctdb_record_handle *h)
147 {
148         ctdb_ltdb_unlock(h->ctdb_db, h->key);
149         return 0;
150 }
151
152 /*
153   force the migration of a record to this node
154  */
155 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
156 {
157         struct ctdb_call call;
158         ZERO_STRUCT(call);
159         call.call_id = CTDB_NULL_FUNC;
160         call.key = key;
161         call.flags = CTDB_IMMEDIATE_MIGRATION;
162         return ctdb_call(ctdb_db, &call);
163 }
164
165 /*
166   get a lock on a record, and return the records data. Blocks until it gets the lock
167  */
168 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
169                                            TDB_DATA key, TDB_DATA *data)
170 {
171         int ret;
172         struct ctdb_record_handle *h;
173
174         /*
175           procedure is as follows:
176
177           1) get the chain lock. 
178           2) check if we are dmaster
179           3) if we are the dmaster then return handle 
180           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
181              reply from ctdbd
182           5) when we get the reply, goto (1)
183          */
184
185         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
186         if (h == NULL) {
187                 return NULL;
188         }
189
190         h->ctdb_db = ctdb_db;
191         h->key     = key;
192         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
193         if (h->key.dptr == NULL) {
194                 talloc_free(h);
195                 return NULL;
196         }
197         h->data    = data;
198
199         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
200                  (const char *)key.dptr));
201
202 again:
203         /* step 1 - get the chain lock */
204         ret = ctdb_ltdb_lock(ctdb_db, key);
205         if (ret != 0) {
206                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
207                 talloc_free(h);
208                 return NULL;
209         }
210
211         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
212
213         talloc_set_destructor(h, fetch_lock_destructor);
214
215         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
216
217         /* when torturing, ensure we test the remote path */
218         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
219             random() % 5 == 0) {
220                 h->header.dmaster = (uint32_t)-1;
221         }
222
223
224         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
225
226         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
227                 ctdb_ltdb_unlock(ctdb_db, key);
228                 ret = ctdb_client_force_migration(ctdb_db, key);
229                 if (ret != 0) {
230                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
231                         talloc_free(h);
232                         return NULL;
233                 }
234                 goto again;
235         }
236
237         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
238         return h;
239 }
240
241 /*
242   store some data to the record that was locked with ctdb_fetch_lock()
243 */
244 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
245 {
246         if (h->ctdb_db->persistent) {
247                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
248                 return -1;
249         }
250
251         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
252 }
253
254 /*
255   non-locking fetch of a record
256  */
257 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
258                TDB_DATA key, TDB_DATA *data)
259 {
260         struct ctdb_call call;
261         int ret;
262
263         call.call_id = CTDB_FETCH_FUNC;
264         call.call_data.dptr = NULL;
265         call.call_data.dsize = 0;
266
267         ret = ctdb_call(ctdb_db, &call);
268
269         if (ret == 0) {
270                 *data = call.reply_data;
271                 talloc_steal(mem_ctx, data->dptr);
272         }
273
274         return ret;
275 }
276
277
278
279
280
281
282
283
284 /*
285   send a ctdb control message
286   timeout specifies how long we should wait for a reply.
287   if timeout is NULL we wait indefinitely
288  */
289 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
290                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
291                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
292                  struct timeval *timeout,
293                  char **errormsg)
294 {
295         struct ctdb_client_control_state *state;
296
297         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
298                         flags, data, mem_ctx,
299                         timeout, errormsg);
300         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
301                         errormsg);
302 }
303
304
305
306
307 /*
308   a process exists call. Returns 0 if process exists, -1 otherwise
309  */
310 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
311 {
312         int ret;
313         TDB_DATA data;
314         int32_t status;
315
316         data.dptr = (uint8_t*)&pid;
317         data.dsize = sizeof(pid);
318
319         ret = ctdb_control(ctdb, destnode, 0, 
320                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
321                            NULL, NULL, &status, NULL, NULL);
322         if (ret != 0) {
323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
324                 return -1;
325         }
326
327         return status;
328 }
329
330 /*
331   get remote statistics
332  */
333 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
334 {
335         int ret;
336         TDB_DATA data;
337         int32_t res;
338
339         ret = ctdb_control(ctdb, destnode, 0, 
340                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
341                            ctdb, &data, &res, NULL, NULL);
342         if (ret != 0 || res != 0) {
343                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
344                 return -1;
345         }
346
347         if (data.dsize != sizeof(struct ctdb_statistics)) {
348                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
349                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
350                       return -1;
351         }
352
353         *status = *(struct ctdb_statistics *)data.dptr;
354         talloc_free(data.dptr);
355                         
356         return 0;
357 }
358
359 /*
360   shutdown a remote ctdb node
361  */
362 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
363 {
364         struct ctdb_client_control_state *state;
365
366         state = ctdb_control_send(ctdb, destnode, 0, 
367                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
368                            NULL, &timeout, NULL);
369         if (state == NULL) {
370                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
371                 return -1;
372         }
373
374         return 0;
375 }
376
377 /*
378   get vnn map from a remote node
379  */
380 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
381 {
382         int ret;
383         TDB_DATA outdata;
384         int32_t res;
385         struct ctdb_vnn_map_wire *map;
386
387         ret = ctdb_control(ctdb, destnode, 0, 
388                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
389                            mem_ctx, &outdata, &res, &timeout, NULL);
390         if (ret != 0 || res != 0) {
391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
392                 return -1;
393         }
394         
395         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
396         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
397             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
398                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
399                 return -1;
400         }
401
402         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
403         CTDB_NO_MEMORY(ctdb, *vnnmap);
404         (*vnnmap)->generation = map->generation;
405         (*vnnmap)->size       = map->size;
406         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
407
408         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
409         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
410         talloc_free(outdata.dptr);
411                     
412         return 0;
413 }
414
415
416 /*
417   get the recovery mode of a remote node
418  */
419 struct ctdb_client_control_state *
420 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
421 {
422         return ctdb_control_send(ctdb, destnode, 0, 
423                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
424                            mem_ctx, &timeout, NULL);
425 }
426
427 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
428 {
429         int ret;
430         int32_t res;
431
432         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
433         if (ret != 0) {
434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
435                 return -1;
436         }
437
438         if (recmode) {
439                 *recmode = (uint32_t)res;
440         }
441
442         return 0;
443 }
444
445 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
446 {
447         struct ctdb_client_control_state *state;
448
449         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
450         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
451 }
452
453
454
455
456 /*
457   set the recovery mode of a remote node
458  */
459 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
460 {
461         int ret;
462         TDB_DATA data;
463         int32_t res;
464
465         data.dsize = sizeof(uint32_t);
466         data.dptr = (unsigned char *)&recmode;
467
468         ret = ctdb_control(ctdb, destnode, 0, 
469                            CTDB_CONTROL_SET_RECMODE, 0, data, 
470                            NULL, NULL, &res, &timeout, NULL);
471         if (ret != 0 || res != 0) {
472                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
473                 return -1;
474         }
475
476         return 0;
477 }
478
479
480
481 /*
482   get the recovery master of a remote node
483  */
484 struct ctdb_client_control_state *
485 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
486                         struct timeval timeout, uint32_t destnode)
487 {
488         return ctdb_control_send(ctdb, destnode, 0, 
489                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
490                            mem_ctx, &timeout, NULL);
491 }
492
493 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
494 {
495         int ret;
496         int32_t res;
497
498         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
501                 return -1;
502         }
503
504         if (recmaster) {
505                 *recmaster = (uint32_t)res;
506         }
507
508         return 0;
509 }
510
511 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
512 {
513         struct ctdb_client_control_state *state;
514
515         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
516         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
517 }
518
519
520 /*
521   set the recovery master of a remote node
522  */
523 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
524 {
525         int ret;
526         TDB_DATA data;
527         int32_t res;
528
529         ZERO_STRUCT(data);
530         data.dsize = sizeof(uint32_t);
531         data.dptr = (unsigned char *)&recmaster;
532
533         ret = ctdb_control(ctdb, destnode, 0, 
534                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
535                            NULL, NULL, &res, &timeout, NULL);
536         if (ret != 0 || res != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
538                 return -1;
539         }
540
541         return 0;
542 }
543
544
545 /*
546   get a list of databases off a remote node
547  */
548 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
549                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
550 {
551         int ret;
552         TDB_DATA outdata;
553         int32_t res;
554
555         ret = ctdb_control(ctdb, destnode, 0, 
556                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
557                            mem_ctx, &outdata, &res, &timeout, NULL);
558         if (ret != 0 || res != 0) {
559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
560                 return -1;
561         }
562
563         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
564         talloc_free(outdata.dptr);
565                     
566         return 0;
567 }
568
569 /*
570   get a list of nodes (vnn and flags ) from a remote node
571  */
572 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
573                 struct timeval timeout, uint32_t destnode, 
574                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
575 {
576         int ret;
577         TDB_DATA outdata;
578         int32_t res;
579
580         ret = ctdb_control(ctdb, destnode, 0, 
581                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
582                            mem_ctx, &outdata, &res, &timeout, NULL);
583         if (ret == 0 && res == -1 && outdata.dsize == 0) {
584                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
585                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
586         }
587         if (ret != 0 || res != 0 || outdata.dsize == 0) {
588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
589                 return -1;
590         }
591
592         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
593         talloc_free(outdata.dptr);
594                     
595         return 0;
596 }
597
598 /*
599   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
600  */
601 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
602                 struct timeval timeout, uint32_t destnode, 
603                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
604 {
605         int ret, i, len;
606         TDB_DATA outdata;
607         struct ctdb_node_mapv4 *nodemapv4;
608         int32_t res;
609
610         ret = ctdb_control(ctdb, destnode, 0, 
611                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
612                            mem_ctx, &outdata, &res, &timeout, NULL);
613         if (ret != 0 || res != 0 || outdata.dsize == 0) {
614                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
615                 return -1;
616         }
617
618         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
619
620         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
621         (*nodemap) = talloc_zero_size(mem_ctx, len);
622         CTDB_NO_MEMORY(ctdb, (*nodemap));
623
624         (*nodemap)->num = nodemapv4->num;
625         for (i=0; i<nodemapv4->num; i++) {
626                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
627                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
628                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
629                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
630         }
631                 
632         talloc_free(outdata.dptr);
633                     
634         return 0;
635 }
636
637 /*
638   drop the transport, reload the nodes file and restart the transport
639  */
640 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
641                     struct timeval timeout, uint32_t destnode)
642 {
643         int ret;
644         int32_t res;
645
646         ret = ctdb_control(ctdb, destnode, 0, 
647                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
648                            NULL, NULL, &res, &timeout, NULL);
649         if (ret != 0 || res != 0) {
650                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
651                 return -1;
652         }
653
654         return 0;
655 }
656
657
658 /*
659   set vnn map on a node
660  */
661 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
662                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
663 {
664         int ret;
665         TDB_DATA data;
666         int32_t res;
667         struct ctdb_vnn_map_wire *map;
668         size_t len;
669
670         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
671         map = talloc_size(mem_ctx, len);
672         CTDB_NO_MEMORY(ctdb, map);
673
674         map->generation = vnnmap->generation;
675         map->size = vnnmap->size;
676         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
677         
678         data.dsize = len;
679         data.dptr  = (uint8_t *)map;
680
681         ret = ctdb_control(ctdb, destnode, 0, 
682                            CTDB_CONTROL_SETVNNMAP, 0, data, 
683                            NULL, NULL, &res, &timeout, NULL);
684         if (ret != 0 || res != 0) {
685                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
686                 return -1;
687         }
688
689         talloc_free(map);
690
691         return 0;
692 }
693
694
695 /*
696   async send for pull database
697  */
698 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
699         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
700         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
701 {
702         TDB_DATA indata;
703         struct ctdb_control_pulldb *pull;
704         struct ctdb_client_control_state *state;
705
706         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
707         CTDB_NO_MEMORY_NULL(ctdb, pull);
708
709         pull->db_id   = dbid;
710         pull->lmaster = lmaster;
711
712         indata.dsize = sizeof(struct ctdb_control_pulldb);
713         indata.dptr  = (unsigned char *)pull;
714
715         state = ctdb_control_send(ctdb, destnode, 0, 
716                                   CTDB_CONTROL_PULL_DB, 0, indata, 
717                                   mem_ctx, &timeout, NULL);
718         talloc_free(pull);
719
720         return state;
721 }
722
723 /*
724   async recv for pull database
725  */
726 int ctdb_ctrl_pulldb_recv(
727         struct ctdb_context *ctdb, 
728         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
729         TDB_DATA *outdata)
730 {
731         int ret;
732         int32_t res;
733
734         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
735         if ( (ret != 0) || (res != 0) ){
736                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
737                 return -1;
738         }
739
740         return 0;
741 }
742
743 /*
744   pull all keys and records for a specific database on a node
745  */
746 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
747                 uint32_t dbid, uint32_t lmaster, 
748                 TALLOC_CTX *mem_ctx, struct timeval timeout,
749                 TDB_DATA *outdata)
750 {
751         struct ctdb_client_control_state *state;
752
753         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
754                                       timeout);
755         
756         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
757 }
758
759
760 /*
761   change dmaster for all keys in the database to the new value
762  */
763 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
764                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
765 {
766         int ret;
767         TDB_DATA indata;
768         int32_t res;
769
770         indata.dsize = 2*sizeof(uint32_t);
771         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
772
773         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
774         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
775
776         ret = ctdb_control(ctdb, destnode, 0, 
777                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
778                            NULL, NULL, &res, &timeout, NULL);
779         if (ret != 0 || res != 0) {
780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
781                 return -1;
782         }
783
784         return 0;
785 }
786
787 /*
788   ping a node, return number of clients connected
789  */
790 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
791 {
792         int ret;
793         int32_t res;
794
795         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
796                            tdb_null, NULL, NULL, &res, NULL, NULL);
797         if (ret != 0) {
798                 return -1;
799         }
800         return res;
801 }
802
803 /*
804   find the real path to a ltdb 
805  */
806 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
807                    const char **path)
808 {
809         int ret;
810         int32_t res;
811         TDB_DATA data;
812
813         data.dptr = (uint8_t *)&dbid;
814         data.dsize = sizeof(dbid);
815
816         ret = ctdb_control(ctdb, destnode, 0, 
817                            CTDB_CONTROL_GETDBPATH, 0, data, 
818                            mem_ctx, &data, &res, &timeout, NULL);
819         if (ret != 0 || res != 0) {
820                 return -1;
821         }
822
823         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
824         if ((*path) == NULL) {
825                 return -1;
826         }
827
828         talloc_free(data.dptr);
829
830         return 0;
831 }
832
833 /*
834   find the name of a db 
835  */
836 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
837                    const char **name)
838 {
839         int ret;
840         int32_t res;
841         TDB_DATA data;
842
843         data.dptr = (uint8_t *)&dbid;
844         data.dsize = sizeof(dbid);
845
846         ret = ctdb_control(ctdb, destnode, 0, 
847                            CTDB_CONTROL_GET_DBNAME, 0, data, 
848                            mem_ctx, &data, &res, &timeout, NULL);
849         if (ret != 0 || res != 0) {
850                 return -1;
851         }
852
853         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
854         if ((*name) == NULL) {
855                 return -1;
856         }
857
858         talloc_free(data.dptr);
859
860         return 0;
861 }
862
863 /*
864   get the health status of a db
865  */
866 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
867                           struct timeval timeout,
868                           uint32_t destnode,
869                           uint32_t dbid, TALLOC_CTX *mem_ctx,
870                           const char **reason)
871 {
872         int ret;
873         int32_t res;
874         TDB_DATA data;
875
876         data.dptr = (uint8_t *)&dbid;
877         data.dsize = sizeof(dbid);
878
879         ret = ctdb_control(ctdb, destnode, 0,
880                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
881                            mem_ctx, &data, &res, &timeout, NULL);
882         if (ret != 0 || res != 0) {
883                 return -1;
884         }
885
886         if (data.dsize == 0) {
887                 (*reason) = NULL;
888                 return 0;
889         }
890
891         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
892         if ((*reason) == NULL) {
893                 return -1;
894         }
895
896         talloc_free(data.dptr);
897
898         return 0;
899 }
900
901 /*
902   create a database
903  */
904 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
905                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
906 {
907         int ret;
908         int32_t res;
909         TDB_DATA data;
910
911         data.dptr = discard_const(name);
912         data.dsize = strlen(name)+1;
913
914         ret = ctdb_control(ctdb, destnode, 0, 
915                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
916                            0, data, 
917                            mem_ctx, &data, &res, &timeout, NULL);
918
919         if (ret != 0 || res != 0) {
920                 return -1;
921         }
922
923         return 0;
924 }
925
926 /*
927   get debug level on a node
928  */
929 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
930 {
931         int ret;
932         int32_t res;
933         TDB_DATA data;
934
935         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
936                            ctdb, &data, &res, NULL, NULL);
937         if (ret != 0 || res != 0) {
938                 return -1;
939         }
940         if (data.dsize != sizeof(int32_t)) {
941                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
942                          (unsigned)data.dsize));
943                 return -1;
944         }
945         *level = *(int32_t *)data.dptr;
946         talloc_free(data.dptr);
947         return 0;
948 }
949
950 /*
951   set debug level on a node
952  */
953 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
954 {
955         int ret;
956         int32_t res;
957         TDB_DATA data;
958
959         data.dptr = (uint8_t *)&level;
960         data.dsize = sizeof(level);
961
962         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
963                            NULL, NULL, &res, NULL, NULL);
964         if (ret != 0 || res != 0) {
965                 return -1;
966         }
967         return 0;
968 }
969
970
971 /*
972   get a list of connected nodes
973  */
974 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
975                                 struct timeval timeout,
976                                 TALLOC_CTX *mem_ctx,
977                                 uint32_t *num_nodes)
978 {
979         struct ctdb_node_map *map=NULL;
980         int ret, i;
981         uint32_t *nodes;
982
983         *num_nodes = 0;
984
985         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
986         if (ret != 0) {
987                 return NULL;
988         }
989
990         nodes = talloc_array(mem_ctx, uint32_t, map->num);
991         if (nodes == NULL) {
992                 return NULL;
993         }
994
995         for (i=0;i<map->num;i++) {
996                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
997                         nodes[*num_nodes] = map->nodes[i].pnn;
998                         (*num_nodes)++;
999                 }
1000         }
1001
1002         return nodes;
1003 }
1004
1005
1006 /*
1007   reset remote status
1008  */
1009 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1010 {
1011         int ret;
1012         int32_t res;
1013
1014         ret = ctdb_control(ctdb, destnode, 0, 
1015                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1016                            NULL, NULL, &res, NULL, NULL);
1017         if (ret != 0 || res != 0) {
1018                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1019                 return -1;
1020         }
1021         return 0;
1022 }
1023
1024 /*
1025   this is the dummy null procedure that all databases support
1026 */
1027 static int ctdb_null_func(struct ctdb_call_info *call)
1028 {
1029         return 0;
1030 }
1031
1032 /*
1033   this is a plain fetch procedure that all databases support
1034 */
1035 static int ctdb_fetch_func(struct ctdb_call_info *call)
1036 {
1037         call->reply_data = &call->record_data;
1038         return 0;
1039 }
1040
1041 /*
1042   attach to a specific database - client call
1043 */
1044 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1045 {
1046         struct ctdb_db_context *ctdb_db;
1047         TDB_DATA data;
1048         int ret;
1049         int32_t res;
1050
1051         ctdb_db = ctdb_db_handle(ctdb, name);
1052         if (ctdb_db) {
1053                 return ctdb_db;
1054         }
1055
1056         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1057         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1058
1059         ctdb_db->ctdb = ctdb;
1060         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1061         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1062
1063         data.dptr = discard_const(name);
1064         data.dsize = strlen(name)+1;
1065
1066         /* tell ctdb daemon to attach */
1067         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1068                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1069                            0, data, ctdb_db, &data, &res, NULL, NULL);
1070         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1071                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1072                 talloc_free(ctdb_db);
1073                 return NULL;
1074         }
1075         
1076         ctdb_db->db_id = *(uint32_t *)data.dptr;
1077         talloc_free(data.dptr);
1078
1079         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1080         if (ret != 0) {
1081                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1082                 talloc_free(ctdb_db);
1083                 return NULL;
1084         }
1085
1086         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1087         if (ctdb->valgrinding) {
1088                 tdb_flags |= TDB_NOMMAP;
1089         }
1090         tdb_flags |= TDB_DISALLOW_NESTING;
1091
1092         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1093         if (ctdb_db->ltdb == NULL) {
1094                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1095                 talloc_free(ctdb_db);
1096                 return NULL;
1097         }
1098
1099         ctdb_db->persistent = persistent;
1100
1101         DLIST_ADD(ctdb->db_list, ctdb_db);
1102
1103         /* add well known functions */
1104         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1105         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1106
1107         return ctdb_db;
1108 }
1109
1110
1111 /*
1112   setup a call for a database
1113  */
1114 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1115 {
1116         struct ctdb_registered_call *call;
1117
1118 #if 0
1119         TDB_DATA data;
1120         int32_t status;
1121         struct ctdb_control_set_call c;
1122         int ret;
1123
1124         /* this is no longer valid with the separate daemon architecture */
1125         c.db_id = ctdb_db->db_id;
1126         c.fn    = fn;
1127         c.id    = id;
1128
1129         data.dptr = (uint8_t *)&c;
1130         data.dsize = sizeof(c);
1131
1132         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1133                            data, NULL, NULL, &status, NULL, NULL);
1134         if (ret != 0 || status != 0) {
1135                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1136                 return -1;
1137         }
1138 #endif
1139
1140         /* also register locally */
1141         call = talloc(ctdb_db, struct ctdb_registered_call);
1142         call->fn = fn;
1143         call->id = id;
1144
1145         DLIST_ADD(ctdb_db->calls, call);        
1146         return 0;
1147 }
1148
1149
1150 struct traverse_state {
1151         bool done;
1152         uint32_t count;
1153         ctdb_traverse_func fn;
1154         void *private_data;
1155 };
1156
1157 /*
1158   called on each key during a ctdb_traverse
1159  */
1160 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1161 {
1162         struct traverse_state *state = (struct traverse_state *)p;
1163         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1164         TDB_DATA key;
1165
1166         if (data.dsize < sizeof(uint32_t) ||
1167             d->length != data.dsize) {
1168                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1169                 state->done = True;
1170                 return;
1171         }
1172
1173         key.dsize = d->keylen;
1174         key.dptr  = &d->data[0];
1175         data.dsize = d->datalen;
1176         data.dptr = &d->data[d->keylen];
1177
1178         if (key.dsize == 0 && data.dsize == 0) {
1179                 /* end of traverse */
1180                 state->done = True;
1181                 return;
1182         }
1183
1184         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1185                 /* empty records are deleted records in ctdb */
1186                 return;
1187         }
1188
1189         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1190                 state->done = True;
1191         }
1192
1193         state->count++;
1194 }
1195
1196
1197 /*
1198   start a cluster wide traverse, calling the supplied fn on each record
1199   return the number of records traversed, or -1 on error
1200  */
1201 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1202 {
1203         TDB_DATA data;
1204         struct ctdb_traverse_start t;
1205         int32_t status;
1206         int ret;
1207         uint64_t srvid = (getpid() | 0xFLL<<60);
1208         struct traverse_state state;
1209
1210         state.done = False;
1211         state.count = 0;
1212         state.private_data = private_data;
1213         state.fn = fn;
1214
1215         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1216         if (ret != 0) {
1217                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1218                 return -1;
1219         }
1220
1221         t.db_id = ctdb_db->db_id;
1222         t.srvid = srvid;
1223         t.reqid = 0;
1224
1225         data.dptr = (uint8_t *)&t;
1226         data.dsize = sizeof(t);
1227
1228         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1229                            data, NULL, NULL, &status, NULL, NULL);
1230         if (ret != 0 || status != 0) {
1231                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1232                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1233                 return -1;
1234         }
1235
1236         while (!state.done) {
1237                 event_loop_once(ctdb_db->ctdb->ev);
1238         }
1239
1240         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1241         if (ret != 0) {
1242                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1243                 return -1;
1244         }
1245
1246         return state.count;
1247 }
1248
1249 #define ISASCII(x) ((x>31)&&(x<128))
1250 /*
1251   called on each key during a catdb
1252  */
1253 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1254 {
1255         int i;
1256         FILE *f = (FILE *)p;
1257         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1258
1259         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1260         for (i=0;i<key.dsize;i++) {
1261                 if (ISASCII(key.dptr[i])) {
1262                         fprintf(f, "%c", key.dptr[i]);
1263                 } else {
1264                         fprintf(f, "\\%02X", key.dptr[i]);
1265                 }
1266         }
1267         fprintf(f, "\"\n");
1268
1269         fprintf(f, "dmaster: %u\n", h->dmaster);
1270         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1271
1272         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1273         for (i=sizeof(*h);i<data.dsize;i++) {
1274                 if (ISASCII(data.dptr[i])) {
1275                         fprintf(f, "%c", data.dptr[i]);
1276                 } else {
1277                         fprintf(f, "\\%02X", data.dptr[i]);
1278                 }
1279         }
1280         fprintf(f, "\"\n");
1281
1282         fprintf(f, "\n");
1283
1284         return 0;
1285 }
1286
1287 /*
1288   convenience function to list all keys to stdout
1289  */
1290 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1291 {
1292         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1293 }
1294
1295 /*
1296   get the pid of a ctdb daemon
1297  */
1298 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1299 {
1300         int ret;
1301         int32_t res;
1302
1303         ret = ctdb_control(ctdb, destnode, 0, 
1304                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1305                            NULL, NULL, &res, &timeout, NULL);
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1308                 return -1;
1309         }
1310
1311         *pid = res;
1312
1313         return 0;
1314 }
1315
1316
1317 /*
1318   async freeze send control
1319  */
1320 struct ctdb_client_control_state *
1321 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1322 {
1323         return ctdb_control_send(ctdb, destnode, priority, 
1324                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1325                            mem_ctx, &timeout, NULL);
1326 }
1327
1328 /* 
1329    async freeze recv control
1330 */
1331 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1332 {
1333         int ret;
1334         int32_t res;
1335
1336         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1337         if ( (ret != 0) || (res != 0) ){
1338                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1339                 return -1;
1340         }
1341
1342         return 0;
1343 }
1344
1345 /*
1346   freeze databases of a certain priority
1347  */
1348 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1349 {
1350         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1351         struct ctdb_client_control_state *state;
1352         int ret;
1353
1354         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1355         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1356         talloc_free(tmp_ctx);
1357
1358         return ret;
1359 }
1360
1361 /* Freeze all databases */
1362 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1363 {
1364         int i;
1365
1366         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1367                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1368                         return -1;
1369                 }
1370         }
1371         return 0;
1372 }
1373
1374 /*
1375   thaw databases of a certain priority
1376  */
1377 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1378 {
1379         int ret;
1380         int32_t res;
1381
1382         ret = ctdb_control(ctdb, destnode, priority, 
1383                            CTDB_CONTROL_THAW, 0, tdb_null, 
1384                            NULL, NULL, &res, &timeout, NULL);
1385         if (ret != 0 || res != 0) {
1386                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1387                 return -1;
1388         }
1389
1390         return 0;
1391 }
1392
1393 /* thaw all databases */
1394 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1395 {
1396         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1397 }
1398
1399 /*
1400   get pnn of a node, or -1
1401  */
1402 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1403 {
1404         int ret;
1405         int32_t res;
1406
1407         ret = ctdb_control(ctdb, destnode, 0, 
1408                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1409                            NULL, NULL, &res, &timeout, NULL);
1410         if (ret != 0) {
1411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1412                 return -1;
1413         }
1414
1415         return res;
1416 }
1417
1418 /*
1419   get the monitoring mode of a remote node
1420  */
1421 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1422 {
1423         int ret;
1424         int32_t res;
1425
1426         ret = ctdb_control(ctdb, destnode, 0, 
1427                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1428                            NULL, NULL, &res, &timeout, NULL);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1431                 return -1;
1432         }
1433
1434         *monmode = res;
1435
1436         return 0;
1437 }
1438
1439
1440 /*
1441  set the monitoring mode of a remote node to active
1442  */
1443 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1444 {
1445         int ret;
1446         
1447
1448         ret = ctdb_control(ctdb, destnode, 0, 
1449                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1450                            NULL, NULL,NULL, &timeout, NULL);
1451         if (ret != 0) {
1452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1453                 return -1;
1454         }
1455
1456         
1457
1458         return 0;
1459 }
1460
1461 /*
1462   set the monitoring mode of a remote node to disable
1463  */
1464 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1465 {
1466         int ret;
1467         
1468
1469         ret = ctdb_control(ctdb, destnode, 0, 
1470                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1471                            NULL, NULL, NULL, &timeout, NULL);
1472         if (ret != 0) {
1473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1474                 return -1;
1475         }
1476
1477         
1478
1479         return 0;
1480 }
1481
1482
1483
1484 /* 
1485   sent to a node to make it take over an ip address
1486 */
1487 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1488                           uint32_t destnode, struct ctdb_public_ip *ip)
1489 {
1490         TDB_DATA data;
1491         struct ctdb_public_ipv4 ipv4;
1492         int ret;
1493         int32_t res;
1494
1495         if (ip->addr.sa.sa_family == AF_INET) {
1496                 ipv4.pnn = ip->pnn;
1497                 ipv4.sin = ip->addr.ip;
1498
1499                 data.dsize = sizeof(ipv4);
1500                 data.dptr  = (uint8_t *)&ipv4;
1501
1502                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1503                            NULL, &res, &timeout, NULL);
1504         } else {
1505                 data.dsize = sizeof(*ip);
1506                 data.dptr  = (uint8_t *)ip;
1507
1508                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1509                            NULL, &res, &timeout, NULL);
1510         }
1511
1512         if (ret != 0 || res != 0) {
1513                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1514                 return -1;
1515         }
1516
1517         return 0;       
1518 }
1519
1520
1521 /* 
1522   sent to a node to make it release an ip address
1523 */
1524 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1525                          uint32_t destnode, struct ctdb_public_ip *ip)
1526 {
1527         TDB_DATA data;
1528         struct ctdb_public_ipv4 ipv4;
1529         int ret;
1530         int32_t res;
1531
1532         if (ip->addr.sa.sa_family == AF_INET) {
1533                 ipv4.pnn = ip->pnn;
1534                 ipv4.sin = ip->addr.ip;
1535
1536                 data.dsize = sizeof(ipv4);
1537                 data.dptr  = (uint8_t *)&ipv4;
1538
1539                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1540                                    NULL, &res, &timeout, NULL);
1541         } else {
1542                 data.dsize = sizeof(*ip);
1543                 data.dptr  = (uint8_t *)ip;
1544
1545                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1546                                    NULL, &res, &timeout, NULL);
1547         }
1548
1549         if (ret != 0 || res != 0) {
1550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1551                 return -1;
1552         }
1553
1554         return 0;       
1555 }
1556
1557
1558 /*
1559   get a tunable
1560  */
1561 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1562                           struct timeval timeout, 
1563                           uint32_t destnode,
1564                           const char *name, uint32_t *value)
1565 {
1566         struct ctdb_control_get_tunable *t;
1567         TDB_DATA data, outdata;
1568         int32_t res;
1569         int ret;
1570
1571         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1572         data.dptr  = talloc_size(ctdb, data.dsize);
1573         CTDB_NO_MEMORY(ctdb, data.dptr);
1574
1575         t = (struct ctdb_control_get_tunable *)data.dptr;
1576         t->length = strlen(name)+1;
1577         memcpy(t->name, name, t->length);
1578
1579         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1580                            &outdata, &res, &timeout, NULL);
1581         talloc_free(data.dptr);
1582         if (ret != 0 || res != 0) {
1583                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1584                 return -1;
1585         }
1586
1587         if (outdata.dsize != sizeof(uint32_t)) {
1588                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1589                 talloc_free(outdata.dptr);
1590                 return -1;
1591         }
1592         
1593         *value = *(uint32_t *)outdata.dptr;
1594         talloc_free(outdata.dptr);
1595
1596         return 0;
1597 }
1598
1599 /*
1600   set a tunable
1601  */
1602 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1603                           struct timeval timeout, 
1604                           uint32_t destnode,
1605                           const char *name, uint32_t value)
1606 {
1607         struct ctdb_control_set_tunable *t;
1608         TDB_DATA data;
1609         int32_t res;
1610         int ret;
1611
1612         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1613         data.dptr  = talloc_size(ctdb, data.dsize);
1614         CTDB_NO_MEMORY(ctdb, data.dptr);
1615
1616         t = (struct ctdb_control_set_tunable *)data.dptr;
1617         t->length = strlen(name)+1;
1618         memcpy(t->name, name, t->length);
1619         t->value = value;
1620
1621         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1622                            NULL, &res, &timeout, NULL);
1623         talloc_free(data.dptr);
1624         if (ret != 0 || res != 0) {
1625                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1626                 return -1;
1627         }
1628
1629         return 0;
1630 }
1631
1632 /*
1633   list tunables
1634  */
1635 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1636                             struct timeval timeout, 
1637                             uint32_t destnode,
1638                             TALLOC_CTX *mem_ctx,
1639                             const char ***list, uint32_t *count)
1640 {
1641         TDB_DATA outdata;
1642         int32_t res;
1643         int ret;
1644         struct ctdb_control_list_tunable *t;
1645         char *p, *s, *ptr;
1646
1647         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1648                            mem_ctx, &outdata, &res, &timeout, NULL);
1649         if (ret != 0 || res != 0) {
1650                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1651                 return -1;
1652         }
1653
1654         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1655         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1656             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1657                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1658                 talloc_free(outdata.dptr);
1659                 return -1;              
1660         }
1661         
1662         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1663         CTDB_NO_MEMORY(ctdb, p);
1664
1665         talloc_free(outdata.dptr);
1666         
1667         (*list) = NULL;
1668         (*count) = 0;
1669
1670         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1671                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1672                 CTDB_NO_MEMORY(ctdb, *list);
1673                 (*list)[*count] = talloc_strdup(*list, s);
1674                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1675                 (*count)++;
1676         }
1677
1678         talloc_free(p);
1679
1680         return 0;
1681 }
1682
1683
1684 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1685                                    struct timeval timeout, uint32_t destnode,
1686                                    TALLOC_CTX *mem_ctx,
1687                                    uint32_t flags,
1688                                    struct ctdb_all_public_ips **ips)
1689 {
1690         int ret;
1691         TDB_DATA outdata;
1692         int32_t res;
1693
1694         ret = ctdb_control(ctdb, destnode, 0, 
1695                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1696                            mem_ctx, &outdata, &res, &timeout, NULL);
1697         if (ret == 0 && res == -1) {
1698                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1699                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1700         }
1701         if (ret != 0 || res != 0) {
1702           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1703                 return -1;
1704         }
1705
1706         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1707         talloc_free(outdata.dptr);
1708                     
1709         return 0;
1710 }
1711
1712 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1713                              struct timeval timeout, uint32_t destnode,
1714                              TALLOC_CTX *mem_ctx,
1715                              struct ctdb_all_public_ips **ips)
1716 {
1717         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1718                                               destnode, mem_ctx,
1719                                               0, ips);
1720 }
1721
1722 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1723                         struct timeval timeout, uint32_t destnode, 
1724                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1725 {
1726         int ret, i, len;
1727         TDB_DATA outdata;
1728         int32_t res;
1729         struct ctdb_all_public_ipsv4 *ipsv4;
1730
1731         ret = ctdb_control(ctdb, destnode, 0, 
1732                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1733                            mem_ctx, &outdata, &res, &timeout, NULL);
1734         if (ret != 0 || res != 0) {
1735                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1736                 return -1;
1737         }
1738
1739         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1740         len = offsetof(struct ctdb_all_public_ips, ips) +
1741                 ipsv4->num*sizeof(struct ctdb_public_ip);
1742         *ips = talloc_zero_size(mem_ctx, len);
1743         CTDB_NO_MEMORY(ctdb, *ips);
1744         (*ips)->num = ipsv4->num;
1745         for (i=0; i<ipsv4->num; i++) {
1746                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1747                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1748         }
1749
1750         talloc_free(outdata.dptr);
1751                     
1752         return 0;
1753 }
1754
1755 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1756                                  struct timeval timeout, uint32_t destnode,
1757                                  TALLOC_CTX *mem_ctx,
1758                                  const ctdb_sock_addr *addr,
1759                                  struct ctdb_control_public_ip_info **_info)
1760 {
1761         int ret;
1762         TDB_DATA indata;
1763         TDB_DATA outdata;
1764         int32_t res;
1765         struct ctdb_control_public_ip_info *info;
1766         uint32_t len;
1767         uint32_t i;
1768
1769         indata.dptr = discard_const_p(uint8_t, addr);
1770         indata.dsize = sizeof(*addr);
1771
1772         ret = ctdb_control(ctdb, destnode, 0,
1773                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1774                            mem_ctx, &outdata, &res, &timeout, NULL);
1775         if (ret != 0 || res != 0) {
1776                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1777                                 "failed ret:%d res:%d\n",
1778                                 ret, res));
1779                 return -1;
1780         }
1781
1782         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1783         if (len > outdata.dsize) {
1784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1785                                 "returned invalid data with size %u > %u\n",
1786                                 (unsigned int)outdata.dsize,
1787                                 (unsigned int)len));
1788                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1789                 return -1;
1790         }
1791
1792         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1793         len += info->num*sizeof(struct ctdb_control_iface_info);
1794
1795         if (len > outdata.dsize) {
1796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1797                                 "returned invalid data with size %u > %u\n",
1798                                 (unsigned int)outdata.dsize,
1799                                 (unsigned int)len));
1800                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1801                 return -1;
1802         }
1803
1804         /* make sure we null terminate the returned strings */
1805         for (i=0; i < info->num; i++) {
1806                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1807         }
1808
1809         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1810                                                                 outdata.dptr,
1811                                                                 outdata.dsize);
1812         talloc_free(outdata.dptr);
1813         if (*_info == NULL) {
1814                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1815                                 "talloc_memdup size %u failed\n",
1816                                 (unsigned int)outdata.dsize));
1817                 return -1;
1818         }
1819
1820         return 0;
1821 }
1822
1823 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1824                          struct timeval timeout, uint32_t destnode,
1825                          TALLOC_CTX *mem_ctx,
1826                          struct ctdb_control_get_ifaces **_ifaces)
1827 {
1828         int ret;
1829         TDB_DATA outdata;
1830         int32_t res;
1831         struct ctdb_control_get_ifaces *ifaces;
1832         uint32_t len;
1833         uint32_t i;
1834
1835         ret = ctdb_control(ctdb, destnode, 0,
1836                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1837                            mem_ctx, &outdata, &res, &timeout, NULL);
1838         if (ret != 0 || res != 0) {
1839                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1840                                 "failed ret:%d res:%d\n",
1841                                 ret, res));
1842                 return -1;
1843         }
1844
1845         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1846         if (len > outdata.dsize) {
1847                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1848                                 "returned invalid data with size %u > %u\n",
1849                                 (unsigned int)outdata.dsize,
1850                                 (unsigned int)len));
1851                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1852                 return -1;
1853         }
1854
1855         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1856         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1857
1858         if (len > outdata.dsize) {
1859                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1860                                 "returned invalid data with size %u > %u\n",
1861                                 (unsigned int)outdata.dsize,
1862                                 (unsigned int)len));
1863                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1864                 return -1;
1865         }
1866
1867         /* make sure we null terminate the returned strings */
1868         for (i=0; i < ifaces->num; i++) {
1869                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1870         }
1871
1872         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1873                                                                   outdata.dptr,
1874                                                                   outdata.dsize);
1875         talloc_free(outdata.dptr);
1876         if (*_ifaces == NULL) {
1877                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1878                                 "talloc_memdup size %u failed\n",
1879                                 (unsigned int)outdata.dsize));
1880                 return -1;
1881         }
1882
1883         return 0;
1884 }
1885
1886 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1887                              struct timeval timeout, uint32_t destnode,
1888                              TALLOC_CTX *mem_ctx,
1889                              const struct ctdb_control_iface_info *info)
1890 {
1891         int ret;
1892         TDB_DATA indata;
1893         int32_t res;
1894
1895         indata.dptr = discard_const_p(uint8_t, info);
1896         indata.dsize = sizeof(*info);
1897
1898         ret = ctdb_control(ctdb, destnode, 0,
1899                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1900                            mem_ctx, NULL, &res, &timeout, NULL);
1901         if (ret != 0 || res != 0) {
1902                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1903                                 "failed ret:%d res:%d\n",
1904                                 ret, res));
1905                 return -1;
1906         }
1907
1908         return 0;
1909 }
1910
1911 /*
1912   set/clear the permanent disabled bit on a remote node
1913  */
1914 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1915                        uint32_t set, uint32_t clear)
1916 {
1917         int ret;
1918         TDB_DATA data;
1919         struct ctdb_node_map *nodemap=NULL;
1920         struct ctdb_node_flag_change c;
1921         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1922         uint32_t recmaster;
1923         uint32_t *nodes;
1924
1925
1926         /* find the recovery master */
1927         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1928         if (ret != 0) {
1929                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1930                 talloc_free(tmp_ctx);
1931                 return ret;
1932         }
1933
1934
1935         /* read the node flags from the recmaster */
1936         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1937         if (ret != 0) {
1938                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1939                 talloc_free(tmp_ctx);
1940                 return -1;
1941         }
1942         if (destnode >= nodemap->num) {
1943                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1944                 talloc_free(tmp_ctx);
1945                 return -1;
1946         }
1947
1948         c.pnn       = destnode;
1949         c.old_flags = nodemap->nodes[destnode].flags;
1950         c.new_flags = c.old_flags;
1951         c.new_flags |= set;
1952         c.new_flags &= ~clear;
1953
1954         data.dsize = sizeof(c);
1955         data.dptr = (unsigned char *)&c;
1956
1957         /* send the flags update to all connected nodes */
1958         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1959
1960         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1961                                         nodes, 0,
1962                                         timeout, false, data,
1963                                         NULL, NULL,
1964                                         NULL) != 0) {
1965                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1966
1967                 talloc_free(tmp_ctx);
1968                 return -1;
1969         }
1970
1971         talloc_free(tmp_ctx);
1972         return 0;
1973 }
1974
1975
1976 /*
1977   get all tunables
1978  */
1979 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1980                                struct timeval timeout, 
1981                                uint32_t destnode,
1982                                struct ctdb_tunable *tunables)
1983 {
1984         TDB_DATA outdata;
1985         int ret;
1986         int32_t res;
1987
1988         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1989                            &outdata, &res, &timeout, NULL);
1990         if (ret != 0 || res != 0) {
1991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1992                 return -1;
1993         }
1994
1995         if (outdata.dsize != sizeof(*tunables)) {
1996                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1997                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1998                 return -1;              
1999         }
2000
2001         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2002         talloc_free(outdata.dptr);
2003         return 0;
2004 }
2005
2006 /*
2007   add a public address to a node
2008  */
2009 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2010                       struct timeval timeout, 
2011                       uint32_t destnode,
2012                       struct ctdb_control_ip_iface *pub)
2013 {
2014         TDB_DATA data;
2015         int32_t res;
2016         int ret;
2017
2018         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2019         data.dptr  = (unsigned char *)pub;
2020
2021         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2022                            NULL, &res, &timeout, NULL);
2023         if (ret != 0 || res != 0) {
2024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2025                 return -1;
2026         }
2027
2028         return 0;
2029 }
2030
2031 /*
2032   delete a public address from a node
2033  */
2034 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2035                       struct timeval timeout, 
2036                       uint32_t destnode,
2037                       struct ctdb_control_ip_iface *pub)
2038 {
2039         TDB_DATA data;
2040         int32_t res;
2041         int ret;
2042
2043         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2044         data.dptr  = (unsigned char *)pub;
2045
2046         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2047                            NULL, &res, &timeout, NULL);
2048         if (ret != 0 || res != 0) {
2049                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2050                 return -1;
2051         }
2052
2053         return 0;
2054 }
2055
2056 /*
2057   kill a tcp connection
2058  */
2059 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2060                       struct timeval timeout, 
2061                       uint32_t destnode,
2062                       struct ctdb_control_killtcp *killtcp)
2063 {
2064         TDB_DATA data;
2065         int32_t res;
2066         int ret;
2067
2068         data.dsize = sizeof(struct ctdb_control_killtcp);
2069         data.dptr  = (unsigned char *)killtcp;
2070
2071         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2072                            NULL, &res, &timeout, NULL);
2073         if (ret != 0 || res != 0) {
2074                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2075                 return -1;
2076         }
2077
2078         return 0;
2079 }
2080
2081 /*
2082   send a gratious arp
2083  */
2084 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2085                       struct timeval timeout, 
2086                       uint32_t destnode,
2087                       ctdb_sock_addr *addr,
2088                       const char *ifname)
2089 {
2090         TDB_DATA data;
2091         int32_t res;
2092         int ret, len;
2093         struct ctdb_control_gratious_arp *gratious_arp;
2094         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2095
2096
2097         len = strlen(ifname)+1;
2098         gratious_arp = talloc_size(tmp_ctx, 
2099                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2100         CTDB_NO_MEMORY(ctdb, gratious_arp);
2101
2102         gratious_arp->addr = *addr;
2103         gratious_arp->len = len;
2104         memcpy(&gratious_arp->iface[0], ifname, len);
2105
2106
2107         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2108         data.dptr  = (unsigned char *)gratious_arp;
2109
2110         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2111                            NULL, &res, &timeout, NULL);
2112         if (ret != 0 || res != 0) {
2113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2114                 talloc_free(tmp_ctx);
2115                 return -1;
2116         }
2117
2118         talloc_free(tmp_ctx);
2119         return 0;
2120 }
2121
2122 /*
2123   get a list of all tcp tickles that a node knows about for a particular vnn
2124  */
2125 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2126                               struct timeval timeout, uint32_t destnode, 
2127                               TALLOC_CTX *mem_ctx, 
2128                               ctdb_sock_addr *addr,
2129                               struct ctdb_control_tcp_tickle_list **list)
2130 {
2131         int ret;
2132         TDB_DATA data, outdata;
2133         int32_t status;
2134
2135         data.dptr = (uint8_t*)addr;
2136         data.dsize = sizeof(ctdb_sock_addr);
2137
2138         ret = ctdb_control(ctdb, destnode, 0, 
2139                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2140                            mem_ctx, &outdata, &status, NULL, NULL);
2141         if (ret != 0 || status != 0) {
2142                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2143                 return -1;
2144         }
2145
2146         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2147
2148         return status;
2149 }
2150
2151 /*
2152   register a server id
2153  */
2154 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2155                       struct timeval timeout, 
2156                       struct ctdb_server_id *id)
2157 {
2158         TDB_DATA data;
2159         int32_t res;
2160         int ret;
2161
2162         data.dsize = sizeof(struct ctdb_server_id);
2163         data.dptr  = (unsigned char *)id;
2164
2165         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2166                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2167                         0, data, NULL,
2168                         NULL, &res, &timeout, NULL);
2169         if (ret != 0 || res != 0) {
2170                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2171                 return -1;
2172         }
2173
2174         return 0;
2175 }
2176
2177 /*
2178   unregister a server id
2179  */
2180 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2181                       struct timeval timeout, 
2182                       struct ctdb_server_id *id)
2183 {
2184         TDB_DATA data;
2185         int32_t res;
2186         int ret;
2187
2188         data.dsize = sizeof(struct ctdb_server_id);
2189         data.dptr  = (unsigned char *)id;
2190
2191         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2192                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2193                         0, data, NULL,
2194                         NULL, &res, &timeout, NULL);
2195         if (ret != 0 || res != 0) {
2196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2197                 return -1;
2198         }
2199
2200         return 0;
2201 }
2202
2203
2204 /*
2205   check if a server id exists
2206
2207   if a server id does exist, return *status == 1, otherwise *status == 0
2208  */
2209 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2210                       struct timeval timeout, 
2211                       uint32_t destnode,
2212                       struct ctdb_server_id *id,
2213                       uint32_t *status)
2214 {
2215         TDB_DATA data;
2216         int32_t res;
2217         int ret;
2218
2219         data.dsize = sizeof(struct ctdb_server_id);
2220         data.dptr  = (unsigned char *)id;
2221
2222         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2223                         0, data, NULL,
2224                         NULL, &res, &timeout, NULL);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2227                 return -1;
2228         }
2229
2230         if (res) {
2231                 *status = 1;
2232         } else {
2233                 *status = 0;
2234         }
2235
2236         return 0;
2237 }
2238
2239 /*
2240    get the list of server ids that are registered on a node
2241 */
2242 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2243                 TALLOC_CTX *mem_ctx,
2244                 struct timeval timeout, uint32_t destnode, 
2245                 struct ctdb_server_id_list **svid_list)
2246 {
2247         int ret;
2248         TDB_DATA outdata;
2249         int32_t res;
2250
2251         ret = ctdb_control(ctdb, destnode, 0, 
2252                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2253                            mem_ctx, &outdata, &res, &timeout, NULL);
2254         if (ret != 0 || res != 0) {
2255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2256                 return -1;
2257         }
2258
2259         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2260                     
2261         return 0;
2262 }
2263
2264 /*
2265   initialise the ctdb daemon for client applications
2266
2267   NOTE: In current code the daemon does not fork. This is for testing purposes only
2268   and to simplify the code.
2269 */
2270 struct ctdb_context *ctdb_init(struct event_context *ev)
2271 {
2272         int ret;
2273         struct ctdb_context *ctdb;
2274
2275         ctdb = talloc_zero(ev, struct ctdb_context);
2276         if (ctdb == NULL) {
2277                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2278                 return NULL;
2279         }
2280         ctdb->ev  = ev;
2281         ctdb->idr = idr_init(ctdb);
2282         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2283
2284         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2287                 talloc_free(ctdb);
2288                 return NULL;
2289         }
2290
2291         return ctdb;
2292 }
2293
2294
2295 /*
2296   set some ctdb flags
2297 */
2298 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2299 {
2300         ctdb->flags |= flags;
2301 }
2302
2303
2304 /*
2305   return the pnn of this node
2306 */
2307 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2308 {
2309         return ctdb->pnn;
2310 }
2311
2312
2313 /*
2314   get the uptime of a remote node
2315  */
2316 struct ctdb_client_control_state *
2317 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2318 {
2319         return ctdb_control_send(ctdb, destnode, 0, 
2320                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2321                            mem_ctx, &timeout, NULL);
2322 }
2323
2324 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2325 {
2326         int ret;
2327         int32_t res;
2328         TDB_DATA outdata;
2329
2330         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2331         if (ret != 0 || res != 0) {
2332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2333                 return -1;
2334         }
2335
2336         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2337
2338         return 0;
2339 }
2340
2341 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2342 {
2343         struct ctdb_client_control_state *state;
2344
2345         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2346         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2347 }
2348
2349 /*
2350   send a control to execute the "recovered" event script on a node
2351  */
2352 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2353 {
2354         int ret;
2355         int32_t status;
2356
2357         ret = ctdb_control(ctdb, destnode, 0, 
2358                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2359                            NULL, NULL, &status, &timeout, NULL);
2360         if (ret != 0 || status != 0) {
2361                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2362                 return -1;
2363         }
2364
2365         return 0;
2366 }
2367
2368 /* 
2369   callback for the async helpers used when sending the same control
2370   to multiple nodes in parallell.
2371 */
2372 static void async_callback(struct ctdb_client_control_state *state)
2373 {
2374         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2375         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2376         int ret;
2377         TDB_DATA outdata;
2378         int32_t res;
2379         uint32_t destnode = state->c->hdr.destnode;
2380
2381         /* one more node has responded with recmode data */
2382         data->count--;
2383
2384         /* if we failed to push the db, then return an error and let
2385            the main loop try again.
2386         */
2387         if (state->state != CTDB_CONTROL_DONE) {
2388                 if ( !data->dont_log_errors) {
2389                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2390                 }
2391                 data->fail_count++;
2392                 if (data->fail_callback) {
2393                         data->fail_callback(ctdb, destnode, res, outdata,
2394                                         data->callback_data);
2395                 }
2396                 return;
2397         }
2398         
2399         state->async.fn = NULL;
2400
2401         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2402         if ((ret != 0) || (res != 0)) {
2403                 if ( !data->dont_log_errors) {
2404                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2405                 }
2406                 data->fail_count++;
2407                 if (data->fail_callback) {
2408                         data->fail_callback(ctdb, destnode, res, outdata,
2409                                         data->callback_data);
2410                 }
2411         }
2412         if ((ret == 0) && (data->callback != NULL)) {
2413                 data->callback(ctdb, destnode, res, outdata,
2414                                         data->callback_data);
2415         }
2416 }
2417
2418
2419 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2420 {
2421         /* set up the callback functions */
2422         state->async.fn = async_callback;
2423         state->async.private_data = data;
2424         
2425         /* one more control to wait for to complete */
2426         data->count++;
2427 }
2428
2429
2430 /* wait for up to the maximum number of seconds allowed
2431    or until all nodes we expect a response from has replied
2432 */
2433 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2434 {
2435         while (data->count > 0) {
2436                 event_loop_once(ctdb->ev);
2437         }
2438         if (data->fail_count != 0) {
2439                 if (!data->dont_log_errors) {
2440                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2441                                  data->fail_count));
2442                 }
2443                 return -1;
2444         }
2445         return 0;
2446 }
2447
2448
2449 /* 
2450    perform a simple control on the listed nodes
2451    The control cannot return data
2452  */
2453 int ctdb_client_async_control(struct ctdb_context *ctdb,
2454                                 enum ctdb_controls opcode,
2455                                 uint32_t *nodes,
2456                                 uint64_t srvid,
2457                                 struct timeval timeout,
2458                                 bool dont_log_errors,
2459                                 TDB_DATA data,
2460                                 client_async_callback client_callback,
2461                                 client_async_callback fail_callback,
2462                                 void *callback_data)
2463 {
2464         struct client_async_data *async_data;
2465         struct ctdb_client_control_state *state;
2466         int j, num_nodes;
2467
2468         async_data = talloc_zero(ctdb, struct client_async_data);
2469         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2470         async_data->dont_log_errors = dont_log_errors;
2471         async_data->callback = client_callback;
2472         async_data->fail_callback = fail_callback;
2473         async_data->callback_data = callback_data;
2474         async_data->opcode        = opcode;
2475
2476         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2477
2478         /* loop over all nodes and send an async control to each of them */
2479         for (j=0; j<num_nodes; j++) {
2480                 uint32_t pnn = nodes[j];
2481
2482                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2483                                           0, data, async_data, &timeout, NULL);
2484                 if (state == NULL) {
2485                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2486                         talloc_free(async_data);
2487                         return -1;
2488                 }
2489                 
2490                 ctdb_client_async_add(async_data, state);
2491         }
2492
2493         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2494                 talloc_free(async_data);
2495                 return -1;
2496         }
2497
2498         talloc_free(async_data);
2499         return 0;
2500 }
2501
2502 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2503                                 struct ctdb_vnn_map *vnn_map,
2504                                 TALLOC_CTX *mem_ctx,
2505                                 bool include_self)
2506 {
2507         int i, j, num_nodes;
2508         uint32_t *nodes;
2509
2510         for (i=num_nodes=0;i<vnn_map->size;i++) {
2511                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2512                         continue;
2513                 }
2514                 num_nodes++;
2515         } 
2516
2517         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2518         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2519
2520         for (i=j=0;i<vnn_map->size;i++) {
2521                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2522                         continue;
2523                 }
2524                 nodes[j++] = vnn_map->map[i];
2525         } 
2526
2527         return nodes;
2528 }
2529
2530 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2531                                 struct ctdb_node_map *node_map,
2532                                 TALLOC_CTX *mem_ctx,
2533                                 bool include_self)
2534 {
2535         int i, j, num_nodes;
2536         uint32_t *nodes;
2537
2538         for (i=num_nodes=0;i<node_map->num;i++) {
2539                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2540                         continue;
2541                 }
2542                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2543                         continue;
2544                 }
2545                 num_nodes++;
2546         } 
2547
2548         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2549         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2550
2551         for (i=j=0;i<node_map->num;i++) {
2552                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2553                         continue;
2554                 }
2555                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2556                         continue;
2557                 }
2558                 nodes[j++] = node_map->nodes[i].pnn;
2559         } 
2560
2561         return nodes;
2562 }
2563
2564 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2565                                 struct ctdb_node_map *node_map,
2566                                 TALLOC_CTX *mem_ctx,
2567                                 uint32_t pnn)
2568 {
2569         int i, j, num_nodes;
2570         uint32_t *nodes;
2571
2572         for (i=num_nodes=0;i<node_map->num;i++) {
2573                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2574                         continue;
2575                 }
2576                 if (node_map->nodes[i].pnn == pnn) {
2577                         continue;
2578                 }
2579                 num_nodes++;
2580         } 
2581
2582         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2583         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2584
2585         for (i=j=0;i<node_map->num;i++) {
2586                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2587                         continue;
2588                 }
2589                 if (node_map->nodes[i].pnn == pnn) {
2590                         continue;
2591                 }
2592                 nodes[j++] = node_map->nodes[i].pnn;
2593         } 
2594
2595         return nodes;
2596 }
2597
2598 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2599                                 struct ctdb_node_map *node_map,
2600                                 TALLOC_CTX *mem_ctx,
2601                                 bool include_self)
2602 {
2603         int i, j, num_nodes;
2604         uint32_t *nodes;
2605
2606         for (i=num_nodes=0;i<node_map->num;i++) {
2607                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2608                         continue;
2609                 }
2610                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2611                         continue;
2612                 }
2613                 num_nodes++;
2614         } 
2615
2616         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2617         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2618
2619         for (i=j=0;i<node_map->num;i++) {
2620                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2621                         continue;
2622                 }
2623                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2624                         continue;
2625                 }
2626                 nodes[j++] = node_map->nodes[i].pnn;
2627         } 
2628
2629         return nodes;
2630 }
2631
2632 /* 
2633   this is used to test if a pnn lock exists and if it exists will return
2634   the number of connections that pnn has reported or -1 if that recovery
2635   daemon is not running.
2636 */
2637 int
2638 ctdb_read_pnn_lock(int fd, int32_t pnn)
2639 {
2640         struct flock lock;
2641         char c;
2642
2643         lock.l_type = F_WRLCK;
2644         lock.l_whence = SEEK_SET;
2645         lock.l_start = pnn;
2646         lock.l_len = 1;
2647         lock.l_pid = 0;
2648
2649         if (fcntl(fd, F_GETLK, &lock) != 0) {
2650                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2651                 return -1;
2652         }
2653
2654         if (lock.l_type == F_UNLCK) {
2655                 return -1;
2656         }
2657
2658         if (pread(fd, &c, 1, pnn) == -1) {
2659                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2660                 return -1;
2661         }
2662
2663         return c;
2664 }
2665
2666 /*
2667   get capabilities of a remote node
2668  */
2669 struct ctdb_client_control_state *
2670 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2671 {
2672         return ctdb_control_send(ctdb, destnode, 0, 
2673                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2674                            mem_ctx, &timeout, NULL);
2675 }
2676
2677 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2678 {
2679         int ret;
2680         int32_t res;
2681         TDB_DATA outdata;
2682
2683         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2684         if ( (ret != 0) || (res != 0) ) {
2685                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2686                 return -1;
2687         }
2688
2689         if (capabilities) {
2690                 *capabilities = *((uint32_t *)outdata.dptr);
2691         }
2692
2693         return 0;
2694 }
2695
2696 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2697 {
2698         struct ctdb_client_control_state *state;
2699         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2700         int ret;
2701
2702         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2703         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2704         talloc_free(tmp_ctx);
2705         return ret;
2706 }
2707
2708 /**
2709  * check whether a transaction is active on a given db on a given node
2710  */
2711 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2712                                      uint32_t destnode,
2713                                      uint32_t db_id)
2714 {
2715         int32_t status;
2716         int ret;
2717         TDB_DATA indata;
2718
2719         indata.dptr = (uint8_t *)&db_id;
2720         indata.dsize = sizeof(db_id);
2721
2722         ret = ctdb_control(ctdb, destnode, 0,
2723                            CTDB_CONTROL_TRANS2_ACTIVE,
2724                            0, indata, NULL, NULL, &status,
2725                            NULL, NULL);
2726
2727         if (ret != 0) {
2728                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2729                 return -1;
2730         }
2731
2732         return status;
2733 }
2734
2735
2736 struct ctdb_transaction_handle {
2737         struct ctdb_db_context *ctdb_db;
2738         bool in_replay;
2739         /*
2740          * we store the reads and writes done under a transaction:
2741          * - one list stores both reads and writes (m_all),
2742          * - the other just writes (m_write)
2743          */
2744         struct ctdb_marshall_buffer *m_all;
2745         struct ctdb_marshall_buffer *m_write;
2746 };
2747
2748 /* start a transaction on a database */
2749 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2750 {
2751         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2752         return 0;
2753 }
2754
2755 /* start a transaction on a database */
2756 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2757 {
2758         struct ctdb_record_handle *rh;
2759         TDB_DATA key;
2760         TDB_DATA data;
2761         struct ctdb_ltdb_header header;
2762         TALLOC_CTX *tmp_ctx;
2763         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2764         int ret;
2765         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2766         pid_t pid;
2767         int32_t status;
2768
2769         key.dptr = discard_const(keyname);
2770         key.dsize = strlen(keyname);
2771
2772         if (!ctdb_db->persistent) {
2773                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2774                 return -1;
2775         }
2776
2777 again:
2778         tmp_ctx = talloc_new(h);
2779
2780         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2781         if (rh == NULL) {
2782                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2783                 talloc_free(tmp_ctx);
2784                 return -1;
2785         }
2786
2787         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2788                                               CTDB_CURRENT_NODE,
2789                                               ctdb_db->db_id);
2790         if (status == 1) {
2791                 unsigned long int usec = (1000 + random()) % 100000;
2792                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2793                                     "on db_id[0x%08x]. waiting for %lu "
2794                                     "microseconds\n",
2795                                     ctdb_db->db_id, usec));
2796                 talloc_free(tmp_ctx);
2797                 usleep(usec);
2798                 goto again;
2799         }
2800
2801         /*
2802          * store the pid in the database:
2803          * it is not enough that the node is dmaster...
2804          */
2805         pid = getpid();
2806         data.dptr = (unsigned char *)&pid;
2807         data.dsize = sizeof(pid_t);
2808         rh->header.rsn++;
2809         rh->header.dmaster = ctdb_db->ctdb->pnn;
2810         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2811         if (ret != 0) {
2812                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2813                                   "transaction record\n"));
2814                 talloc_free(tmp_ctx);
2815                 return -1;
2816         }
2817
2818         talloc_free(rh);
2819
2820         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2821         if (ret != 0) {
2822                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2823                 talloc_free(tmp_ctx);
2824                 return -1;
2825         }
2826
2827         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2828         if (ret != 0) {
2829                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2830                                  "lock record inside transaction\n"));
2831                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2832                 talloc_free(tmp_ctx);
2833                 goto again;
2834         }
2835
2836         if (header.dmaster != ctdb_db->ctdb->pnn) {
2837                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2838                                    "transaction lock record\n"));
2839                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2840                 talloc_free(tmp_ctx);
2841                 goto again;
2842         }
2843
2844         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2845                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2846                                     "the transaction lock record\n"));
2847                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2848                 talloc_free(tmp_ctx);
2849                 goto again;
2850         }
2851
2852         talloc_free(tmp_ctx);
2853
2854         return 0;
2855 }
2856
2857
2858 /* start a transaction on a database */
2859 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2860                                                        TALLOC_CTX *mem_ctx)
2861 {
2862         struct ctdb_transaction_handle *h;
2863         int ret;
2864
2865         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2866         if (h == NULL) {
2867                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2868                 return NULL;
2869         }
2870
2871         h->ctdb_db = ctdb_db;
2872
2873         ret = ctdb_transaction_fetch_start(h);
2874         if (ret != 0) {
2875                 talloc_free(h);
2876                 return NULL;
2877         }
2878
2879         talloc_set_destructor(h, ctdb_transaction_destructor);
2880
2881         return h;
2882 }
2883
2884
2885
2886 /*
2887   fetch a record inside a transaction
2888  */
2889 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2890                            TALLOC_CTX *mem_ctx, 
2891                            TDB_DATA key, TDB_DATA *data)
2892 {
2893         struct ctdb_ltdb_header header;
2894         int ret;
2895
2896         ZERO_STRUCT(header);
2897
2898         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2899         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2900                 /* record doesn't exist yet */
2901                 *data = tdb_null;
2902                 ret = 0;
2903         }
2904         
2905         if (ret != 0) {
2906                 return ret;
2907         }
2908
2909         if (!h->in_replay) {
2910                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2911                 if (h->m_all == NULL) {
2912                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2913                         return -1;
2914                 }
2915         }
2916
2917         return 0;
2918 }
2919
2920 /*
2921   stores a record inside a transaction
2922  */
2923 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2924                            TDB_DATA key, TDB_DATA data)
2925 {
2926         TALLOC_CTX *tmp_ctx = talloc_new(h);
2927         struct ctdb_ltdb_header header;
2928         TDB_DATA olddata;
2929         int ret;
2930
2931         ZERO_STRUCT(header);
2932
2933         /* we need the header so we can update the RSN */
2934         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2935         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2936                 /* the record doesn't exist - create one with us as dmaster.
2937                    This is only safe because we are in a transaction and this
2938                    is a persistent database */
2939                 ZERO_STRUCT(header);
2940         } else if (ret != 0) {
2941                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2942                 talloc_free(tmp_ctx);
2943                 return ret;
2944         }
2945
2946         if (data.dsize == olddata.dsize &&
2947             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2948                 /* save writing the same data */
2949                 talloc_free(tmp_ctx);
2950                 return 0;
2951         }
2952
2953         header.dmaster = h->ctdb_db->ctdb->pnn;
2954         header.rsn++;
2955
2956         if (!h->in_replay) {
2957                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2958                 if (h->m_all == NULL) {
2959                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2960                         talloc_free(tmp_ctx);
2961                         return -1;
2962                 }
2963         }               
2964
2965         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2966         if (h->m_write == NULL) {
2967                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2968                 talloc_free(tmp_ctx);
2969                 return -1;
2970         }
2971         
2972         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2973
2974         talloc_free(tmp_ctx);
2975         
2976         return ret;
2977 }
2978
2979 /*
2980   replay a transaction
2981  */
2982 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2983 {
2984         int ret, i;
2985         struct ctdb_rec_data *rec = NULL;
2986
2987         h->in_replay = true;
2988         talloc_free(h->m_write);
2989         h->m_write = NULL;
2990
2991         ret = ctdb_transaction_fetch_start(h);
2992         if (ret != 0) {
2993                 return ret;
2994         }
2995
2996         for (i=0;i<h->m_all->count;i++) {
2997                 TDB_DATA key, data;
2998
2999                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3000                 if (rec == NULL) {
3001                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3002                         goto failed;
3003                 }
3004
3005                 if (rec->reqid == 0) {
3006                         /* its a store */
3007                         if (ctdb_transaction_store(h, key, data) != 0) {
3008                                 goto failed;
3009                         }
3010                 } else {
3011                         TDB_DATA data2;
3012                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3013
3014                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3015                                 talloc_free(tmp_ctx);
3016                                 goto failed;
3017                         }
3018                         if (data2.dsize != data.dsize ||
3019                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3020                                 /* the record has changed on us - we have to give up */
3021                                 talloc_free(tmp_ctx);
3022                                 goto failed;
3023                         }
3024                         talloc_free(tmp_ctx);
3025                 }
3026         }
3027         
3028         return 0;
3029
3030 failed:
3031         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3032         return -1;
3033 }
3034
3035
3036 /*
3037   commit a transaction
3038  */
3039 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3040 {
3041         int ret, retries=0;
3042         int32_t status;
3043         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3044         struct timeval timeout;
3045         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3046
3047         talloc_set_destructor(h, NULL);
3048
3049         /* our commit strategy is quite complex.
3050
3051            - we first try to commit the changes to all other nodes
3052
3053            - if that works, then we commit locally and we are done
3054
3055            - if a commit on another node fails, then we need to cancel
3056              the transaction, then restart the transaction (thus
3057              opening a window of time for a pending recovery to
3058              complete), then replay the transaction, checking all the
3059              reads and writes (checking that reads give the same data,
3060              and writes succeed). Then we retry the transaction to the
3061              other nodes
3062         */
3063
3064 again:
3065         if (h->m_write == NULL) {
3066                 /* no changes were made */
3067                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3068                 talloc_free(h);
3069                 return 0;
3070         }
3071
3072         /* tell ctdbd to commit to the other nodes */
3073         timeout = timeval_current_ofs(1, 0);
3074         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3075                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3076                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3077                            &timeout, NULL);
3078         if (ret != 0 || status != 0) {
3079                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3080                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3081                                      ", retrying after 1 second...\n",
3082                                      (retries==0)?"":"retry "));
3083                 sleep(1);
3084
3085                 if (ret != 0) {
3086                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3087                 } else {
3088                         /* work out what error code we will give if we 
3089                            have to fail the operation */
3090                         switch ((enum ctdb_trans2_commit_error)status) {
3091                         case CTDB_TRANS2_COMMIT_SUCCESS:
3092                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3093                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3094                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3095                                 break;
3096                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3097                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3098                                 break;
3099                         }
3100                 }
3101
3102                 if (++retries == 100) {
3103                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3104                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3105                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3106                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3107                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3108                         talloc_free(h);
3109                         return -1;
3110                 }               
3111
3112                 if (ctdb_replay_transaction(h) != 0) {
3113                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3114                                           "transaction on db 0x%08x, "
3115                                           "failure control =%u\n",
3116                                           h->ctdb_db->db_id,
3117                                           (unsigned)failure_control));
3118                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3119                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3120                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3121                         talloc_free(h);
3122                         return -1;
3123                 }
3124                 goto again;
3125         } else {
3126                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3127         }
3128
3129         /* do the real commit locally */
3130         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3131         if (ret != 0) {
3132                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3133                                   "on db id 0x%08x locally, "
3134                                   "failure_control=%u\n",
3135                                   h->ctdb_db->db_id,
3136                                   (unsigned)failure_control));
3137                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3138                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3139                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3140                 talloc_free(h);
3141                 return ret;
3142         }
3143
3144         /* tell ctdbd that we are finished with our local commit */
3145         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3146                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3147                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3148         talloc_free(h);
3149         return 0;
3150 }
3151
3152 /*
3153   recovery daemon ping to main daemon
3154  */
3155 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3156 {
3157         int ret;
3158         int32_t res;
3159
3160         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3161                            ctdb, NULL, &res, NULL, NULL);
3162         if (ret != 0 || res != 0) {
3163                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3164                 return -1;
3165         }
3166
3167         return 0;
3168 }
3169
3170 /* when forking the main daemon and the child process needs to connect back
3171  * to the daemon as a client process, this function can be used to change
3172  * the ctdb context from daemon into client mode
3173  */
3174 int switch_from_server_to_client(struct ctdb_context *ctdb)
3175 {
3176         int ret;
3177
3178         /* shutdown the transport */
3179         if (ctdb->methods) {
3180                 ctdb->methods->shutdown(ctdb);
3181         }
3182
3183         /* get a new event context */
3184         talloc_free(ctdb->ev);
3185         ctdb->ev = event_context_init(ctdb);
3186
3187         close(ctdb->daemon.sd);
3188         ctdb->daemon.sd = -1;
3189
3190         /* initialise ctdb */
3191         ret = ctdb_socket_connect(ctdb);
3192         if (ret != 0) {
3193                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3194                 return -1;
3195         }
3196
3197          return 0;
3198 }
3199
3200 /*
3201   get the status of running the monitor eventscripts: NULL means never run.
3202  */
3203 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3204                 struct timeval timeout, uint32_t destnode, 
3205                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3206                 struct ctdb_scripts_wire **script_status)
3207 {
3208         int ret;
3209         TDB_DATA outdata, indata;
3210         int32_t res;
3211         uint32_t uinttype = type;
3212
3213         indata.dptr = (uint8_t *)&uinttype;
3214         indata.dsize = sizeof(uinttype);
3215
3216         ret = ctdb_control(ctdb, destnode, 0, 
3217                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3218                            mem_ctx, &outdata, &res, &timeout, NULL);
3219         if (ret != 0 || res != 0) {
3220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3221                 return -1;
3222         }
3223
3224         if (outdata.dsize == 0) {
3225                 *script_status = NULL;
3226         } else {
3227                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3228                 talloc_free(outdata.dptr);
3229         }
3230                     
3231         return 0;
3232 }
3233
3234 /*
3235   tell the main daemon how long it took to lock the reclock file
3236  */
3237 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3238 {
3239         int ret;
3240         int32_t res;
3241         TDB_DATA data;
3242
3243         data.dptr = (uint8_t *)&latency;
3244         data.dsize = sizeof(latency);
3245
3246         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3247                            ctdb, NULL, &res, NULL, NULL);
3248         if (ret != 0 || res != 0) {
3249                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3250                 return -1;
3251         }
3252
3253         return 0;
3254 }
3255
3256 /*
3257   get the name of the reclock file
3258  */
3259 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3260                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3261                          const char **name)
3262 {
3263         int ret;
3264         int32_t res;
3265         TDB_DATA data;
3266
3267         ret = ctdb_control(ctdb, destnode, 0, 
3268                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3269                            mem_ctx, &data, &res, &timeout, NULL);
3270         if (ret != 0 || res != 0) {
3271                 return -1;
3272         }
3273
3274         if (data.dsize == 0) {
3275                 *name = NULL;
3276         } else {
3277                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3278         }
3279         talloc_free(data.dptr);
3280
3281         return 0;
3282 }
3283
3284 /*
3285   set the reclock filename for a node
3286  */
3287 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3288 {
3289         int ret;
3290         TDB_DATA data;
3291         int32_t res;
3292
3293         if (reclock == NULL) {
3294                 data.dsize = 0;
3295                 data.dptr  = NULL;
3296         } else {
3297                 data.dsize = strlen(reclock) + 1;
3298                 data.dptr  = discard_const(reclock);
3299         }
3300
3301         ret = ctdb_control(ctdb, destnode, 0, 
3302                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3303                            NULL, NULL, &res, &timeout, NULL);
3304         if (ret != 0 || res != 0) {
3305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3306                 return -1;
3307         }
3308
3309         return 0;
3310 }
3311
3312 /*
3313   stop a node
3314  */
3315 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3316 {
3317         int ret;
3318         int32_t res;
3319
3320         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3321                            ctdb, NULL, &res, &timeout, NULL);
3322         if (ret != 0 || res != 0) {
3323                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3324                 return -1;
3325         }
3326
3327         return 0;
3328 }
3329
3330 /*
3331   continue a node
3332  */
3333 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3334 {
3335         int ret;
3336
3337         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3338                            ctdb, NULL, NULL, &timeout, NULL);
3339         if (ret != 0) {
3340                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3341                 return -1;
3342         }
3343
3344         return 0;
3345 }
3346
3347 /*
3348   set the natgw state for a node
3349  */
3350 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3351 {
3352         int ret;
3353         TDB_DATA data;
3354         int32_t res;
3355
3356         data.dsize = sizeof(natgwstate);
3357         data.dptr  = (uint8_t *)&natgwstate;
3358
3359         ret = ctdb_control(ctdb, destnode, 0, 
3360                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3361                            NULL, NULL, &res, &timeout, NULL);
3362         if (ret != 0 || res != 0) {
3363                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3364                 return -1;
3365         }
3366
3367         return 0;
3368 }
3369
3370 /*
3371   set the lmaster role for a node
3372  */
3373 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3374 {
3375         int ret;
3376         TDB_DATA data;
3377         int32_t res;
3378
3379         data.dsize = sizeof(lmasterrole);
3380         data.dptr  = (uint8_t *)&lmasterrole;
3381
3382         ret = ctdb_control(ctdb, destnode, 0, 
3383                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3384                            NULL, NULL, &res, &timeout, NULL);
3385         if (ret != 0 || res != 0) {
3386                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3387                 return -1;
3388         }
3389
3390         return 0;
3391 }
3392
3393 /*
3394   set the recmaster role for a node
3395  */
3396 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3397 {
3398         int ret;
3399         TDB_DATA data;
3400         int32_t res;
3401
3402         data.dsize = sizeof(recmasterrole);
3403         data.dptr  = (uint8_t *)&recmasterrole;
3404
3405         ret = ctdb_control(ctdb, destnode, 0, 
3406                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3407                            NULL, NULL, &res, &timeout, NULL);
3408         if (ret != 0 || res != 0) {
3409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3410                 return -1;
3411         }
3412
3413         return 0;
3414 }
3415
3416 /* enable an eventscript
3417  */
3418 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3419 {
3420         int ret;
3421         TDB_DATA data;
3422         int32_t res;
3423
3424         data.dsize = strlen(script) + 1;
3425         data.dptr  = discard_const(script);
3426
3427         ret = ctdb_control(ctdb, destnode, 0, 
3428                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3429                            NULL, NULL, &res, &timeout, NULL);
3430         if (ret != 0 || res != 0) {
3431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3432                 return -1;
3433         }
3434
3435         return 0;
3436 }
3437
3438 /* disable an eventscript
3439  */
3440 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3441 {
3442         int ret;
3443         TDB_DATA data;
3444         int32_t res;
3445
3446         data.dsize = strlen(script) + 1;
3447         data.dptr  = discard_const(script);
3448
3449         ret = ctdb_control(ctdb, destnode, 0, 
3450                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3451                            NULL, NULL, &res, &timeout, NULL);
3452         if (ret != 0 || res != 0) {
3453                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3454                 return -1;
3455         }
3456
3457         return 0;
3458 }
3459
3460
3461 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3462 {
3463         int ret;
3464         TDB_DATA data;
3465         int32_t res;
3466
3467         data.dsize = sizeof(*bantime);
3468         data.dptr  = (uint8_t *)bantime;
3469
3470         ret = ctdb_control(ctdb, destnode, 0, 
3471                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3472                            NULL, NULL, &res, &timeout, NULL);
3473         if (ret != 0 || res != 0) {
3474                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3475                 return -1;
3476         }
3477
3478         return 0;
3479 }
3480
3481
3482 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3483 {
3484         int ret;
3485         TDB_DATA outdata;
3486         int32_t res;
3487         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3488
3489         ret = ctdb_control(ctdb, destnode, 0, 
3490                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3491                            tmp_ctx, &outdata, &res, &timeout, NULL);
3492         if (ret != 0 || res != 0) {
3493                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3494                 talloc_free(tmp_ctx);
3495                 return -1;
3496         }
3497
3498         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3499         talloc_free(tmp_ctx);
3500
3501         return 0;
3502 }
3503
3504
3505 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3506 {
3507         int ret;
3508         int32_t res;
3509         TDB_DATA data;
3510         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3511
3512         data.dptr = (uint8_t*)db_prio;
3513         data.dsize = sizeof(*db_prio);
3514
3515         ret = ctdb_control(ctdb, destnode, 0, 
3516                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3517                            tmp_ctx, NULL, &res, &timeout, NULL);
3518         if (ret != 0 || res != 0) {
3519                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524         talloc_free(tmp_ctx);
3525
3526         return 0;
3527 }
3528
3529 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3530 {
3531         int ret;
3532         int32_t res;
3533         TDB_DATA data;
3534         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3535
3536         data.dptr = (uint8_t*)&db_id;
3537         data.dsize = sizeof(db_id);
3538
3539         ret = ctdb_control(ctdb, destnode, 0, 
3540                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3541                            tmp_ctx, NULL, &res, &timeout, NULL);
3542         if (ret != 0 || res < 0) {
3543                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3544                 talloc_free(tmp_ctx);
3545                 return -1;
3546         }
3547
3548         if (priority) {
3549                 *priority = res;
3550         }
3551
3552         talloc_free(tmp_ctx);
3553
3554         return 0;
3555 }