remove the timeout parameter to ctdb_control_send() and
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36 struct ctdb_record_handle {
37         struct ctdb_db_context *ctdb_db;
38         TDB_DATA key;
39         TDB_DATA *data;
40         struct ctdb_ltdb_header header;
41 };
42
43
44 /*
45   make a recv call to the local ctdb daemon - called from client context
46
47   This is called when the program wants to wait for a ctdb_call to complete and get the 
48   results. This call will block unless the call has already completed.
49 */
50 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
51 {
52         if (state == NULL) {
53                 return -1;
54         }
55
56         while (state->state < CTDB_CALL_DONE) {
57                 event_loop_once(state->ctdb_db->ctdb->ev);
58         }
59         if (state->state != CTDB_CALL_DONE) {
60                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
61                 talloc_free(state);
62                 return -1;
63         }
64
65         if (state->call->reply_data.dsize) {
66                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
67                                                       state->call->reply_data.dptr,
68                                                       state->call->reply_data.dsize);
69                 call->reply_data.dsize = state->call->reply_data.dsize;
70         } else {
71                 call->reply_data.dptr = NULL;
72                 call->reply_data.dsize = 0;
73         }
74         call->status = state->call->status;
75         talloc_free(state);
76
77         return 0;
78 }
79
80
81
82
83
84
85
86 /*
87   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
88 */
89 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
90 {
91         struct ctdb_client_call_state *state;
92
93         state = ctdb_call_send(ctdb_db, call);
94         return ctdb_call_recv(state, call);
95 }
96
97
98
99
100
101 /*
102   cancel a ctdb_fetch_lock operation, releasing the lock
103  */
104 static int fetch_lock_destructor(struct ctdb_record_handle *h)
105 {
106         ctdb_ltdb_unlock(h->ctdb_db, h->key);
107         return 0;
108 }
109
110 /*
111   force the migration of a record to this node
112  */
113 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
114 {
115         struct ctdb_call call;
116         ZERO_STRUCT(call);
117         call.call_id = CTDB_NULL_FUNC;
118         call.key = key;
119         call.flags = CTDB_IMMEDIATE_MIGRATION;
120         return ctdb_call(ctdb_db, &call);
121 }
122
123 /*
124   get a lock on a record, and return the records data. Blocks until it gets the lock
125  */
126 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
127                                            TDB_DATA key, TDB_DATA *data)
128 {
129         int ret;
130         struct ctdb_record_handle *h;
131
132         /*
133           procedure is as follows:
134
135           1) get the chain lock. 
136           2) check if we are dmaster
137           3) if we are the dmaster then return handle 
138           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
139              reply from ctdbd
140           5) when we get the reply, goto (1)
141          */
142
143         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
144         if (h == NULL) {
145                 return NULL;
146         }
147
148         h->ctdb_db = ctdb_db;
149         h->key     = key;
150         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
151         if (h->key.dptr == NULL) {
152                 talloc_free(h);
153                 return NULL;
154         }
155         h->data    = data;
156
157         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
158                  (const char *)key.dptr));
159
160 again:
161         /* step 1 - get the chain lock */
162         ret = ctdb_ltdb_lock(ctdb_db, key);
163         if (ret != 0) {
164                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
170
171         talloc_set_destructor(h, fetch_lock_destructor);
172
173         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
174
175         /* when torturing, ensure we test the remote path */
176         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
177             random() % 5 == 0) {
178                 h->header.dmaster = (uint32_t)-1;
179         }
180
181
182         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
183
184         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
185                 ctdb_ltdb_unlock(ctdb_db, key);
186                 ret = ctdb_client_force_migration(ctdb_db, key);
187                 if (ret != 0) {
188                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
189                         talloc_free(h);
190                         return NULL;
191                 }
192                 goto again;
193         }
194
195         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
196         return h;
197 }
198
199 /*
200   store some data to the record that was locked with ctdb_fetch_lock()
201 */
202 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
203 {
204         if (h->ctdb_db->persistent) {
205                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
206                 return -1;
207         }
208
209         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
210 }
211
212 /*
213   non-locking fetch of a record
214  */
215 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
216                TDB_DATA key, TDB_DATA *data)
217 {
218         struct ctdb_call call;
219         int ret;
220
221         call.call_id = CTDB_FETCH_FUNC;
222         call.call_data.dptr = NULL;
223         call.call_data.dsize = 0;
224
225         ret = ctdb_call(ctdb_db, &call);
226
227         if (ret == 0) {
228                 *data = call.reply_data;
229                 talloc_steal(mem_ctx, data->dptr);
230         }
231
232         return ret;
233 }
234
235
236
237
238
239
240
241
242 /*
243   send a ctdb control message
244   timeout specifies how long we should wait for a reply.
245   if timeout is NULL we wait indefinitely
246  */
247 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
248                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
249                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
250                  struct timeval *timeout,
251                  char **errormsg)
252 {
253         struct ctdb_client_control_state *state;
254
255         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
256                         flags, data, mem_ctx,
257                         errormsg);
258         if (state != NULL && timeout && !timeval_is_zero(timeout)) {
259                 event_add_timed(ctdb->ev, state, *timeout, ctdb_control_timeout_func, state);
260         }
261
262         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
263                         errormsg);
264 }
265
266
267
268
269 /*
270   a process exists call. Returns 0 if process exists, -1 otherwise
271  */
272 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
273 {
274         int ret;
275         TDB_DATA data;
276         int32_t status;
277
278         data.dptr = (uint8_t*)&pid;
279         data.dsize = sizeof(pid);
280
281         ret = ctdb_control(ctdb, destnode, 0, 
282                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
283                            NULL, NULL, &status, NULL, NULL);
284         if (ret != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
286                 return -1;
287         }
288
289         return status;
290 }
291
292 /*
293   get remote statistics
294  */
295 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
296 {
297         int ret;
298         TDB_DATA data;
299         int32_t res;
300
301         ret = ctdb_control(ctdb, destnode, 0, 
302                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
303                            ctdb, &data, &res, NULL, NULL);
304         if (ret != 0 || res != 0) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
306                 return -1;
307         }
308
309         if (data.dsize != sizeof(struct ctdb_statistics)) {
310                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
311                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
312                       return -1;
313         }
314
315         *status = *(struct ctdb_statistics *)data.dptr;
316         talloc_free(data.dptr);
317                         
318         return 0;
319 }
320
321 /*
322   shutdown a remote ctdb node
323  */
324 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
325 {
326         struct ctdb_client_control_state *state;
327
328         state = ctdb_control_send(ctdb, destnode, 0, 
329                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
330                            NULL, NULL);
331         if (state == NULL) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
333                 return -1;
334         }
335         if (!timeval_is_zero(&timeout)) {
336                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
337         }
338
339         return 0;
340 }
341
342 /*
343   get vnn map from a remote node
344  */
345 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
346 {
347         int ret;
348         TDB_DATA outdata;
349         int32_t res;
350         struct ctdb_vnn_map_wire *map;
351
352         ret = ctdb_control(ctdb, destnode, 0, 
353                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
354                            mem_ctx, &outdata, &res, &timeout, NULL);
355         if (ret != 0 || res != 0) {
356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
357                 return -1;
358         }
359         
360         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
361         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
362             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
363                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
364                 return -1;
365         }
366
367         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
368         CTDB_NO_MEMORY(ctdb, *vnnmap);
369         (*vnnmap)->generation = map->generation;
370         (*vnnmap)->size       = map->size;
371         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
372
373         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
374         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
375         talloc_free(outdata.dptr);
376                     
377         return 0;
378 }
379
380
381 /*
382   get the recovery mode of a remote node
383  */
384 struct ctdb_client_control_state *
385 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
386 {
387         struct ctdb_client_control_state *state;
388
389         state = ctdb_control_send(ctdb, destnode, 0, 
390                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
391                            mem_ctx, NULL);
392
393         if (state != NULL && !timeval_is_zero(&timeout)) {
394                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
395         }
396
397         return state;
398 }
399
400 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
401 {
402         int ret;
403         int32_t res;
404
405         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
406         if (ret != 0) {
407                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
408                 return -1;
409         }
410
411         if (recmode) {
412                 *recmode = (uint32_t)res;
413         }
414
415         return 0;
416 }
417
418 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
419 {
420         struct ctdb_client_control_state *state;
421
422         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
423         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
424 }
425
426
427
428
429 /*
430   set the recovery mode of a remote node
431  */
432 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
433 {
434         int ret;
435         TDB_DATA data;
436         int32_t res;
437
438         data.dsize = sizeof(uint32_t);
439         data.dptr = (unsigned char *)&recmode;
440
441         ret = ctdb_control(ctdb, destnode, 0, 
442                            CTDB_CONTROL_SET_RECMODE, 0, data, 
443                            NULL, NULL, &res, &timeout, NULL);
444         if (ret != 0 || res != 0) {
445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
446                 return -1;
447         }
448
449         return 0;
450 }
451
452
453
454 /*
455   get the recovery master of a remote node
456  */
457 struct ctdb_client_control_state *
458 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
459                         struct timeval timeout, uint32_t destnode)
460 {
461         struct ctdb_client_control_state *state;
462
463         state = ctdb_control_send(ctdb, destnode, 0, 
464                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
465                            mem_ctx, NULL);
466         if (state != NULL && !timeval_is_zero(&timeout)) {
467                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
468         }
469
470         return state;
471 }
472
473 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
474 {
475         int ret;
476         int32_t res;
477
478         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
479         if (ret != 0) {
480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
481                 return -1;
482         }
483
484         if (recmaster) {
485                 *recmaster = (uint32_t)res;
486         }
487
488         return 0;
489 }
490
491 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
492 {
493         struct ctdb_client_control_state *state;
494
495         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
496         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
497 }
498
499
500 /*
501   set the recovery master of a remote node
502  */
503 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
504 {
505         int ret;
506         TDB_DATA data;
507         int32_t res;
508
509         ZERO_STRUCT(data);
510         data.dsize = sizeof(uint32_t);
511         data.dptr = (unsigned char *)&recmaster;
512
513         ret = ctdb_control(ctdb, destnode, 0, 
514                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
515                            NULL, NULL, &res, &timeout, NULL);
516         if (ret != 0 || res != 0) {
517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
518                 return -1;
519         }
520
521         return 0;
522 }
523
524
525 /*
526   get a list of databases off a remote node
527  */
528 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
529                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
530 {
531         int ret;
532         TDB_DATA outdata;
533         int32_t res;
534
535         ret = ctdb_control(ctdb, destnode, 0, 
536                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
537                            mem_ctx, &outdata, &res, &timeout, NULL);
538         if (ret != 0 || res != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
540                 return -1;
541         }
542
543         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
544         talloc_free(outdata.dptr);
545                     
546         return 0;
547 }
548
549 /*
550   get a list of nodes (vnn and flags ) from a remote node
551  */
552 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
553                 struct timeval timeout, uint32_t destnode, 
554                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
555 {
556         int ret;
557         TDB_DATA outdata;
558         int32_t res;
559
560         ret = ctdb_control(ctdb, destnode, 0, 
561                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
562                            mem_ctx, &outdata, &res, &timeout, NULL);
563         if (ret == 0 && res == -1 && outdata.dsize == 0) {
564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
565                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
566         }
567         if (ret != 0 || res != 0 || outdata.dsize == 0) {
568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
569                 return -1;
570         }
571
572         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
573         talloc_free(outdata.dptr);
574                     
575         return 0;
576 }
577
578 /*
579   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
580  */
581 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
582                 struct timeval timeout, uint32_t destnode, 
583                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
584 {
585         int ret, i, len;
586         TDB_DATA outdata;
587         struct ctdb_node_mapv4 *nodemapv4;
588         int32_t res;
589
590         ret = ctdb_control(ctdb, destnode, 0, 
591                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
592                            mem_ctx, &outdata, &res, &timeout, NULL);
593         if (ret != 0 || res != 0 || outdata.dsize == 0) {
594                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
595                 return -1;
596         }
597
598         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
599
600         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
601         (*nodemap) = talloc_zero_size(mem_ctx, len);
602         CTDB_NO_MEMORY(ctdb, (*nodemap));
603
604         (*nodemap)->num = nodemapv4->num;
605         for (i=0; i<nodemapv4->num; i++) {
606                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
607                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
608                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
609                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
610         }
611                 
612         talloc_free(outdata.dptr);
613                     
614         return 0;
615 }
616
617 /*
618   drop the transport, reload the nodes file and restart the transport
619  */
620 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
621                     struct timeval timeout, uint32_t destnode)
622 {
623         int ret;
624         int32_t res;
625
626         ret = ctdb_control(ctdb, destnode, 0, 
627                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
628                            NULL, NULL, &res, &timeout, NULL);
629         if (ret != 0 || res != 0) {
630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
631                 return -1;
632         }
633
634         return 0;
635 }
636
637
638 /*
639   set vnn map on a node
640  */
641 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
642                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
643 {
644         int ret;
645         TDB_DATA data;
646         int32_t res;
647         struct ctdb_vnn_map_wire *map;
648         size_t len;
649
650         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
651         map = talloc_size(mem_ctx, len);
652         CTDB_NO_MEMORY(ctdb, map);
653
654         map->generation = vnnmap->generation;
655         map->size = vnnmap->size;
656         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
657         
658         data.dsize = len;
659         data.dptr  = (uint8_t *)map;
660
661         ret = ctdb_control(ctdb, destnode, 0, 
662                            CTDB_CONTROL_SETVNNMAP, 0, data, 
663                            NULL, NULL, &res, &timeout, NULL);
664         if (ret != 0 || res != 0) {
665                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
666                 return -1;
667         }
668
669         talloc_free(map);
670
671         return 0;
672 }
673
674
675 /*
676   async send for pull database
677  */
678 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
679         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
680         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
681 {
682         TDB_DATA indata;
683         struct ctdb_control_pulldb *pull;
684         struct ctdb_client_control_state *state;
685
686         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
687         CTDB_NO_MEMORY_NULL(ctdb, pull);
688
689         pull->db_id   = dbid;
690         pull->lmaster = lmaster;
691
692         indata.dsize = sizeof(struct ctdb_control_pulldb);
693         indata.dptr  = (unsigned char *)pull;
694
695         state = ctdb_control_send(ctdb, destnode, 0, 
696                                   CTDB_CONTROL_PULL_DB, 0, indata, 
697                                   mem_ctx, NULL);
698         if (state != NULL && !timeval_is_zero(&timeout)) {
699                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
700         }
701
702         talloc_free(pull);
703
704         return state;
705 }
706
707 /*
708   async recv for pull database
709  */
710 int ctdb_ctrl_pulldb_recv(
711         struct ctdb_context *ctdb, 
712         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
713         TDB_DATA *outdata)
714 {
715         int ret;
716         int32_t res;
717
718         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
719         if ( (ret != 0) || (res != 0) ){
720                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
721                 return -1;
722         }
723
724         return 0;
725 }
726
727 /*
728   pull all keys and records for a specific database on a node
729  */
730 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
731                 uint32_t dbid, uint32_t lmaster, 
732                 TALLOC_CTX *mem_ctx, struct timeval timeout,
733                 TDB_DATA *outdata)
734 {
735         struct ctdb_client_control_state *state;
736
737         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
738                                       timeout);
739         
740         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
741 }
742
743
744 /*
745   change dmaster for all keys in the database to the new value
746  */
747 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
748                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
749 {
750         int ret;
751         TDB_DATA indata;
752         int32_t res;
753
754         indata.dsize = 2*sizeof(uint32_t);
755         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
756
757         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
758         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
759
760         ret = ctdb_control(ctdb, destnode, 0, 
761                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
762                            NULL, NULL, &res, &timeout, NULL);
763         if (ret != 0 || res != 0) {
764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
765                 return -1;
766         }
767
768         return 0;
769 }
770
771 /*
772   ping a node, return number of clients connected
773  */
774 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
775 {
776         int ret;
777         int32_t res;
778
779         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
780                            tdb_null, NULL, NULL, &res, NULL, NULL);
781         if (ret != 0) {
782                 return -1;
783         }
784         return res;
785 }
786
787 /*
788   find the real path to a ltdb 
789  */
790 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
791                    const char **path)
792 {
793         int ret;
794         int32_t res;
795         TDB_DATA data;
796
797         data.dptr = (uint8_t *)&dbid;
798         data.dsize = sizeof(dbid);
799
800         ret = ctdb_control(ctdb, destnode, 0, 
801                            CTDB_CONTROL_GETDBPATH, 0, data, 
802                            mem_ctx, &data, &res, &timeout, NULL);
803         if (ret != 0 || res != 0) {
804                 return -1;
805         }
806
807         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
808         if ((*path) == NULL) {
809                 return -1;
810         }
811
812         talloc_free(data.dptr);
813
814         return 0;
815 }
816
817 /*
818   find the name of a db 
819  */
820 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
821                    const char **name)
822 {
823         int ret;
824         int32_t res;
825         TDB_DATA data;
826
827         data.dptr = (uint8_t *)&dbid;
828         data.dsize = sizeof(dbid);
829
830         ret = ctdb_control(ctdb, destnode, 0, 
831                            CTDB_CONTROL_GET_DBNAME, 0, data, 
832                            mem_ctx, &data, &res, &timeout, NULL);
833         if (ret != 0 || res != 0) {
834                 return -1;
835         }
836
837         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
838         if ((*name) == NULL) {
839                 return -1;
840         }
841
842         talloc_free(data.dptr);
843
844         return 0;
845 }
846
847 /*
848   get the health status of a db
849  */
850 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
851                           struct timeval timeout,
852                           uint32_t destnode,
853                           uint32_t dbid, TALLOC_CTX *mem_ctx,
854                           const char **reason)
855 {
856         int ret;
857         int32_t res;
858         TDB_DATA data;
859
860         data.dptr = (uint8_t *)&dbid;
861         data.dsize = sizeof(dbid);
862
863         ret = ctdb_control(ctdb, destnode, 0,
864                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
865                            mem_ctx, &data, &res, &timeout, NULL);
866         if (ret != 0 || res != 0) {
867                 return -1;
868         }
869
870         if (data.dsize == 0) {
871                 (*reason) = NULL;
872                 return 0;
873         }
874
875         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
876         if ((*reason) == NULL) {
877                 return -1;
878         }
879
880         talloc_free(data.dptr);
881
882         return 0;
883 }
884
885 /*
886   create a database
887  */
888 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
889                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
890 {
891         int ret;
892         int32_t res;
893         TDB_DATA data;
894
895         data.dptr = discard_const(name);
896         data.dsize = strlen(name)+1;
897
898         ret = ctdb_control(ctdb, destnode, 0, 
899                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
900                            0, data, 
901                            mem_ctx, &data, &res, &timeout, NULL);
902
903         if (ret != 0 || res != 0) {
904                 return -1;
905         }
906
907         return 0;
908 }
909
910 /*
911   get debug level on a node
912  */
913 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
914 {
915         int ret;
916         int32_t res;
917         TDB_DATA data;
918
919         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
920                            ctdb, &data, &res, NULL, NULL);
921         if (ret != 0 || res != 0) {
922                 return -1;
923         }
924         if (data.dsize != sizeof(int32_t)) {
925                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
926                          (unsigned)data.dsize));
927                 return -1;
928         }
929         *level = *(int32_t *)data.dptr;
930         talloc_free(data.dptr);
931         return 0;
932 }
933
934 /*
935   set debug level on a node
936  */
937 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
938 {
939         int ret;
940         int32_t res;
941         TDB_DATA data;
942
943         data.dptr = (uint8_t *)&level;
944         data.dsize = sizeof(level);
945
946         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
947                            NULL, NULL, &res, NULL, NULL);
948         if (ret != 0 || res != 0) {
949                 return -1;
950         }
951         return 0;
952 }
953
954
955 /*
956   get a list of connected nodes
957  */
958 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
959                                 struct timeval timeout,
960                                 TALLOC_CTX *mem_ctx,
961                                 uint32_t *num_nodes)
962 {
963         struct ctdb_node_map *map=NULL;
964         int ret, i;
965         uint32_t *nodes;
966
967         *num_nodes = 0;
968
969         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
970         if (ret != 0) {
971                 return NULL;
972         }
973
974         nodes = talloc_array(mem_ctx, uint32_t, map->num);
975         if (nodes == NULL) {
976                 return NULL;
977         }
978
979         for (i=0;i<map->num;i++) {
980                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
981                         nodes[*num_nodes] = map->nodes[i].pnn;
982                         (*num_nodes)++;
983                 }
984         }
985
986         return nodes;
987 }
988
989
990 /*
991   reset remote status
992  */
993 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
994 {
995         int ret;
996         int32_t res;
997
998         ret = ctdb_control(ctdb, destnode, 0, 
999                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1000                            NULL, NULL, &res, NULL, NULL);
1001         if (ret != 0 || res != 0) {
1002                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1003                 return -1;
1004         }
1005         return 0;
1006 }
1007
1008 /*
1009   this is the dummy null procedure that all databases support
1010 */
1011 static int ctdb_null_func(struct ctdb_call_info *call)
1012 {
1013         return 0;
1014 }
1015
1016 /*
1017   this is a plain fetch procedure that all databases support
1018 */
1019 static int ctdb_fetch_func(struct ctdb_call_info *call)
1020 {
1021         call->reply_data = &call->record_data;
1022         return 0;
1023 }
1024
1025 /*
1026   attach to a specific database - client call
1027 */
1028 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1029 {
1030         struct ctdb_db_context *ctdb_db;
1031         TDB_DATA data;
1032         int ret;
1033         int32_t res;
1034
1035         ctdb_db = ctdb_db_handle(ctdb, name);
1036         if (ctdb_db) {
1037                 return ctdb_db;
1038         }
1039
1040         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1041         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1042
1043         ctdb_db->ctdb = ctdb;
1044         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1045         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1046
1047         data.dptr = discard_const(name);
1048         data.dsize = strlen(name)+1;
1049
1050         /* tell ctdb daemon to attach */
1051         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1052                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1053                            0, data, ctdb_db, &data, &res, NULL, NULL);
1054         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1055                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1056                 talloc_free(ctdb_db);
1057                 return NULL;
1058         }
1059         
1060         ctdb_db->db_id = *(uint32_t *)data.dptr;
1061         talloc_free(data.dptr);
1062
1063         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1064         if (ret != 0) {
1065                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1066                 talloc_free(ctdb_db);
1067                 return NULL;
1068         }
1069
1070         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1071         if (ctdb->valgrinding) {
1072                 tdb_flags |= TDB_NOMMAP;
1073         }
1074         tdb_flags |= TDB_DISALLOW_NESTING;
1075
1076         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1077         if (ctdb_db->ltdb == NULL) {
1078                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1079                 talloc_free(ctdb_db);
1080                 return NULL;
1081         }
1082
1083         ctdb_db->persistent = persistent;
1084
1085         DLIST_ADD(ctdb->db_list, ctdb_db);
1086
1087         /* add well known functions */
1088         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1089         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1090
1091         return ctdb_db;
1092 }
1093
1094
1095 /*
1096   setup a call for a database
1097  */
1098 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1099 {
1100         struct ctdb_registered_call *call;
1101
1102 #if 0
1103         TDB_DATA data;
1104         int32_t status;
1105         struct ctdb_control_set_call c;
1106         int ret;
1107
1108         /* this is no longer valid with the separate daemon architecture */
1109         c.db_id = ctdb_db->db_id;
1110         c.fn    = fn;
1111         c.id    = id;
1112
1113         data.dptr = (uint8_t *)&c;
1114         data.dsize = sizeof(c);
1115
1116         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1117                            data, NULL, NULL, &status, NULL, NULL);
1118         if (ret != 0 || status != 0) {
1119                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1120                 return -1;
1121         }
1122 #endif
1123
1124         /* also register locally */
1125         call = talloc(ctdb_db, struct ctdb_registered_call);
1126         call->fn = fn;
1127         call->id = id;
1128
1129         DLIST_ADD(ctdb_db->calls, call);        
1130         return 0;
1131 }
1132
1133
1134 struct traverse_state {
1135         bool done;
1136         uint32_t count;
1137         ctdb_traverse_func fn;
1138         void *private_data;
1139 };
1140
1141 /*
1142   called on each key during a ctdb_traverse
1143  */
1144 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1145 {
1146         struct traverse_state *state = (struct traverse_state *)p;
1147         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1148         TDB_DATA key;
1149
1150         if (data.dsize < sizeof(uint32_t) ||
1151             d->length != data.dsize) {
1152                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1153                 state->done = True;
1154                 return;
1155         }
1156
1157         key.dsize = d->keylen;
1158         key.dptr  = &d->data[0];
1159         data.dsize = d->datalen;
1160         data.dptr = &d->data[d->keylen];
1161
1162         if (key.dsize == 0 && data.dsize == 0) {
1163                 /* end of traverse */
1164                 state->done = True;
1165                 return;
1166         }
1167
1168         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1169                 /* empty records are deleted records in ctdb */
1170                 return;
1171         }
1172
1173         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1174                 state->done = True;
1175         }
1176
1177         state->count++;
1178 }
1179
1180
1181 /*
1182   start a cluster wide traverse, calling the supplied fn on each record
1183   return the number of records traversed, or -1 on error
1184  */
1185 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1186 {
1187         TDB_DATA data;
1188         struct ctdb_traverse_start t;
1189         int32_t status;
1190         int ret;
1191         uint64_t srvid = (getpid() | 0xFLL<<60);
1192         struct traverse_state state;
1193
1194         state.done = False;
1195         state.count = 0;
1196         state.private_data = private_data;
1197         state.fn = fn;
1198
1199         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1200         if (ret != 0) {
1201                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1202                 return -1;
1203         }
1204
1205         t.db_id = ctdb_db->db_id;
1206         t.srvid = srvid;
1207         t.reqid = 0;
1208
1209         data.dptr = (uint8_t *)&t;
1210         data.dsize = sizeof(t);
1211
1212         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1213                            data, NULL, NULL, &status, NULL, NULL);
1214         if (ret != 0 || status != 0) {
1215                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1216                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1217                 return -1;
1218         }
1219
1220         while (!state.done) {
1221                 event_loop_once(ctdb_db->ctdb->ev);
1222         }
1223
1224         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1225         if (ret != 0) {
1226                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1227                 return -1;
1228         }
1229
1230         return state.count;
1231 }
1232
1233 #define ISASCII(x) ((x>31)&&(x<128))
1234 /*
1235   called on each key during a catdb
1236  */
1237 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1238 {
1239         int i;
1240         FILE *f = (FILE *)p;
1241         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1242
1243         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1244         for (i=0;i<key.dsize;i++) {
1245                 if (ISASCII(key.dptr[i])) {
1246                         fprintf(f, "%c", key.dptr[i]);
1247                 } else {
1248                         fprintf(f, "\\%02X", key.dptr[i]);
1249                 }
1250         }
1251         fprintf(f, "\"\n");
1252
1253         fprintf(f, "dmaster: %u\n", h->dmaster);
1254         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1255
1256         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1257         for (i=sizeof(*h);i<data.dsize;i++) {
1258                 if (ISASCII(data.dptr[i])) {
1259                         fprintf(f, "%c", data.dptr[i]);
1260                 } else {
1261                         fprintf(f, "\\%02X", data.dptr[i]);
1262                 }
1263         }
1264         fprintf(f, "\"\n");
1265
1266         fprintf(f, "\n");
1267
1268         return 0;
1269 }
1270
1271 /*
1272   convenience function to list all keys to stdout
1273  */
1274 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1275 {
1276         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1277 }
1278
1279 /*
1280   get the pid of a ctdb daemon
1281  */
1282 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1283 {
1284         int ret;
1285         int32_t res;
1286
1287         ret = ctdb_control(ctdb, destnode, 0, 
1288                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1289                            NULL, NULL, &res, &timeout, NULL);
1290         if (ret != 0) {
1291                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1292                 return -1;
1293         }
1294
1295         *pid = res;
1296
1297         return 0;
1298 }
1299
1300
1301 /*
1302   async freeze send control
1303  */
1304 struct ctdb_client_control_state *
1305 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1306 {
1307         struct ctdb_client_control_state *state;
1308
1309         state = ctdb_control_send(ctdb, destnode, priority, 
1310                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1311                            mem_ctx, NULL);
1312         if (state != NULL && !timeval_is_zero(&timeout)) {
1313                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
1314         }
1315
1316         return state;
1317 }
1318
1319 /* 
1320    async freeze recv control
1321 */
1322 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1323 {
1324         int ret;
1325         int32_t res;
1326
1327         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1328         if ( (ret != 0) || (res != 0) ){
1329                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1330                 return -1;
1331         }
1332
1333         return 0;
1334 }
1335
1336 /*
1337   freeze databases of a certain priority
1338  */
1339 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1340 {
1341         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1342         struct ctdb_client_control_state *state;
1343         int ret;
1344
1345         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1346         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1347         talloc_free(tmp_ctx);
1348
1349         return ret;
1350 }
1351
1352 /* Freeze all databases */
1353 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1354 {
1355         int i;
1356
1357         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1358                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1359                         return -1;
1360                 }
1361         }
1362         return 0;
1363 }
1364
1365 /*
1366   thaw databases of a certain priority
1367  */
1368 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1369 {
1370         int ret;
1371         int32_t res;
1372
1373         ret = ctdb_control(ctdb, destnode, priority, 
1374                            CTDB_CONTROL_THAW, 0, tdb_null, 
1375                            NULL, NULL, &res, &timeout, NULL);
1376         if (ret != 0 || res != 0) {
1377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1378                 return -1;
1379         }
1380
1381         return 0;
1382 }
1383
1384 /* thaw all databases */
1385 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1386 {
1387         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1388 }
1389
1390 /*
1391   get pnn of a node, or -1
1392  */
1393 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1394 {
1395         int ret;
1396         int32_t res;
1397
1398         ret = ctdb_control(ctdb, destnode, 0, 
1399                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1400                            NULL, NULL, &res, &timeout, NULL);
1401         if (ret != 0) {
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1403                 return -1;
1404         }
1405
1406         return res;
1407 }
1408
1409 /*
1410   get the monitoring mode of a remote node
1411  */
1412 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1413 {
1414         int ret;
1415         int32_t res;
1416
1417         ret = ctdb_control(ctdb, destnode, 0, 
1418                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1419                            NULL, NULL, &res, &timeout, NULL);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1422                 return -1;
1423         }
1424
1425         *monmode = res;
1426
1427         return 0;
1428 }
1429
1430
1431 /*
1432  set the monitoring mode of a remote node to active
1433  */
1434 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1435 {
1436         int ret;
1437         
1438
1439         ret = ctdb_control(ctdb, destnode, 0, 
1440                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1441                            NULL, NULL,NULL, &timeout, NULL);
1442         if (ret != 0) {
1443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1444                 return -1;
1445         }
1446
1447         
1448
1449         return 0;
1450 }
1451
1452 /*
1453   set the monitoring mode of a remote node to disable
1454  */
1455 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1456 {
1457         int ret;
1458         
1459
1460         ret = ctdb_control(ctdb, destnode, 0, 
1461                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1462                            NULL, NULL, NULL, &timeout, NULL);
1463         if (ret != 0) {
1464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1465                 return -1;
1466         }
1467
1468         
1469
1470         return 0;
1471 }
1472
1473
1474
1475 /* 
1476   sent to a node to make it take over an ip address
1477 */
1478 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1479                           uint32_t destnode, struct ctdb_public_ip *ip)
1480 {
1481         TDB_DATA data;
1482         struct ctdb_public_ipv4 ipv4;
1483         int ret;
1484         int32_t res;
1485
1486         if (ip->addr.sa.sa_family == AF_INET) {
1487                 ipv4.pnn = ip->pnn;
1488                 ipv4.sin = ip->addr.ip;
1489
1490                 data.dsize = sizeof(ipv4);
1491                 data.dptr  = (uint8_t *)&ipv4;
1492
1493                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1494                            NULL, &res, &timeout, NULL);
1495         } else {
1496                 data.dsize = sizeof(*ip);
1497                 data.dptr  = (uint8_t *)ip;
1498
1499                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1500                            NULL, &res, &timeout, NULL);
1501         }
1502
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1505                 return -1;
1506         }
1507
1508         return 0;       
1509 }
1510
1511
1512 /* 
1513   sent to a node to make it release an ip address
1514 */
1515 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1516                          uint32_t destnode, struct ctdb_public_ip *ip)
1517 {
1518         TDB_DATA data;
1519         struct ctdb_public_ipv4 ipv4;
1520         int ret;
1521         int32_t res;
1522
1523         if (ip->addr.sa.sa_family == AF_INET) {
1524                 ipv4.pnn = ip->pnn;
1525                 ipv4.sin = ip->addr.ip;
1526
1527                 data.dsize = sizeof(ipv4);
1528                 data.dptr  = (uint8_t *)&ipv4;
1529
1530                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1531                                    NULL, &res, &timeout, NULL);
1532         } else {
1533                 data.dsize = sizeof(*ip);
1534                 data.dptr  = (uint8_t *)ip;
1535
1536                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1537                                    NULL, &res, &timeout, NULL);
1538         }
1539
1540         if (ret != 0 || res != 0) {
1541                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1542                 return -1;
1543         }
1544
1545         return 0;       
1546 }
1547
1548
1549 /*
1550   get a tunable
1551  */
1552 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1553                           struct timeval timeout, 
1554                           uint32_t destnode,
1555                           const char *name, uint32_t *value)
1556 {
1557         struct ctdb_control_get_tunable *t;
1558         TDB_DATA data, outdata;
1559         int32_t res;
1560         int ret;
1561
1562         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1563         data.dptr  = talloc_size(ctdb, data.dsize);
1564         CTDB_NO_MEMORY(ctdb, data.dptr);
1565
1566         t = (struct ctdb_control_get_tunable *)data.dptr;
1567         t->length = strlen(name)+1;
1568         memcpy(t->name, name, t->length);
1569
1570         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1571                            &outdata, &res, &timeout, NULL);
1572         talloc_free(data.dptr);
1573         if (ret != 0 || res != 0) {
1574                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1575                 return -1;
1576         }
1577
1578         if (outdata.dsize != sizeof(uint32_t)) {
1579                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1580                 talloc_free(outdata.dptr);
1581                 return -1;
1582         }
1583         
1584         *value = *(uint32_t *)outdata.dptr;
1585         talloc_free(outdata.dptr);
1586
1587         return 0;
1588 }
1589
1590 /*
1591   set a tunable
1592  */
1593 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1594                           struct timeval timeout, 
1595                           uint32_t destnode,
1596                           const char *name, uint32_t value)
1597 {
1598         struct ctdb_control_set_tunable *t;
1599         TDB_DATA data;
1600         int32_t res;
1601         int ret;
1602
1603         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1604         data.dptr  = talloc_size(ctdb, data.dsize);
1605         CTDB_NO_MEMORY(ctdb, data.dptr);
1606
1607         t = (struct ctdb_control_set_tunable *)data.dptr;
1608         t->length = strlen(name)+1;
1609         memcpy(t->name, name, t->length);
1610         t->value = value;
1611
1612         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1613                            NULL, &res, &timeout, NULL);
1614         talloc_free(data.dptr);
1615         if (ret != 0 || res != 0) {
1616                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1617                 return -1;
1618         }
1619
1620         return 0;
1621 }
1622
1623 /*
1624   list tunables
1625  */
1626 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1627                             struct timeval timeout, 
1628                             uint32_t destnode,
1629                             TALLOC_CTX *mem_ctx,
1630                             const char ***list, uint32_t *count)
1631 {
1632         TDB_DATA outdata;
1633         int32_t res;
1634         int ret;
1635         struct ctdb_control_list_tunable *t;
1636         char *p, *s, *ptr;
1637
1638         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1639                            mem_ctx, &outdata, &res, &timeout, NULL);
1640         if (ret != 0 || res != 0) {
1641                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1642                 return -1;
1643         }
1644
1645         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1646         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1647             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1648                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1649                 talloc_free(outdata.dptr);
1650                 return -1;              
1651         }
1652         
1653         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1654         CTDB_NO_MEMORY(ctdb, p);
1655
1656         talloc_free(outdata.dptr);
1657         
1658         (*list) = NULL;
1659         (*count) = 0;
1660
1661         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1662                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1663                 CTDB_NO_MEMORY(ctdb, *list);
1664                 (*list)[*count] = talloc_strdup(*list, s);
1665                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1666                 (*count)++;
1667         }
1668
1669         talloc_free(p);
1670
1671         return 0;
1672 }
1673
1674
1675 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1676                                    struct timeval timeout, uint32_t destnode,
1677                                    TALLOC_CTX *mem_ctx,
1678                                    uint32_t flags,
1679                                    struct ctdb_all_public_ips **ips)
1680 {
1681         int ret;
1682         TDB_DATA outdata;
1683         int32_t res;
1684
1685         ret = ctdb_control(ctdb, destnode, 0, 
1686                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1687                            mem_ctx, &outdata, &res, &timeout, NULL);
1688         if (ret == 0 && res == -1) {
1689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1690                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1691         }
1692         if (ret != 0 || res != 0) {
1693           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1694                 return -1;
1695         }
1696
1697         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1698         talloc_free(outdata.dptr);
1699                     
1700         return 0;
1701 }
1702
1703 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1704                              struct timeval timeout, uint32_t destnode,
1705                              TALLOC_CTX *mem_ctx,
1706                              struct ctdb_all_public_ips **ips)
1707 {
1708         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1709                                               destnode, mem_ctx,
1710                                               0, ips);
1711 }
1712
1713 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1714                         struct timeval timeout, uint32_t destnode, 
1715                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1716 {
1717         int ret, i, len;
1718         TDB_DATA outdata;
1719         int32_t res;
1720         struct ctdb_all_public_ipsv4 *ipsv4;
1721
1722         ret = ctdb_control(ctdb, destnode, 0, 
1723                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1724                            mem_ctx, &outdata, &res, &timeout, NULL);
1725         if (ret != 0 || res != 0) {
1726                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1727                 return -1;
1728         }
1729
1730         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1731         len = offsetof(struct ctdb_all_public_ips, ips) +
1732                 ipsv4->num*sizeof(struct ctdb_public_ip);
1733         *ips = talloc_zero_size(mem_ctx, len);
1734         CTDB_NO_MEMORY(ctdb, *ips);
1735         (*ips)->num = ipsv4->num;
1736         for (i=0; i<ipsv4->num; i++) {
1737                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1738                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1739         }
1740
1741         talloc_free(outdata.dptr);
1742                     
1743         return 0;
1744 }
1745
1746 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1747                                  struct timeval timeout, uint32_t destnode,
1748                                  TALLOC_CTX *mem_ctx,
1749                                  const ctdb_sock_addr *addr,
1750                                  struct ctdb_control_public_ip_info **_info)
1751 {
1752         int ret;
1753         TDB_DATA indata;
1754         TDB_DATA outdata;
1755         int32_t res;
1756         struct ctdb_control_public_ip_info *info;
1757         uint32_t len;
1758         uint32_t i;
1759
1760         indata.dptr = discard_const_p(uint8_t, addr);
1761         indata.dsize = sizeof(*addr);
1762
1763         ret = ctdb_control(ctdb, destnode, 0,
1764                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1765                            mem_ctx, &outdata, &res, &timeout, NULL);
1766         if (ret != 0 || res != 0) {
1767                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1768                                 "failed ret:%d res:%d\n",
1769                                 ret, res));
1770                 return -1;
1771         }
1772
1773         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1774         if (len > outdata.dsize) {
1775                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1776                                 "returned invalid data with size %u > %u\n",
1777                                 (unsigned int)outdata.dsize,
1778                                 (unsigned int)len));
1779                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1780                 return -1;
1781         }
1782
1783         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1784         len += info->num*sizeof(struct ctdb_control_iface_info);
1785
1786         if (len > outdata.dsize) {
1787                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1788                                 "returned invalid data with size %u > %u\n",
1789                                 (unsigned int)outdata.dsize,
1790                                 (unsigned int)len));
1791                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1792                 return -1;
1793         }
1794
1795         /* make sure we null terminate the returned strings */
1796         for (i=0; i < info->num; i++) {
1797                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1798         }
1799
1800         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1801                                                                 outdata.dptr,
1802                                                                 outdata.dsize);
1803         talloc_free(outdata.dptr);
1804         if (*_info == NULL) {
1805                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1806                                 "talloc_memdup size %u failed\n",
1807                                 (unsigned int)outdata.dsize));
1808                 return -1;
1809         }
1810
1811         return 0;
1812 }
1813
1814 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1815                          struct timeval timeout, uint32_t destnode,
1816                          TALLOC_CTX *mem_ctx,
1817                          struct ctdb_control_get_ifaces **_ifaces)
1818 {
1819         int ret;
1820         TDB_DATA outdata;
1821         int32_t res;
1822         struct ctdb_control_get_ifaces *ifaces;
1823         uint32_t len;
1824         uint32_t i;
1825
1826         ret = ctdb_control(ctdb, destnode, 0,
1827                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1828                            mem_ctx, &outdata, &res, &timeout, NULL);
1829         if (ret != 0 || res != 0) {
1830                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1831                                 "failed ret:%d res:%d\n",
1832                                 ret, res));
1833                 return -1;
1834         }
1835
1836         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1837         if (len > outdata.dsize) {
1838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1839                                 "returned invalid data with size %u > %u\n",
1840                                 (unsigned int)outdata.dsize,
1841                                 (unsigned int)len));
1842                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1843                 return -1;
1844         }
1845
1846         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1847         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1848
1849         if (len > outdata.dsize) {
1850                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1851                                 "returned invalid data with size %u > %u\n",
1852                                 (unsigned int)outdata.dsize,
1853                                 (unsigned int)len));
1854                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1855                 return -1;
1856         }
1857
1858         /* make sure we null terminate the returned strings */
1859         for (i=0; i < ifaces->num; i++) {
1860                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1861         }
1862
1863         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1864                                                                   outdata.dptr,
1865                                                                   outdata.dsize);
1866         talloc_free(outdata.dptr);
1867         if (*_ifaces == NULL) {
1868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1869                                 "talloc_memdup size %u failed\n",
1870                                 (unsigned int)outdata.dsize));
1871                 return -1;
1872         }
1873
1874         return 0;
1875 }
1876
1877 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1878                              struct timeval timeout, uint32_t destnode,
1879                              TALLOC_CTX *mem_ctx,
1880                              const struct ctdb_control_iface_info *info)
1881 {
1882         int ret;
1883         TDB_DATA indata;
1884         int32_t res;
1885
1886         indata.dptr = discard_const_p(uint8_t, info);
1887         indata.dsize = sizeof(*info);
1888
1889         ret = ctdb_control(ctdb, destnode, 0,
1890                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1891                            mem_ctx, NULL, &res, &timeout, NULL);
1892         if (ret != 0 || res != 0) {
1893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1894                                 "failed ret:%d res:%d\n",
1895                                 ret, res));
1896                 return -1;
1897         }
1898
1899         return 0;
1900 }
1901
1902 /*
1903   set/clear the permanent disabled bit on a remote node
1904  */
1905 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1906                        uint32_t set, uint32_t clear)
1907 {
1908         int ret;
1909         TDB_DATA data;
1910         struct ctdb_node_map *nodemap=NULL;
1911         struct ctdb_node_flag_change c;
1912         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1913         uint32_t recmaster;
1914         uint32_t *nodes;
1915
1916
1917         /* find the recovery master */
1918         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1919         if (ret != 0) {
1920                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1921                 talloc_free(tmp_ctx);
1922                 return ret;
1923         }
1924
1925
1926         /* read the node flags from the recmaster */
1927         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1928         if (ret != 0) {
1929                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1930                 talloc_free(tmp_ctx);
1931                 return -1;
1932         }
1933         if (destnode >= nodemap->num) {
1934                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1935                 talloc_free(tmp_ctx);
1936                 return -1;
1937         }
1938
1939         c.pnn       = destnode;
1940         c.old_flags = nodemap->nodes[destnode].flags;
1941         c.new_flags = c.old_flags;
1942         c.new_flags |= set;
1943         c.new_flags &= ~clear;
1944
1945         data.dsize = sizeof(c);
1946         data.dptr = (unsigned char *)&c;
1947
1948         /* send the flags update to all connected nodes */
1949         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1950
1951         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1952                                         nodes, 0,
1953                                         timeout, false, data,
1954                                         NULL, NULL,
1955                                         NULL) != 0) {
1956                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1957
1958                 talloc_free(tmp_ctx);
1959                 return -1;
1960         }
1961
1962         talloc_free(tmp_ctx);
1963         return 0;
1964 }
1965
1966
1967 /*
1968   get all tunables
1969  */
1970 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1971                                struct timeval timeout, 
1972                                uint32_t destnode,
1973                                struct ctdb_tunable *tunables)
1974 {
1975         TDB_DATA outdata;
1976         int ret;
1977         int32_t res;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1980                            &outdata, &res, &timeout, NULL);
1981         if (ret != 0 || res != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1983                 return -1;
1984         }
1985
1986         if (outdata.dsize != sizeof(*tunables)) {
1987                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1988                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1989                 return -1;              
1990         }
1991
1992         *tunables = *(struct ctdb_tunable *)outdata.dptr;
1993         talloc_free(outdata.dptr);
1994         return 0;
1995 }
1996
1997 /*
1998   add a public address to a node
1999  */
2000 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2001                       struct timeval timeout, 
2002                       uint32_t destnode,
2003                       struct ctdb_control_ip_iface *pub)
2004 {
2005         TDB_DATA data;
2006         int32_t res;
2007         int ret;
2008
2009         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2010         data.dptr  = (unsigned char *)pub;
2011
2012         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2013                            NULL, &res, &timeout, NULL);
2014         if (ret != 0 || res != 0) {
2015                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2016                 return -1;
2017         }
2018
2019         return 0;
2020 }
2021
2022 /*
2023   delete a public address from a node
2024  */
2025 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2026                       struct timeval timeout, 
2027                       uint32_t destnode,
2028                       struct ctdb_control_ip_iface *pub)
2029 {
2030         TDB_DATA data;
2031         int32_t res;
2032         int ret;
2033
2034         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2035         data.dptr  = (unsigned char *)pub;
2036
2037         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2038                            NULL, &res, &timeout, NULL);
2039         if (ret != 0 || res != 0) {
2040                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2041                 return -1;
2042         }
2043
2044         return 0;
2045 }
2046
2047 /*
2048   kill a tcp connection
2049  */
2050 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2051                       struct timeval timeout, 
2052                       uint32_t destnode,
2053                       struct ctdb_control_killtcp *killtcp)
2054 {
2055         TDB_DATA data;
2056         int32_t res;
2057         int ret;
2058
2059         data.dsize = sizeof(struct ctdb_control_killtcp);
2060         data.dptr  = (unsigned char *)killtcp;
2061
2062         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2063                            NULL, &res, &timeout, NULL);
2064         if (ret != 0 || res != 0) {
2065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2066                 return -1;
2067         }
2068
2069         return 0;
2070 }
2071
2072 /*
2073   send a gratious arp
2074  */
2075 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2076                       struct timeval timeout, 
2077                       uint32_t destnode,
2078                       ctdb_sock_addr *addr,
2079                       const char *ifname)
2080 {
2081         TDB_DATA data;
2082         int32_t res;
2083         int ret, len;
2084         struct ctdb_control_gratious_arp *gratious_arp;
2085         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2086
2087
2088         len = strlen(ifname)+1;
2089         gratious_arp = talloc_size(tmp_ctx, 
2090                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2091         CTDB_NO_MEMORY(ctdb, gratious_arp);
2092
2093         gratious_arp->addr = *addr;
2094         gratious_arp->len = len;
2095         memcpy(&gratious_arp->iface[0], ifname, len);
2096
2097
2098         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2099         data.dptr  = (unsigned char *)gratious_arp;
2100
2101         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2102                            NULL, &res, &timeout, NULL);
2103         if (ret != 0 || res != 0) {
2104                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2105                 talloc_free(tmp_ctx);
2106                 return -1;
2107         }
2108
2109         talloc_free(tmp_ctx);
2110         return 0;
2111 }
2112
2113 /*
2114   get a list of all tcp tickles that a node knows about for a particular vnn
2115  */
2116 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2117                               struct timeval timeout, uint32_t destnode, 
2118                               TALLOC_CTX *mem_ctx, 
2119                               ctdb_sock_addr *addr,
2120                               struct ctdb_control_tcp_tickle_list **list)
2121 {
2122         int ret;
2123         TDB_DATA data, outdata;
2124         int32_t status;
2125
2126         data.dptr = (uint8_t*)addr;
2127         data.dsize = sizeof(ctdb_sock_addr);
2128
2129         ret = ctdb_control(ctdb, destnode, 0, 
2130                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2131                            mem_ctx, &outdata, &status, NULL, NULL);
2132         if (ret != 0 || status != 0) {
2133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2134                 return -1;
2135         }
2136
2137         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2138
2139         return status;
2140 }
2141
2142 /*
2143   register a server id
2144  */
2145 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2146                       struct timeval timeout, 
2147                       struct ctdb_server_id *id)
2148 {
2149         TDB_DATA data;
2150         int32_t res;
2151         int ret;
2152
2153         data.dsize = sizeof(struct ctdb_server_id);
2154         data.dptr  = (unsigned char *)id;
2155
2156         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2157                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2158                         0, data, NULL,
2159                         NULL, &res, &timeout, NULL);
2160         if (ret != 0 || res != 0) {
2161                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2162                 return -1;
2163         }
2164
2165         return 0;
2166 }
2167
2168 /*
2169   unregister a server id
2170  */
2171 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2172                       struct timeval timeout, 
2173                       struct ctdb_server_id *id)
2174 {
2175         TDB_DATA data;
2176         int32_t res;
2177         int ret;
2178
2179         data.dsize = sizeof(struct ctdb_server_id);
2180         data.dptr  = (unsigned char *)id;
2181
2182         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2183                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2184                         0, data, NULL,
2185                         NULL, &res, &timeout, NULL);
2186         if (ret != 0 || res != 0) {
2187                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2188                 return -1;
2189         }
2190
2191         return 0;
2192 }
2193
2194
2195 /*
2196   check if a server id exists
2197
2198   if a server id does exist, return *status == 1, otherwise *status == 0
2199  */
2200 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2201                       struct timeval timeout, 
2202                       uint32_t destnode,
2203                       struct ctdb_server_id *id,
2204                       uint32_t *status)
2205 {
2206         TDB_DATA data;
2207         int32_t res;
2208         int ret;
2209
2210         data.dsize = sizeof(struct ctdb_server_id);
2211         data.dptr  = (unsigned char *)id;
2212
2213         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2214                         0, data, NULL,
2215                         NULL, &res, &timeout, NULL);
2216         if (ret != 0) {
2217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2218                 return -1;
2219         }
2220
2221         if (res) {
2222                 *status = 1;
2223         } else {
2224                 *status = 0;
2225         }
2226
2227         return 0;
2228 }
2229
2230 /*
2231    get the list of server ids that are registered on a node
2232 */
2233 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2234                 TALLOC_CTX *mem_ctx,
2235                 struct timeval timeout, uint32_t destnode, 
2236                 struct ctdb_server_id_list **svid_list)
2237 {
2238         int ret;
2239         TDB_DATA outdata;
2240         int32_t res;
2241
2242         ret = ctdb_control(ctdb, destnode, 0, 
2243                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2244                            mem_ctx, &outdata, &res, &timeout, NULL);
2245         if (ret != 0 || res != 0) {
2246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2247                 return -1;
2248         }
2249
2250         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2251                     
2252         return 0;
2253 }
2254
2255 /*
2256   set some ctdb flags
2257 */
2258 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2259 {
2260         ctdb->flags |= flags;
2261 }
2262
2263
2264 /*
2265   return the pnn of this node
2266 */
2267 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2268 {
2269         return ctdb->pnn;
2270 }
2271
2272
2273 /*
2274   get the uptime of a remote node
2275  */
2276 struct ctdb_client_control_state *
2277 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2278 {
2279         struct ctdb_client_control_state *state;
2280
2281         state = ctdb_control_send(ctdb, destnode, 0, 
2282                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2283                            mem_ctx, NULL);
2284         if (state != NULL && !timeval_is_zero(&timeout)) {
2285                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2286         }
2287
2288         return state;
2289 }
2290
2291 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2292 {
2293         int ret;
2294         int32_t res;
2295         TDB_DATA outdata;
2296
2297         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2298         if (ret != 0 || res != 0) {
2299                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2300                 return -1;
2301         }
2302
2303         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2304
2305         return 0;
2306 }
2307
2308 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2309 {
2310         struct ctdb_client_control_state *state;
2311
2312         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2313         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2314 }
2315
2316 /*
2317   send a control to execute the "recovered" event script on a node
2318  */
2319 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2320 {
2321         int ret;
2322         int32_t status;
2323
2324         ret = ctdb_control(ctdb, destnode, 0, 
2325                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2326                            NULL, NULL, &status, &timeout, NULL);
2327         if (ret != 0 || status != 0) {
2328                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2329                 return -1;
2330         }
2331
2332         return 0;
2333 }
2334
2335 /* 
2336   callback for the async helpers used when sending the same control
2337   to multiple nodes in parallell.
2338 */
2339 static void async_callback(struct ctdb_client_control_state *state)
2340 {
2341         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2342         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2343         int ret;
2344         TDB_DATA outdata;
2345         int32_t res;
2346         uint32_t destnode = state->c->hdr.destnode;
2347
2348         /* one more node has responded with recmode data */
2349         data->count--;
2350
2351         /* if we failed to push the db, then return an error and let
2352            the main loop try again.
2353         */
2354         if (state->state != CTDB_CONTROL_DONE) {
2355                 if ( !data->dont_log_errors) {
2356                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2357                 }
2358                 data->fail_count++;
2359                 if (data->fail_callback) {
2360                         data->fail_callback(ctdb, destnode, res, outdata,
2361                                         data->callback_data);
2362                 }
2363                 return;
2364         }
2365         
2366         state->async.fn = NULL;
2367
2368         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2369         if ((ret != 0) || (res != 0)) {
2370                 if ( !data->dont_log_errors) {
2371                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2372                 }
2373                 data->fail_count++;
2374                 if (data->fail_callback) {
2375                         data->fail_callback(ctdb, destnode, res, outdata,
2376                                         data->callback_data);
2377                 }
2378         }
2379         if ((ret == 0) && (data->callback != NULL)) {
2380                 data->callback(ctdb, destnode, res, outdata,
2381                                         data->callback_data);
2382         }
2383 }
2384
2385
2386 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2387 {
2388         /* set up the callback functions */
2389         state->async.fn = async_callback;
2390         state->async.private_data = data;
2391         
2392         /* one more control to wait for to complete */
2393         data->count++;
2394 }
2395
2396
2397 /* wait for up to the maximum number of seconds allowed
2398    or until all nodes we expect a response from has replied
2399 */
2400 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2401 {
2402         while (data->count > 0) {
2403                 event_loop_once(ctdb->ev);
2404         }
2405         if (data->fail_count != 0) {
2406                 if (!data->dont_log_errors) {
2407                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2408                                  data->fail_count));
2409                 }
2410                 return -1;
2411         }
2412         return 0;
2413 }
2414
2415
2416 /* 
2417    perform a simple control on the listed nodes
2418    The control cannot return data
2419  */
2420 int ctdb_client_async_control(struct ctdb_context *ctdb,
2421                                 enum ctdb_controls opcode,
2422                                 uint32_t *nodes,
2423                                 uint64_t srvid,
2424                                 struct timeval timeout,
2425                                 bool dont_log_errors,
2426                                 TDB_DATA data,
2427                                 client_async_callback client_callback,
2428                                 client_async_callback fail_callback,
2429                                 void *callback_data)
2430 {
2431         struct client_async_data *async_data;
2432         struct ctdb_client_control_state *state;
2433         int j, num_nodes;
2434
2435         async_data = talloc_zero(ctdb, struct client_async_data);
2436         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2437         async_data->dont_log_errors = dont_log_errors;
2438         async_data->callback = client_callback;
2439         async_data->fail_callback = fail_callback;
2440         async_data->callback_data = callback_data;
2441         async_data->opcode        = opcode;
2442
2443         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2444
2445         /* loop over all nodes and send an async control to each of them */
2446         for (j=0; j<num_nodes; j++) {
2447                 uint32_t pnn = nodes[j];
2448
2449                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2450                                           0, data, async_data, NULL);
2451                 if (state == NULL) {
2452                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2453                         talloc_free(async_data);
2454                         return -1;
2455                 }
2456                 if (!timeval_is_zero(&timeout)) {
2457                         event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2458                 }
2459                 
2460                 ctdb_client_async_add(async_data, state);
2461         }
2462
2463         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2464                 talloc_free(async_data);
2465                 return -1;
2466         }
2467
2468         talloc_free(async_data);
2469         return 0;
2470 }
2471
2472 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2473                                 struct ctdb_vnn_map *vnn_map,
2474                                 TALLOC_CTX *mem_ctx,
2475                                 bool include_self)
2476 {
2477         int i, j, num_nodes;
2478         uint32_t *nodes;
2479
2480         for (i=num_nodes=0;i<vnn_map->size;i++) {
2481                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2482                         continue;
2483                 }
2484                 num_nodes++;
2485         } 
2486
2487         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2488         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2489
2490         for (i=j=0;i<vnn_map->size;i++) {
2491                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2492                         continue;
2493                 }
2494                 nodes[j++] = vnn_map->map[i];
2495         } 
2496
2497         return nodes;
2498 }
2499
2500 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2501                                 struct ctdb_node_map *node_map,
2502                                 TALLOC_CTX *mem_ctx,
2503                                 bool include_self)
2504 {
2505         int i, j, num_nodes;
2506         uint32_t *nodes;
2507
2508         for (i=num_nodes=0;i<node_map->num;i++) {
2509                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2510                         continue;
2511                 }
2512                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2513                         continue;
2514                 }
2515                 num_nodes++;
2516         } 
2517
2518         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2519         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2520
2521         for (i=j=0;i<node_map->num;i++) {
2522                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2523                         continue;
2524                 }
2525                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2526                         continue;
2527                 }
2528                 nodes[j++] = node_map->nodes[i].pnn;
2529         } 
2530
2531         return nodes;
2532 }
2533
2534 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2535                                 struct ctdb_node_map *node_map,
2536                                 TALLOC_CTX *mem_ctx,
2537                                 uint32_t pnn)
2538 {
2539         int i, j, num_nodes;
2540         uint32_t *nodes;
2541
2542         for (i=num_nodes=0;i<node_map->num;i++) {
2543                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2544                         continue;
2545                 }
2546                 if (node_map->nodes[i].pnn == pnn) {
2547                         continue;
2548                 }
2549                 num_nodes++;
2550         } 
2551
2552         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2553         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2554
2555         for (i=j=0;i<node_map->num;i++) {
2556                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2557                         continue;
2558                 }
2559                 if (node_map->nodes[i].pnn == pnn) {
2560                         continue;
2561                 }
2562                 nodes[j++] = node_map->nodes[i].pnn;
2563         } 
2564
2565         return nodes;
2566 }
2567
2568 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2569                                 struct ctdb_node_map *node_map,
2570                                 TALLOC_CTX *mem_ctx,
2571                                 bool include_self)
2572 {
2573         int i, j, num_nodes;
2574         uint32_t *nodes;
2575
2576         for (i=num_nodes=0;i<node_map->num;i++) {
2577                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2578                         continue;
2579                 }
2580                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2581                         continue;
2582                 }
2583                 num_nodes++;
2584         } 
2585
2586         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2587         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2588
2589         for (i=j=0;i<node_map->num;i++) {
2590                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2591                         continue;
2592                 }
2593                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2594                         continue;
2595                 }
2596                 nodes[j++] = node_map->nodes[i].pnn;
2597         } 
2598
2599         return nodes;
2600 }
2601
2602 /* 
2603   this is used to test if a pnn lock exists and if it exists will return
2604   the number of connections that pnn has reported or -1 if that recovery
2605   daemon is not running.
2606 */
2607 int
2608 ctdb_read_pnn_lock(int fd, int32_t pnn)
2609 {
2610         struct flock lock;
2611         char c;
2612
2613         lock.l_type = F_WRLCK;
2614         lock.l_whence = SEEK_SET;
2615         lock.l_start = pnn;
2616         lock.l_len = 1;
2617         lock.l_pid = 0;
2618
2619         if (fcntl(fd, F_GETLK, &lock) != 0) {
2620                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2621                 return -1;
2622         }
2623
2624         if (lock.l_type == F_UNLCK) {
2625                 return -1;
2626         }
2627
2628         if (pread(fd, &c, 1, pnn) == -1) {
2629                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2630                 return -1;
2631         }
2632
2633         return c;
2634 }
2635
2636 /*
2637   get capabilities of a remote node
2638  */
2639 struct ctdb_client_control_state *
2640 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2641 {
2642         struct ctdb_client_control_state *state;
2643
2644         state = ctdb_control_send(ctdb, destnode, 0, 
2645                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2646                            mem_ctx, NULL);
2647         if (state != NULL && !timeval_is_zero(&timeout)) {
2648                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2649         }
2650
2651         return state;
2652 }
2653
2654 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2655 {
2656         int ret;
2657         int32_t res;
2658         TDB_DATA outdata;
2659
2660         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2661         if ( (ret != 0) || (res != 0) ) {
2662                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2663                 return -1;
2664         }
2665
2666         if (capabilities) {
2667                 *capabilities = *((uint32_t *)outdata.dptr);
2668         }
2669
2670         return 0;
2671 }
2672
2673 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2674 {
2675         struct ctdb_client_control_state *state;
2676         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2677         int ret;
2678
2679         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2680         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2681         talloc_free(tmp_ctx);
2682         return ret;
2683 }
2684
2685 /**
2686  * check whether a transaction is active on a given db on a given node
2687  */
2688 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2689                                      uint32_t destnode,
2690                                      uint32_t db_id)
2691 {
2692         int32_t status;
2693         int ret;
2694         TDB_DATA indata;
2695
2696         indata.dptr = (uint8_t *)&db_id;
2697         indata.dsize = sizeof(db_id);
2698
2699         ret = ctdb_control(ctdb, destnode, 0,
2700                            CTDB_CONTROL_TRANS2_ACTIVE,
2701                            0, indata, NULL, NULL, &status,
2702                            NULL, NULL);
2703
2704         if (ret != 0) {
2705                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2706                 return -1;
2707         }
2708
2709         return status;
2710 }
2711
2712
2713 struct ctdb_transaction_handle {
2714         struct ctdb_db_context *ctdb_db;
2715         bool in_replay;
2716         /*
2717          * we store the reads and writes done under a transaction:
2718          * - one list stores both reads and writes (m_all),
2719          * - the other just writes (m_write)
2720          */
2721         struct ctdb_marshall_buffer *m_all;
2722         struct ctdb_marshall_buffer *m_write;
2723 };
2724
2725 /* start a transaction on a database */
2726 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2727 {
2728         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2729         return 0;
2730 }
2731
2732 /* start a transaction on a database */
2733 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2734 {
2735         struct ctdb_record_handle *rh;
2736         TDB_DATA key;
2737         TDB_DATA data;
2738         struct ctdb_ltdb_header header;
2739         TALLOC_CTX *tmp_ctx;
2740         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2741         int ret;
2742         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2743         pid_t pid;
2744         int32_t status;
2745
2746         key.dptr = discard_const(keyname);
2747         key.dsize = strlen(keyname);
2748
2749         if (!ctdb_db->persistent) {
2750                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2751                 return -1;
2752         }
2753
2754 again:
2755         tmp_ctx = talloc_new(h);
2756
2757         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2758         if (rh == NULL) {
2759                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2760                 talloc_free(tmp_ctx);
2761                 return -1;
2762         }
2763
2764         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2765                                               CTDB_CURRENT_NODE,
2766                                               ctdb_db->db_id);
2767         if (status == 1) {
2768                 unsigned long int usec = (1000 + random()) % 100000;
2769                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2770                                     "on db_id[0x%08x]. waiting for %lu "
2771                                     "microseconds\n",
2772                                     ctdb_db->db_id, usec));
2773                 talloc_free(tmp_ctx);
2774                 usleep(usec);
2775                 goto again;
2776         }
2777
2778         /*
2779          * store the pid in the database:
2780          * it is not enough that the node is dmaster...
2781          */
2782         pid = getpid();
2783         data.dptr = (unsigned char *)&pid;
2784         data.dsize = sizeof(pid_t);
2785         rh->header.rsn++;
2786         rh->header.dmaster = ctdb_db->ctdb->pnn;
2787         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2788         if (ret != 0) {
2789                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2790                                   "transaction record\n"));
2791                 talloc_free(tmp_ctx);
2792                 return -1;
2793         }
2794
2795         talloc_free(rh);
2796
2797         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2798         if (ret != 0) {
2799                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2800                 talloc_free(tmp_ctx);
2801                 return -1;
2802         }
2803
2804         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2805         if (ret != 0) {
2806                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2807                                  "lock record inside transaction\n"));
2808                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2809                 talloc_free(tmp_ctx);
2810                 goto again;
2811         }
2812
2813         if (header.dmaster != ctdb_db->ctdb->pnn) {
2814                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2815                                    "transaction lock record\n"));
2816                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2817                 talloc_free(tmp_ctx);
2818                 goto again;
2819         }
2820
2821         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2822                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2823                                     "the transaction lock record\n"));
2824                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2825                 talloc_free(tmp_ctx);
2826                 goto again;
2827         }
2828
2829         talloc_free(tmp_ctx);
2830
2831         return 0;
2832 }
2833
2834
2835 /* start a transaction on a database */
2836 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2837                                                        TALLOC_CTX *mem_ctx)
2838 {
2839         struct ctdb_transaction_handle *h;
2840         int ret;
2841
2842         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2843         if (h == NULL) {
2844                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2845                 return NULL;
2846         }
2847
2848         h->ctdb_db = ctdb_db;
2849
2850         ret = ctdb_transaction_fetch_start(h);
2851         if (ret != 0) {
2852                 talloc_free(h);
2853                 return NULL;
2854         }
2855
2856         talloc_set_destructor(h, ctdb_transaction_destructor);
2857
2858         return h;
2859 }
2860
2861
2862
2863 /*
2864   fetch a record inside a transaction
2865  */
2866 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2867                            TALLOC_CTX *mem_ctx, 
2868                            TDB_DATA key, TDB_DATA *data)
2869 {
2870         struct ctdb_ltdb_header header;
2871         int ret;
2872
2873         ZERO_STRUCT(header);
2874
2875         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2876         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2877                 /* record doesn't exist yet */
2878                 *data = tdb_null;
2879                 ret = 0;
2880         }
2881         
2882         if (ret != 0) {
2883                 return ret;
2884         }
2885
2886         if (!h->in_replay) {
2887                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2888                 if (h->m_all == NULL) {
2889                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2890                         return -1;
2891                 }
2892         }
2893
2894         return 0;
2895 }
2896
2897 /*
2898   stores a record inside a transaction
2899  */
2900 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2901                            TDB_DATA key, TDB_DATA data)
2902 {
2903         TALLOC_CTX *tmp_ctx = talloc_new(h);
2904         struct ctdb_ltdb_header header;
2905         TDB_DATA olddata;
2906         int ret;
2907
2908         ZERO_STRUCT(header);
2909
2910         /* we need the header so we can update the RSN */
2911         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2912         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2913                 /* the record doesn't exist - create one with us as dmaster.
2914                    This is only safe because we are in a transaction and this
2915                    is a persistent database */
2916                 ZERO_STRUCT(header);
2917         } else if (ret != 0) {
2918                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2919                 talloc_free(tmp_ctx);
2920                 return ret;
2921         }
2922
2923         if (data.dsize == olddata.dsize &&
2924             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2925                 /* save writing the same data */
2926                 talloc_free(tmp_ctx);
2927                 return 0;
2928         }
2929
2930         header.dmaster = h->ctdb_db->ctdb->pnn;
2931         header.rsn++;
2932
2933         if (!h->in_replay) {
2934                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2935                 if (h->m_all == NULL) {
2936                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2937                         talloc_free(tmp_ctx);
2938                         return -1;
2939                 }
2940         }               
2941
2942         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2943         if (h->m_write == NULL) {
2944                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2945                 talloc_free(tmp_ctx);
2946                 return -1;
2947         }
2948         
2949         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2950
2951         talloc_free(tmp_ctx);
2952         
2953         return ret;
2954 }
2955
2956 /*
2957   replay a transaction
2958  */
2959 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2960 {
2961         int ret, i;
2962         struct ctdb_rec_data *rec = NULL;
2963
2964         h->in_replay = true;
2965         talloc_free(h->m_write);
2966         h->m_write = NULL;
2967
2968         ret = ctdb_transaction_fetch_start(h);
2969         if (ret != 0) {
2970                 return ret;
2971         }
2972
2973         for (i=0;i<h->m_all->count;i++) {
2974                 TDB_DATA key, data;
2975
2976                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2977                 if (rec == NULL) {
2978                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2979                         goto failed;
2980                 }
2981
2982                 if (rec->reqid == 0) {
2983                         /* its a store */
2984                         if (ctdb_transaction_store(h, key, data) != 0) {
2985                                 goto failed;
2986                         }
2987                 } else {
2988                         TDB_DATA data2;
2989                         TALLOC_CTX *tmp_ctx = talloc_new(h);
2990
2991                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
2992                                 talloc_free(tmp_ctx);
2993                                 goto failed;
2994                         }
2995                         if (data2.dsize != data.dsize ||
2996                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
2997                                 /* the record has changed on us - we have to give up */
2998                                 talloc_free(tmp_ctx);
2999                                 goto failed;
3000                         }
3001                         talloc_free(tmp_ctx);
3002                 }
3003         }
3004         
3005         return 0;
3006
3007 failed:
3008         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3009         return -1;
3010 }
3011
3012
3013 /*
3014   commit a transaction
3015  */
3016 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3017 {
3018         int ret, retries=0;
3019         int32_t status;
3020         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3021         struct timeval timeout;
3022         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3023
3024         talloc_set_destructor(h, NULL);
3025
3026         /* our commit strategy is quite complex.
3027
3028            - we first try to commit the changes to all other nodes
3029
3030            - if that works, then we commit locally and we are done
3031
3032            - if a commit on another node fails, then we need to cancel
3033              the transaction, then restart the transaction (thus
3034              opening a window of time for a pending recovery to
3035              complete), then replay the transaction, checking all the
3036              reads and writes (checking that reads give the same data,
3037              and writes succeed). Then we retry the transaction to the
3038              other nodes
3039         */
3040
3041 again:
3042         if (h->m_write == NULL) {
3043                 /* no changes were made */
3044                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3045                 talloc_free(h);
3046                 return 0;
3047         }
3048
3049         /* tell ctdbd to commit to the other nodes */
3050         timeout = timeval_current_ofs(1, 0);
3051         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3052                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3053                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3054                            &timeout, NULL);
3055         if (ret != 0 || status != 0) {
3056                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3057                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3058                                      ", retrying after 1 second...\n",
3059                                      (retries==0)?"":"retry "));
3060                 sleep(1);
3061
3062                 if (ret != 0) {
3063                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3064                 } else {
3065                         /* work out what error code we will give if we 
3066                            have to fail the operation */
3067                         switch ((enum ctdb_trans2_commit_error)status) {
3068                         case CTDB_TRANS2_COMMIT_SUCCESS:
3069                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3070                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3071                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3072                                 break;
3073                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3074                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3075                                 break;
3076                         }
3077                 }
3078
3079                 if (++retries == 100) {
3080                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3081                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3082                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3083                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3084                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3085                         talloc_free(h);
3086                         return -1;
3087                 }               
3088
3089                 if (ctdb_replay_transaction(h) != 0) {
3090                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3091                                           "transaction on db 0x%08x, "
3092                                           "failure control =%u\n",
3093                                           h->ctdb_db->db_id,
3094                                           (unsigned)failure_control));
3095                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3096                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3097                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3098                         talloc_free(h);
3099                         return -1;
3100                 }
3101                 goto again;
3102         } else {
3103                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3104         }
3105
3106         /* do the real commit locally */
3107         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3108         if (ret != 0) {
3109                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3110                                   "on db id 0x%08x locally, "
3111                                   "failure_control=%u\n",
3112                                   h->ctdb_db->db_id,
3113                                   (unsigned)failure_control));
3114                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3115                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3116                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3117                 talloc_free(h);
3118                 return ret;
3119         }
3120
3121         /* tell ctdbd that we are finished with our local commit */
3122         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3123                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3124                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3125         talloc_free(h);
3126         return 0;
3127 }
3128
3129 /*
3130   recovery daemon ping to main daemon
3131  */
3132 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3133 {
3134         int ret;
3135         int32_t res;
3136
3137         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3138                            ctdb, NULL, &res, NULL, NULL);
3139         if (ret != 0 || res != 0) {
3140                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3141                 return -1;
3142         }
3143
3144         return 0;
3145 }
3146
3147 /* when forking the main daemon and the child process needs to connect back
3148  * to the daemon as a client process, this function can be used to change
3149  * the ctdb context from daemon into client mode
3150  */
3151 int switch_from_server_to_client(struct ctdb_context *ctdb)
3152 {
3153         int ret;
3154
3155         /* shutdown the transport */
3156         if (ctdb->methods) {
3157                 ctdb->methods->shutdown(ctdb);
3158         }
3159
3160         /* get a new event context */
3161         talloc_free(ctdb->ev);
3162         ctdb->ev = event_context_init(ctdb);
3163
3164         close(ctdb->daemon.sd);
3165         ctdb->daemon.sd = -1;
3166
3167         /* initialise ctdb */
3168         ret = ctdb_socket_connect(ctdb);
3169         if (ret != 0) {
3170                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3171                 return -1;
3172         }
3173
3174          return 0;
3175 }
3176
3177 /*
3178   get the status of running the monitor eventscripts: NULL means never run.
3179  */
3180 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3181                 struct timeval timeout, uint32_t destnode, 
3182                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3183                 struct ctdb_scripts_wire **script_status)
3184 {
3185         int ret;
3186         TDB_DATA outdata, indata;
3187         int32_t res;
3188         uint32_t uinttype = type;
3189
3190         indata.dptr = (uint8_t *)&uinttype;
3191         indata.dsize = sizeof(uinttype);
3192
3193         ret = ctdb_control(ctdb, destnode, 0, 
3194                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3195                            mem_ctx, &outdata, &res, &timeout, NULL);
3196         if (ret != 0 || res != 0) {
3197                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3198                 return -1;
3199         }
3200
3201         if (outdata.dsize == 0) {
3202                 *script_status = NULL;
3203         } else {
3204                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3205                 talloc_free(outdata.dptr);
3206         }
3207                     
3208         return 0;
3209 }
3210
3211 /*
3212   tell the main daemon how long it took to lock the reclock file
3213  */
3214 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3215 {
3216         int ret;
3217         int32_t res;
3218         TDB_DATA data;
3219
3220         data.dptr = (uint8_t *)&latency;
3221         data.dsize = sizeof(latency);
3222
3223         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3224                            ctdb, NULL, &res, NULL, NULL);
3225         if (ret != 0 || res != 0) {
3226                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3227                 return -1;
3228         }
3229
3230         return 0;
3231 }
3232
3233 /*
3234   get the name of the reclock file
3235  */
3236 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3237                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3238                          const char **name)
3239 {
3240         int ret;
3241         int32_t res;
3242         TDB_DATA data;
3243
3244         ret = ctdb_control(ctdb, destnode, 0, 
3245                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3246                            mem_ctx, &data, &res, &timeout, NULL);
3247         if (ret != 0 || res != 0) {
3248                 return -1;
3249         }
3250
3251         if (data.dsize == 0) {
3252                 *name = NULL;
3253         } else {
3254                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3255         }
3256         talloc_free(data.dptr);
3257
3258         return 0;
3259 }
3260
3261 /*
3262   set the reclock filename for a node
3263  */
3264 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3265 {
3266         int ret;
3267         TDB_DATA data;
3268         int32_t res;
3269
3270         if (reclock == NULL) {
3271                 data.dsize = 0;
3272                 data.dptr  = NULL;
3273         } else {
3274                 data.dsize = strlen(reclock) + 1;
3275                 data.dptr  = discard_const(reclock);
3276         }
3277
3278         ret = ctdb_control(ctdb, destnode, 0, 
3279                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3280                            NULL, NULL, &res, &timeout, NULL);
3281         if (ret != 0 || res != 0) {
3282                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3283                 return -1;
3284         }
3285
3286         return 0;
3287 }
3288
3289 /*
3290   stop a node
3291  */
3292 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3293 {
3294         int ret;
3295         int32_t res;
3296
3297         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3298                            ctdb, NULL, &res, &timeout, NULL);
3299         if (ret != 0 || res != 0) {
3300                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3301                 return -1;
3302         }
3303
3304         return 0;
3305 }
3306
3307 /*
3308   continue a node
3309  */
3310 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3311 {
3312         int ret;
3313
3314         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3315                            ctdb, NULL, NULL, &timeout, NULL);
3316         if (ret != 0) {
3317                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3318                 return -1;
3319         }
3320
3321         return 0;
3322 }
3323
3324 /*
3325   set the natgw state for a node
3326  */
3327 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3328 {
3329         int ret;
3330         TDB_DATA data;
3331         int32_t res;
3332
3333         data.dsize = sizeof(natgwstate);
3334         data.dptr  = (uint8_t *)&natgwstate;
3335
3336         ret = ctdb_control(ctdb, destnode, 0, 
3337                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3338                            NULL, NULL, &res, &timeout, NULL);
3339         if (ret != 0 || res != 0) {
3340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3341                 return -1;
3342         }
3343
3344         return 0;
3345 }
3346
3347 /*
3348   set the lmaster role for a node
3349  */
3350 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3351 {
3352         int ret;
3353         TDB_DATA data;
3354         int32_t res;
3355
3356         data.dsize = sizeof(lmasterrole);
3357         data.dptr  = (uint8_t *)&lmasterrole;
3358
3359         ret = ctdb_control(ctdb, destnode, 0, 
3360                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3361                            NULL, NULL, &res, &timeout, NULL);
3362         if (ret != 0 || res != 0) {
3363                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3364                 return -1;
3365         }
3366
3367         return 0;
3368 }
3369
3370 /*
3371   set the recmaster role for a node
3372  */
3373 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3374 {
3375         int ret;
3376         TDB_DATA data;
3377         int32_t res;
3378
3379         data.dsize = sizeof(recmasterrole);
3380         data.dptr  = (uint8_t *)&recmasterrole;
3381
3382         ret = ctdb_control(ctdb, destnode, 0, 
3383                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3384                            NULL, NULL, &res, &timeout, NULL);
3385         if (ret != 0 || res != 0) {
3386                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3387                 return -1;
3388         }
3389
3390         return 0;
3391 }
3392
3393 /* enable an eventscript
3394  */
3395 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3396 {
3397         int ret;
3398         TDB_DATA data;
3399         int32_t res;
3400
3401         data.dsize = strlen(script) + 1;
3402         data.dptr  = discard_const(script);
3403
3404         ret = ctdb_control(ctdb, destnode, 0, 
3405                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3406                            NULL, NULL, &res, &timeout, NULL);
3407         if (ret != 0 || res != 0) {
3408                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3409                 return -1;
3410         }
3411
3412         return 0;
3413 }
3414
3415 /* disable an eventscript
3416  */
3417 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3418 {
3419         int ret;
3420         TDB_DATA data;
3421         int32_t res;
3422
3423         data.dsize = strlen(script) + 1;
3424         data.dptr  = discard_const(script);
3425
3426         ret = ctdb_control(ctdb, destnode, 0, 
3427                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3428                            NULL, NULL, &res, &timeout, NULL);
3429         if (ret != 0 || res != 0) {
3430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3431                 return -1;
3432         }
3433
3434         return 0;
3435 }
3436
3437
3438 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3439 {
3440         int ret;
3441         TDB_DATA data;
3442         int32_t res;
3443
3444         data.dsize = sizeof(*bantime);
3445         data.dptr  = (uint8_t *)bantime;
3446
3447         ret = ctdb_control(ctdb, destnode, 0, 
3448                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3449                            NULL, NULL, &res, &timeout, NULL);
3450         if (ret != 0 || res != 0) {
3451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3452                 return -1;
3453         }
3454
3455         return 0;
3456 }
3457
3458
3459 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3460 {
3461         int ret;
3462         TDB_DATA outdata;
3463         int32_t res;
3464         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3465
3466         ret = ctdb_control(ctdb, destnode, 0, 
3467                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3468                            tmp_ctx, &outdata, &res, &timeout, NULL);
3469         if (ret != 0 || res != 0) {
3470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3471                 talloc_free(tmp_ctx);
3472                 return -1;
3473         }
3474
3475         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3476         talloc_free(tmp_ctx);
3477
3478         return 0;
3479 }
3480
3481
3482 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3483 {
3484         int ret;
3485         int32_t res;
3486         TDB_DATA data;
3487         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3488
3489         data.dptr = (uint8_t*)db_prio;
3490         data.dsize = sizeof(*db_prio);
3491
3492         ret = ctdb_control(ctdb, destnode, 0, 
3493                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3494                            tmp_ctx, NULL, &res, &timeout, NULL);
3495         if (ret != 0 || res != 0) {
3496                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3497                 talloc_free(tmp_ctx);
3498                 return -1;
3499         }
3500
3501         talloc_free(tmp_ctx);
3502
3503         return 0;
3504 }
3505
3506 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3507 {
3508         int ret;
3509         int32_t res;
3510         TDB_DATA data;
3511         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3512
3513         data.dptr = (uint8_t*)&db_id;
3514         data.dsize = sizeof(db_id);
3515
3516         ret = ctdb_control(ctdb, destnode, 0, 
3517                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3518                            tmp_ctx, NULL, &res, &timeout, NULL);
3519         if (ret != 0 || res < 0) {
3520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3521                 talloc_free(tmp_ctx);
3522                 return -1;
3523         }
3524
3525         if (priority) {
3526                 *priority = res;
3527         }
3528
3529         talloc_free(tmp_ctx);
3530
3531         return 0;
3532 }
3533
3534 /* time out handler for ctdb_control */
3535 void ctdb_control_timeout_func(struct event_context *ev, struct timed_event *te, 
3536         struct timeval t, void *private_data)
3537 {
3538         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
3539
3540         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
3541                          "dstnode:%u\n", state->reqid, state->c->opcode,
3542                          state->c->hdr.destnode));
3543
3544         state->state = CTDB_CONTROL_TIMEOUT;
3545
3546         /* if we had a callback registered for this control, pull the response
3547            and call the callback.
3548         */
3549         if (state->async.fn) {
3550                 event_add_timed(state->ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
3551         }
3552 }
3553