make the ringbuffer logging more efficient and marshall the data by writing to a...
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
517                 }
518                 return 0;
519         }
520
521         printf("Number of nodes:%d\n", nodemap->num);
522         for(i=0;i<nodemap->num;i++){
523                 static const struct {
524                         uint32_t flag;
525                         const char *name;
526                 } flag_names[] = {
527                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
528                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
529                         { NODE_FLAGS_BANNED,                "BANNED" },
530                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
531                         { NODE_FLAGS_DELETED,               "DELETED" },
532                         { NODE_FLAGS_STOPPED,               "STOPPED" },
533                 };
534                 char *flags_str = NULL;
535                 int j;
536
537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
538                         continue;
539                 }
540                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
541                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
542                                 if (flags_str == NULL) {
543                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
544                                 } else {
545                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
546                                                                            flag_names[j].name);
547                                 }
548                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
549                         }
550                 }
551                 if (flags_str == NULL) {
552                         flags_str = talloc_strdup(ctdb, "OK");
553                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
554                 }
555                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
556                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
557                        flags_str,
558                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
559                 talloc_free(flags_str);
560         }
561
562         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
563         if (ret != 0) {
564                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
565                 return ret;
566         }
567         if (vnnmap->generation == INVALID_GENERATION) {
568                 printf("Generation:INVALID\n");
569         } else {
570                 printf("Generation:%d\n",vnnmap->generation);
571         }
572         printf("Size:%d\n",vnnmap->size);
573         for(i=0;i<vnnmap->size;i++){
574                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
575         }
576
577         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
580                 return ret;
581         }
582         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
583
584         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
585         if (ret != 0) {
586                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
587                 return ret;
588         }
589         printf("Recovery master:%d\n",recmaster);
590
591         return 0;
592 }
593
594
595 struct natgw_node {
596         struct natgw_node *next;
597         const char *addr;
598 };
599
600 /*
601   display the list of nodes belonging to this natgw configuration
602  */
603 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
604 {
605         int i, ret;
606         const char *natgw_list;
607         int nlines;
608         char **lines;
609         struct natgw_node *natgw_nodes = NULL;
610         struct natgw_node *natgw_node;
611         struct ctdb_node_map *nodemap=NULL;
612
613
614         /* read the natgw nodes file into a linked list */
615         natgw_list = getenv("NATGW_NODES");
616         if (natgw_list == NULL) {
617                 natgw_list = "/etc/ctdb/natgw_nodes";
618         }
619         lines = file_lines_load(natgw_list, &nlines, ctdb);
620         if (lines == NULL) {
621                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
622                 return -1;
623         }
624         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
625                 nlines--;
626         }
627         for (i=0;i<nlines;i++) {
628                 char *node;
629
630                 node = lines[i];
631                 /* strip leading spaces */
632                 while((*node == ' ') || (*node == '\t')) {
633                         node++;
634                 }
635                 if (*node == '#') {
636                         continue;
637                 }
638                 if (strcmp(node, "") == 0) {
639                         continue;
640                 }
641                 natgw_node = talloc(ctdb, struct natgw_node);
642                 natgw_node->addr = talloc_strdup(natgw_node, node);
643                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
644                 natgw_node->next = natgw_nodes;
645                 natgw_nodes = natgw_node;
646         }
647
648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
651                 return ret;
652         }
653
654         i=0;
655         while(i<nodemap->num) {
656                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
657                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
658                                 break;
659                         }
660                 }
661
662                 /* this node was not in the natgw so we just remove it from
663                  * the list
664                  */
665                 if ((natgw_node == NULL) 
666                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
667                         int j;
668
669                         for (j=i+1; j<nodemap->num; j++) {
670                                 nodemap->nodes[j-1] = nodemap->nodes[j];
671                         }
672                         nodemap->num--;
673                         continue;
674                 }
675
676                 i++;
677         }               
678
679         /* pick a node to be natgwmaster
680          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
681          */
682         for(i=0;i<nodemap->num;i++){
683                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
684                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
685                         break;
686                 }
687         }
688         /* we couldnt find any healthy node, try unhealthy ones */
689         if (i == nodemap->num) {
690                 for(i=0;i<nodemap->num;i++){
691                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
692                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
693                                 break;
694                         }
695                 }
696         }
697         /* unless all nodes are STOPPED, when we pick one anyway */
698         if (i == nodemap->num) {
699                 for(i=0;i<nodemap->num;i++){
700                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
701                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
702                                 break;
703                         }
704                 }
705                 /* or if we still can not find any */
706                 if (i == nodemap->num) {
707                         printf("-1 0.0.0.0\n");
708                 }
709         }
710
711         /* print the pruned list of nodes belonging to this natgw list */
712         for(i=0;i<nodemap->num;i++){
713                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
714                         continue;
715                 }
716                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
717                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
718                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
723         }
724
725         return 0;
726 }
727
728
729 /*
730   display the status of the monitoring scripts
731  */
732 static int control_scriptstatus(struct ctdb_context *ctdb, int argc, const char **argv)
733 {
734         int i, ret;
735         struct ctdb_monitoring_wire *script_status;
736
737         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, &script_status);
738         if (ret != 0) {
739                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
740                 return ret;
741         }
742
743         printf("%d scripts were executed last monitoring cycle\n", script_status->num_scripts);
744         for (i=0; i<script_status->num_scripts; i++) {
745                 if (script_status->scripts[i].disabled) {
746                         printf("%-20s Status:DISABLED\n",
747                                 script_status->scripts[i].name);
748                         continue;
749                 } 
750                 printf("%-20s Status:%s    ",
751                         script_status->scripts[i].name,
752                         script_status->scripts[i].timedout?"TIMEDOUT":script_status->scripts[i].status==0?"OK":"ERROR");
753                 if (script_status->scripts[i].timedout == 0) {
754                         printf("Duration:%.3lf ",
755                         timeval_delta(&script_status->scripts[i].finished,
756                               &script_status->scripts[i].start));
757                 }
758                 printf("%s",
759                         ctime(&script_status->scripts[i].start.tv_sec));
760                 if ((script_status->scripts[i].timedout != 0)
761                 ||  (script_status->scripts[i].status != 0) ) {
762                         printf("   OUTPUT:%s\n",
763                                 script_status->scripts[i].output);
764                 }
765         }
766
767         return 0;
768 }
769         
770
771 /*
772   enable an eventscript
773  */
774 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
775 {
776         int ret;
777
778         if (argc < 1) {
779                 usage();
780         }
781
782         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
783         if (ret != 0) {
784           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
785                 return ret;
786         }
787
788         return 0;
789 }
790
791 /*
792   disable an eventscript
793  */
794 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
795 {
796         int ret;
797
798         if (argc < 1) {
799                 usage();
800         }
801
802         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
803         if (ret != 0) {
804           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
805                 return ret;
806         }
807
808         return 0;
809 }
810
811 /*
812   display the pnn of the recovery master
813  */
814 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
815 {
816         int ret;
817         uint32_t recmaster;
818
819         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
820         if (ret != 0) {
821                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
822                 return ret;
823         }
824         printf("%d\n",recmaster);
825
826         return 0;
827 }
828
829 /*
830   get a list of all tickles for this pnn
831  */
832 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
833 {
834         struct ctdb_control_tcp_tickle_list *list;
835         ctdb_sock_addr addr;
836         int i, ret;
837
838         if (argc < 1) {
839                 usage();
840         }
841
842         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
843                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
844                 return -1;
845         }
846
847         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
848         if (ret == -1) {
849                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
850                 return -1;
851         }
852
853         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
854         printf("Num tickles:%u\n", list->tickles.num);
855         for (i=0;i<list->tickles.num;i++) {
856                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
857                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
858         }
859
860         talloc_free(list);
861         
862         return 0;
863 }
864
865
866
867 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
868 {
869         struct ctdb_all_public_ips *ips;
870         struct ctdb_public_ip ip;
871         int i, ret;
872         uint32_t *nodes;
873         uint32_t disable_time;
874         TDB_DATA data;
875         struct ctdb_node_map *nodemap=NULL;
876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
877
878         disable_time = 30;
879         data.dptr  = (uint8_t*)&disable_time;
880         data.dsize = sizeof(disable_time);
881         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
882         if (ret != 0) {
883                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
884                 return -1;
885         }
886
887
888
889         /* read the public ip list from the node */
890         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
891         if (ret != 0) {
892                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
893                 talloc_free(tmp_ctx);
894                 return -1;
895         }
896
897         for (i=0;i<ips->num;i++) {
898                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
899                         break;
900                 }
901         }
902         if (i==ips->num) {
903                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
904                         pnn, ctdb_addr_to_str(addr)));
905                 talloc_free(tmp_ctx);
906                 return -1;
907         }
908
909         ip.pnn  = pnn;
910         ip.addr = *addr;
911
912         data.dptr  = (uint8_t *)&ip;
913         data.dsize = sizeof(ip);
914
915         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
916         if (ret != 0) {
917                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
918                 talloc_free(tmp_ctx);
919                 return ret;
920         }
921
922         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
923         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
924                                         nodes, 0,
925                                         TIMELIMIT(),
926                                         false, data,
927                                         NULL, NULL,
928                                         NULL);
929         if (ret != 0) {
930                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
931                 talloc_free(tmp_ctx);
932                 return -1;
933         }
934
935         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
936         if (ret != 0) {
937                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
938                 talloc_free(tmp_ctx);
939                 return -1;
940         }
941
942         talloc_free(tmp_ctx);
943         return 0;
944 }
945
946 /*
947   move/failover an ip address to a specific node
948  */
949 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
950 {
951         uint32_t pnn;
952         ctdb_sock_addr addr;
953
954         if (argc < 2) {
955                 usage();
956                 return -1;
957         }
958
959         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
960                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
961                 return -1;
962         }
963
964
965         if (sscanf(argv[1], "%u", &pnn) != 1) {
966                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
967                 return -1;
968         }
969
970         if (move_ip(ctdb, &addr, pnn) != 0) {
971                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
972                 return -1;
973         }
974
975         return 0;
976 }
977
978 void getips_store_callback(void *param, void *data)
979 {
980         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
981         struct ctdb_all_public_ips *ips = param;
982         int i;
983
984         i = ips->num++;
985         ips->ips[i].pnn  = node_ip->pnn;
986         ips->ips[i].addr = node_ip->addr;
987 }
988
989 void getips_count_callback(void *param, void *data)
990 {
991         uint32_t *count = param;
992
993         (*count)++;
994 }
995
996 #define IP_KEYLEN       4
997 static uint32_t *ip_key(ctdb_sock_addr *ip)
998 {
999         static uint32_t key[IP_KEYLEN];
1000
1001         bzero(key, sizeof(key));
1002
1003         switch (ip->sa.sa_family) {
1004         case AF_INET:
1005                 key[0]  = ip->ip.sin_addr.s_addr;
1006                 break;
1007         case AF_INET6:
1008                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1009                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1010                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1011                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1012                 break;
1013         default:
1014                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1015                 return key;
1016         }
1017
1018         return key;
1019 }
1020
1021 static void *add_ip_callback(void *parm, void *data)
1022 {
1023         return parm;
1024 }
1025
1026 static int
1027 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1028 {
1029         struct ctdb_all_public_ips *tmp_ips;
1030         struct ctdb_node_map *nodemap=NULL;
1031         trbt_tree_t *ip_tree;
1032         int i, j, len, ret;
1033         uint32_t count;
1034
1035         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1036         if (ret != 0) {
1037                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1038                 return ret;
1039         }
1040
1041         ip_tree = trbt_create(tmp_ctx, 0);
1042
1043         for(i=0;i<nodemap->num;i++){
1044                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1045                         continue;
1046                 }
1047                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1048                         continue;
1049                 }
1050
1051                 /* read the public ip list from this node */
1052                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1053                 if (ret != 0) {
1054                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1055                         return -1;
1056                 }
1057         
1058                 for (j=0; j<tmp_ips->num;j++) {
1059                         struct ctdb_public_ip *node_ip;
1060
1061                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1062                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1063                         node_ip->addr = tmp_ips->ips[j].addr;
1064
1065                         trbt_insertarray32_callback(ip_tree,
1066                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1067                                 add_ip_callback,
1068                                 node_ip);
1069                 }
1070                 talloc_free(tmp_ips);
1071         }
1072
1073         /* traverse */
1074         count = 0;
1075         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1076
1077         len = offsetof(struct ctdb_all_public_ips, ips) + 
1078                 count*sizeof(struct ctdb_public_ip);
1079         tmp_ips = talloc_zero_size(tmp_ctx, len);
1080         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1081
1082         *ips = tmp_ips;
1083
1084         return 0;
1085 }
1086
1087
1088 /* 
1089  * scans all other nodes and returns a pnn for another node that can host this 
1090  * ip address or -1
1091  */
1092 static int
1093 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1094 {
1095         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1096         struct ctdb_all_public_ips *ips;
1097         struct ctdb_node_map *nodemap=NULL;
1098         int i, j, ret;
1099
1100         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1101         if (ret != 0) {
1102                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1103                 talloc_free(tmp_ctx);
1104                 return ret;
1105         }
1106
1107         for(i=0;i<nodemap->num;i++){
1108                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1109                         continue;
1110                 }
1111                 if (nodemap->nodes[i].pnn == options.pnn) {
1112                         continue;
1113                 }
1114
1115                 /* read the public ip list from this node */
1116                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1117                 if (ret != 0) {
1118                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1119                         return -1;
1120                 }
1121
1122                 for (j=0;j<ips->num;j++) {
1123                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1124                                 talloc_free(tmp_ctx);
1125                                 return nodemap->nodes[i].pnn;
1126                         }
1127                 }
1128                 talloc_free(ips);
1129         }
1130
1131         talloc_free(tmp_ctx);
1132         return -1;
1133 }
1134
1135 /*
1136   add a public ip address to a node
1137  */
1138 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1139 {
1140         int i, ret;
1141         int len;
1142         uint32_t pnn;
1143         unsigned mask;
1144         ctdb_sock_addr addr;
1145         struct ctdb_control_ip_iface *pub;
1146         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1147         struct ctdb_all_public_ips *ips;
1148
1149
1150         if (argc != 2) {
1151                 talloc_free(tmp_ctx);
1152                 usage();
1153         }
1154
1155         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1156                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1157                 talloc_free(tmp_ctx);
1158                 return -1;
1159         }
1160
1161         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1162         if (ret != 0) {
1163                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1164                 talloc_free(tmp_ctx);
1165                 return ret;
1166         }
1167
1168
1169         /* check if some other node is already serving this ip, if not,
1170          * we will claim it
1171          */
1172         for (i=0;i<ips->num;i++) {
1173                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1174                         break;
1175                 }
1176         }
1177
1178         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1179         pub = talloc_size(tmp_ctx, len); 
1180         CTDB_NO_MEMORY(ctdb, pub);
1181
1182         pub->addr  = addr;
1183         pub->mask  = mask;
1184         pub->len   = strlen(argv[1])+1;
1185         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1186
1187         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1188         if (ret != 0) {
1189                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1190                 talloc_free(tmp_ctx);
1191                 return ret;
1192         }
1193
1194         if (i == ips->num) {
1195                 /* no one has this ip so we claim it */
1196                 pnn  = options.pnn;
1197         } else {
1198                 pnn  = ips->ips[i].pnn;
1199         }
1200
1201         if (move_ip(ctdb, &addr, pnn) != 0) {
1202                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1203                 return -1;
1204         }
1205
1206         talloc_free(tmp_ctx);
1207         return 0;
1208 }
1209
1210 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1211
1212 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1213 {
1214         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1215         struct ctdb_node_map *nodemap=NULL;
1216         struct ctdb_all_public_ips *ips;
1217         int ret, i, j;
1218
1219         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1220         if (ret != 0) {
1221                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1222                 return ret;
1223         }
1224
1225         /* remove it from the nodes that are not hosting the ip currently */
1226         for(i=0;i<nodemap->num;i++){
1227                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1228                         continue;
1229                 }
1230                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1231                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1232                         continue;
1233                 }
1234
1235                 for (j=0;j<ips->num;j++) {
1236                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1237                                 break;
1238                         }
1239                 }
1240                 if (j==ips->num) {
1241                         continue;
1242                 }
1243
1244                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1245                         continue;
1246                 }
1247
1248                 options.pnn = nodemap->nodes[i].pnn;
1249                 control_delip(ctdb, argc, argv);
1250         }
1251
1252
1253         /* remove it from every node (also the one hosting it) */
1254         for(i=0;i<nodemap->num;i++){
1255                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1256                         continue;
1257                 }
1258                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1259                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1260                         continue;
1261                 }
1262
1263                 for (j=0;j<ips->num;j++) {
1264                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1265                                 break;
1266                         }
1267                 }
1268                 if (j==ips->num) {
1269                         continue;
1270                 }
1271
1272                 options.pnn = nodemap->nodes[i].pnn;
1273                 control_delip(ctdb, argc, argv);
1274         }
1275
1276         talloc_free(tmp_ctx);
1277         return 0;
1278 }
1279         
1280 /*
1281   delete a public ip address from a node
1282  */
1283 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1284 {
1285         int i, ret;
1286         ctdb_sock_addr addr;
1287         struct ctdb_control_ip_iface pub;
1288         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1289         struct ctdb_all_public_ips *ips;
1290
1291         if (argc != 1) {
1292                 talloc_free(tmp_ctx);
1293                 usage();
1294         }
1295
1296         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1297                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1298                 return -1;
1299         }
1300
1301         if (options.pnn == CTDB_BROADCAST_ALL) {
1302                 return control_delip_all(ctdb, argc, argv, &addr);
1303         }
1304
1305         pub.addr  = addr;
1306         pub.mask  = 0;
1307         pub.len   = 0;
1308
1309         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1310         if (ret != 0) {
1311                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1312                 talloc_free(tmp_ctx);
1313                 return ret;
1314         }
1315         
1316         for (i=0;i<ips->num;i++) {
1317                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1318                         break;
1319                 }
1320         }
1321
1322         if (i==ips->num) {
1323                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1324                         ctdb_addr_to_str(&addr)));
1325                 talloc_free(tmp_ctx);
1326                 return -1;
1327         }
1328
1329         if (ips->ips[i].pnn == options.pnn) {
1330                 ret = find_other_host_for_public_ip(ctdb, &addr);
1331                 if (ret != -1) {
1332                         if (move_ip(ctdb, &addr, ret) != 0) {
1333                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1334                                 return -1;
1335                         }
1336                 }
1337         }
1338
1339         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1340         if (ret != 0) {
1341                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1342                 talloc_free(tmp_ctx);
1343                 return ret;
1344         }
1345
1346         talloc_free(tmp_ctx);
1347         return 0;
1348 }
1349
1350 /*
1351   kill a tcp connection
1352  */
1353 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1354 {
1355         int ret;
1356         struct ctdb_control_killtcp killtcp;
1357
1358         if (argc < 2) {
1359                 usage();
1360         }
1361
1362         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1363                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1364                 return -1;
1365         }
1366
1367         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1368                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1369                 return -1;
1370         }
1371
1372         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1373         if (ret != 0) {
1374                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1375                 return ret;
1376         }
1377
1378         return 0;
1379 }
1380
1381
1382 /*
1383   send a gratious arp
1384  */
1385 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1386 {
1387         int ret;
1388         ctdb_sock_addr addr;
1389
1390         if (argc < 2) {
1391                 usage();
1392         }
1393
1394         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1395                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1396                 return -1;
1397         }
1398
1399         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1400         if (ret != 0) {
1401                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1402                 return ret;
1403         }
1404
1405         return 0;
1406 }
1407
1408 /*
1409   register a server id
1410  */
1411 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1412 {
1413         int ret;
1414         struct ctdb_server_id server_id;
1415
1416         if (argc < 3) {
1417                 usage();
1418         }
1419
1420         server_id.pnn       = strtoul(argv[0], NULL, 0);
1421         server_id.type      = strtoul(argv[1], NULL, 0);
1422         server_id.server_id = strtoul(argv[2], NULL, 0);
1423
1424         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1425         if (ret != 0) {
1426                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1427                 return ret;
1428         }
1429         return -1;
1430 }
1431
1432 /*
1433   unregister a server id
1434  */
1435 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1436 {
1437         int ret;
1438         struct ctdb_server_id server_id;
1439
1440         if (argc < 3) {
1441                 usage();
1442         }
1443
1444         server_id.pnn       = strtoul(argv[0], NULL, 0);
1445         server_id.type      = strtoul(argv[1], NULL, 0);
1446         server_id.server_id = strtoul(argv[2], NULL, 0);
1447
1448         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1449         if (ret != 0) {
1450                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1451                 return ret;
1452         }
1453         return -1;
1454 }
1455
1456 /*
1457   check if a server id exists
1458  */
1459 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1460 {
1461         uint32_t status;
1462         int ret;
1463         struct ctdb_server_id server_id;
1464
1465         if (argc < 3) {
1466                 usage();
1467         }
1468
1469         server_id.pnn       = strtoul(argv[0], NULL, 0);
1470         server_id.type      = strtoul(argv[1], NULL, 0);
1471         server_id.server_id = strtoul(argv[2], NULL, 0);
1472
1473         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1474         if (ret != 0) {
1475                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1476                 return ret;
1477         }
1478
1479         if (status) {
1480                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1481         } else {
1482                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1483         }
1484         return 0;
1485 }
1486
1487 /*
1488   get a list of all server ids that are registered on a node
1489  */
1490 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1491 {
1492         int i, ret;
1493         struct ctdb_server_id_list *server_ids;
1494
1495         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1496         if (ret != 0) {
1497                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1498                 return ret;
1499         }
1500
1501         for (i=0; i<server_ids->num; i++) {
1502                 printf("Server id %d:%d:%d\n", 
1503                         server_ids->server_ids[i].pnn, 
1504                         server_ids->server_ids[i].type, 
1505                         server_ids->server_ids[i].server_id); 
1506         }
1507
1508         return -1;
1509 }
1510
1511 /*
1512   send a tcp tickle ack
1513  */
1514 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1515 {
1516         int ret;
1517         ctdb_sock_addr  src, dst;
1518
1519         if (argc < 2) {
1520                 usage();
1521         }
1522
1523         if (!parse_ip_port(argv[0], &src)) {
1524                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1525                 return -1;
1526         }
1527
1528         if (!parse_ip_port(argv[1], &dst)) {
1529                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1530                 return -1;
1531         }
1532
1533         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1534         if (ret==0) {
1535                 return 0;
1536         }
1537         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1538
1539         return -1;
1540 }
1541
1542
1543 /*
1544   display public ip status
1545  */
1546 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1547 {
1548         int i, ret;
1549         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1550         struct ctdb_all_public_ips *ips;
1551
1552         if (options.pnn == CTDB_BROADCAST_ALL) {
1553                 /* read the list of public ips from all nodes */
1554                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1555         } else {
1556                 /* read the public ip list from this node */
1557                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1558         }
1559         if (ret != 0) {
1560                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1561                 talloc_free(tmp_ctx);
1562                 return ret;
1563         }
1564
1565         if (options.machinereadable){
1566                 printf(":Public IP:Node:\n");
1567         } else {
1568                 if (options.pnn == CTDB_BROADCAST_ALL) {
1569                         printf("Public IPs on ALL nodes\n");
1570                 } else {
1571                         printf("Public IPs on node %u\n", options.pnn);
1572                 }
1573         }
1574
1575         for (i=1;i<=ips->num;i++) {
1576                 if (options.machinereadable){
1577                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1578                 } else {
1579                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1580                 }
1581         }
1582
1583         talloc_free(tmp_ctx);
1584         return 0;
1585 }
1586
1587 /*
1588   display pid of a ctdb daemon
1589  */
1590 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1591 {
1592         uint32_t pid;
1593         int ret;
1594
1595         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1596         if (ret != 0) {
1597                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1598                 return ret;
1599         }
1600         printf("Pid:%d\n", pid);
1601
1602         return 0;
1603 }
1604
1605 /*
1606   handler for receiving the response to ipreallocate
1607 */
1608 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1609                              TDB_DATA data, void *private_data)
1610 {
1611         exit(0);
1612 }
1613
1614 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1615 {
1616         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1617
1618         event_add_timed(ctdb->ev, ctdb, 
1619                                 timeval_current_ofs(1, 0),
1620                                 ctdb_every_second, ctdb);
1621 }
1622
1623 /*
1624   ask the recovery daemon on the recovery master to perform a ip reallocation
1625  */
1626 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1627 {
1628         int i, ret;
1629         TDB_DATA data;
1630         struct takeover_run_reply rd;
1631         uint32_t recmaster;
1632         struct ctdb_node_map *nodemap=NULL;
1633         int retries=0;
1634         struct timeval tv = timeval_current();
1635
1636         /* we need some events to trigger so we can timeout and restart
1637            the loop
1638         */
1639         event_add_timed(ctdb->ev, ctdb, 
1640                                 timeval_current_ofs(1, 0),
1641                                 ctdb_every_second, ctdb);
1642
1643         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1644         if (rd.pnn == -1) {
1645                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1646                 return -1;
1647         }
1648         rd.srvid = getpid();
1649
1650         /* register a message port for receiveing the reply so that we
1651            can receive the reply
1652         */
1653         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1654
1655         data.dptr = (uint8_t *)&rd;
1656         data.dsize = sizeof(rd);
1657
1658 again:
1659         if (retries>5) {
1660                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1661                 exit(10);
1662         }
1663
1664         /* check that there are valid nodes available */
1665         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1666                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1667                 exit(10);
1668         }
1669         for (i=0; i<nodemap->num;i++) {
1670                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1671                         break;
1672                 }
1673         }
1674         if (i==nodemap->num) {
1675                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1676                 return 0;
1677         }
1678
1679
1680         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1681         if (ret != 0) {
1682                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1683                 return ret;
1684         }
1685
1686         /* verify the node exists */
1687         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1688                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1689                 exit(10);
1690         }
1691
1692
1693         /* check tha there are nodes available that can act as a recmaster */
1694         for (i=0; i<nodemap->num; i++) {
1695                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1696                         continue;
1697                 }
1698                 break;
1699         }
1700         if (i == nodemap->num) {
1701                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1702                 return 0;
1703         }
1704
1705         /* verify the recovery master is not STOPPED, nor BANNED */
1706         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1707                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1708                 retries++;
1709                 sleep(1);
1710                 goto again;
1711         } 
1712
1713         
1714         /* verify the recovery master is not STOPPED, nor BANNED */
1715         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1716                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1717                 retries++;
1718                 sleep(1);
1719                 goto again;
1720         } 
1721
1722         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1723         if (ret != 0) {
1724                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1725                 return -1;
1726         }
1727
1728         tv = timeval_current();
1729         /* this loop will terminate when we have received the reply */
1730         while (timeval_elapsed(&tv) < 3.0) {    
1731                 event_loop_once(ctdb->ev);
1732         }
1733
1734         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1735         retries++;
1736         sleep(1);
1737         goto again;
1738
1739         return 0;
1740 }
1741
1742
1743 /*
1744   disable a remote node
1745  */
1746 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1747 {
1748         int ret;
1749         struct ctdb_node_map *nodemap=NULL;
1750
1751         do {
1752                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1753                 if (ret != 0) {
1754                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1755                         return ret;
1756                 }
1757
1758                 sleep(1);
1759
1760                 /* read the nodemap and verify the change took effect */
1761                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1762                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1763                         exit(10);
1764                 }
1765
1766         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1767         ret = control_ipreallocate(ctdb, argc, argv);
1768         if (ret != 0) {
1769                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1770                 return ret;
1771         }
1772
1773         return 0;
1774 }
1775
1776 /*
1777   enable a disabled remote node
1778  */
1779 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1780 {
1781         int ret;
1782
1783         struct ctdb_node_map *nodemap=NULL;
1784
1785         do {
1786                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1787                 if (ret != 0) {
1788                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1789                         return ret;
1790                 }
1791
1792                 sleep(1);
1793
1794                 /* read the nodemap and verify the change took effect */
1795                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1796                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1797                         exit(10);
1798                 }
1799
1800         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1801         ret = control_ipreallocate(ctdb, argc, argv);
1802         if (ret != 0) {
1803                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1804                 return ret;
1805         }
1806
1807         return 0;
1808 }
1809
1810 /*
1811   stop a remote node
1812  */
1813 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1814 {
1815         int ret;
1816         struct ctdb_node_map *nodemap=NULL;
1817
1818         do {
1819                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1820                 if (ret != 0) {
1821                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1822                 }
1823         
1824                 sleep(1);
1825
1826                 /* read the nodemap and verify the change took effect */
1827                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1828                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1829                         exit(10);
1830                 }
1831
1832         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1833         ret = control_ipreallocate(ctdb, argc, argv);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1836                 return ret;
1837         }
1838
1839         return 0;
1840 }
1841
1842 /*
1843   restart a stopped remote node
1844  */
1845 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1846 {
1847         int ret;
1848
1849         struct ctdb_node_map *nodemap=NULL;
1850
1851         do {
1852                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1853                 if (ret != 0) {
1854                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1855                         return ret;
1856                 }
1857         
1858                 sleep(1);
1859
1860                 /* read the nodemap and verify the change took effect */
1861                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1862                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1863                         exit(10);
1864                 }
1865
1866         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1867         ret = control_ipreallocate(ctdb, argc, argv);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1870                 return ret;
1871         }
1872
1873         return 0;
1874 }
1875
1876 static uint32_t get_generation(struct ctdb_context *ctdb)
1877 {
1878         struct ctdb_vnn_map *vnnmap=NULL;
1879         int ret;
1880
1881         /* wait until the recmaster is not in recovery mode */
1882         while (1) {
1883                 uint32_t recmode, recmaster;
1884                 
1885                 if (vnnmap != NULL) {
1886                         talloc_free(vnnmap);
1887                         vnnmap = NULL;
1888                 }
1889
1890                 /* get the recmaster */
1891                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1892                 if (ret != 0) {
1893                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1894                         exit(10);
1895                 }
1896
1897                 /* get recovery mode */
1898                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1899                 if (ret != 0) {
1900                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1901                         exit(10);
1902                 }
1903
1904                 /* get the current generation number */
1905                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
1906                 if (ret != 0) {
1907                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
1908                         exit(10);
1909                 }
1910
1911                 if ((recmode == CTDB_RECOVERY_NORMAL)
1912                 &&  (vnnmap->generation != 1)){
1913                         return vnnmap->generation;
1914                 }
1915                 sleep(1);
1916         }
1917 }
1918
1919 /*
1920   ban a node from the cluster
1921  */
1922 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
1923 {
1924         int ret;
1925         struct ctdb_node_map *nodemap=NULL;
1926         struct ctdb_ban_time bantime;
1927
1928         if (argc < 1) {
1929                 usage();
1930         }
1931         
1932         /* verify the node exists */
1933         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1934         if (ret != 0) {
1935                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1936                 return ret;
1937         }
1938
1939         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
1940                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
1941                 return -1;
1942         }
1943
1944         bantime.pnn  = options.pnn;
1945         bantime.time = strtoul(argv[0], NULL, 0);
1946
1947         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
1948         if (ret != 0) {
1949                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
1950                 return -1;
1951         }       
1952
1953         ret = control_ipreallocate(ctdb, argc, argv);
1954         if (ret != 0) {
1955                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1956                 return ret;
1957         }
1958
1959         return 0;
1960 }
1961
1962
1963 /*
1964   unban a node from the cluster
1965  */
1966 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
1967 {
1968         int ret;
1969         struct ctdb_node_map *nodemap=NULL;
1970         struct ctdb_ban_time bantime;
1971
1972         /* verify the node exists */
1973         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1974         if (ret != 0) {
1975                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1976                 return ret;
1977         }
1978
1979         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
1980                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
1981                 return -1;
1982         }
1983
1984         bantime.pnn  = options.pnn;
1985         bantime.time = 0;
1986
1987         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
1988         if (ret != 0) {
1989                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
1990                 return -1;
1991         }       
1992
1993         ret = control_ipreallocate(ctdb, argc, argv);
1994         if (ret != 0) {
1995                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1996                 return ret;
1997         }
1998
1999         return 0;
2000 }
2001
2002
2003 /*
2004   show ban information for a node
2005  */
2006 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2007 {
2008         int ret;
2009         struct ctdb_node_map *nodemap=NULL;
2010         struct ctdb_ban_time *bantime;
2011
2012         /* verify the node exists */
2013         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2016                 return ret;
2017         }
2018
2019         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2020         if (ret != 0) {
2021                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2022                 return -1;
2023         }       
2024
2025         if (bantime->time == 0) {
2026                 printf("Node %u is not banned\n", bantime->pnn);
2027         } else {
2028                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2029         }
2030
2031         return 0;
2032 }
2033
2034 /*
2035   shutdown a daemon
2036  */
2037 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2038 {
2039         int ret;
2040
2041         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2042         if (ret != 0) {
2043                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2044                 return ret;
2045         }
2046
2047         return 0;
2048 }
2049
2050 /*
2051   trigger a recovery
2052  */
2053 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2054 {
2055         int ret;
2056         uint32_t generation, next_generation;
2057
2058         /* record the current generation number */
2059         generation = get_generation(ctdb);
2060
2061         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2062         if (ret != 0) {
2063                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2064                 return ret;
2065         }
2066
2067         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2068         if (ret != 0) {
2069                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2070                 return ret;
2071         }
2072
2073         /* wait until we are in a new generation */
2074         while (1) {
2075                 next_generation = get_generation(ctdb);
2076                 if (next_generation != generation) {
2077                         return 0;
2078                 }
2079                 sleep(1);
2080         }
2081
2082         return 0;
2083 }
2084
2085
2086 /*
2087   display monitoring mode of a remote node
2088  */
2089 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2090 {
2091         uint32_t monmode;
2092         int ret;
2093
2094         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2095         if (ret != 0) {
2096                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2097                 return ret;
2098         }
2099         if (!options.machinereadable){
2100                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2101         } else {
2102                 printf(":mode:\n");
2103                 printf(":%d:\n",monmode);
2104         }
2105         return 0;
2106 }
2107
2108
2109 /*
2110   display capabilities of a remote node
2111  */
2112 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2113 {
2114         uint32_t capabilities;
2115         int ret;
2116
2117         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2118         if (ret != 0) {
2119                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2120                 return ret;
2121         }
2122         
2123         if (!options.machinereadable){
2124                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2125                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2126                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2127                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2128         } else {
2129                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2130                 printf(":%d:%d:%d:%d:\n",
2131                         !!(capabilities&CTDB_CAP_RECMASTER),
2132                         !!(capabilities&CTDB_CAP_LMASTER),
2133                         !!(capabilities&CTDB_CAP_LVS),
2134                         !!(capabilities&CTDB_CAP_NATGW));
2135         }
2136         return 0;
2137 }
2138
2139 /*
2140   display lvs configuration
2141  */
2142 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2143 {
2144         uint32_t *capabilities;
2145         struct ctdb_node_map *nodemap=NULL;
2146         int i, ret;
2147         int healthy_count = 0;
2148
2149         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2150         if (ret != 0) {
2151                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2152                 return ret;
2153         }
2154
2155         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2156         CTDB_NO_MEMORY(ctdb, capabilities);
2157         
2158         /* collect capabilities for all connected nodes */
2159         for (i=0; i<nodemap->num; i++) {
2160                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2161                         continue;
2162                 }
2163                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2164                         continue;
2165                 }
2166         
2167                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2168                 if (ret != 0) {
2169                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2170                         return ret;
2171                 }
2172
2173                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2174                         continue;
2175                 }
2176
2177                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2178                         healthy_count++;
2179                 }
2180         }
2181
2182         /* Print all LVS nodes */
2183         for (i=0; i<nodemap->num; i++) {
2184                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2185                         continue;
2186                 }
2187                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2188                         continue;
2189                 }
2190                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2191                         continue;
2192                 }
2193
2194                 if (healthy_count != 0) {
2195                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2196                                 continue;
2197                         }
2198                 }
2199
2200                 printf("%d:%s\n", i, 
2201                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2202         }
2203
2204         return 0;
2205 }
2206
2207 /*
2208   display who is the lvs master
2209  */
2210 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2211 {
2212         uint32_t *capabilities;
2213         struct ctdb_node_map *nodemap=NULL;
2214         int i, ret;
2215         int healthy_count = 0;
2216
2217         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2218         if (ret != 0) {
2219                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2220                 return ret;
2221         }
2222
2223         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2224         CTDB_NO_MEMORY(ctdb, capabilities);
2225         
2226         /* collect capabilities for all connected nodes */
2227         for (i=0; i<nodemap->num; i++) {
2228                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2229                         continue;
2230                 }
2231                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2232                         continue;
2233                 }
2234         
2235                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2236                 if (ret != 0) {
2237                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2238                         return ret;
2239                 }
2240
2241                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2242                         continue;
2243                 }
2244
2245                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2246                         healthy_count++;
2247                 }
2248         }
2249
2250         /* find and show the lvsmaster */
2251         for (i=0; i<nodemap->num; i++) {
2252                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2253                         continue;
2254                 }
2255                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2256                         continue;
2257                 }
2258                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2259                         continue;
2260                 }
2261
2262                 if (healthy_count != 0) {
2263                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2264                                 continue;
2265                         }
2266                 }
2267
2268                 if (options.machinereadable){
2269                         printf("%d\n", i);
2270                 } else {
2271                         printf("Node %d is LVS master\n", i);
2272                 }
2273                 return 0;
2274         }
2275
2276         printf("There is no LVS master\n");
2277         return -1;
2278 }
2279
2280 /*
2281   disable monitoring on a  node
2282  */
2283 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2284 {
2285         
2286         int ret;
2287
2288         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2289         if (ret != 0) {
2290                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2291                 return ret;
2292         }
2293         printf("Monitoring mode:%s\n","DISABLED");
2294
2295         return 0;
2296 }
2297
2298 /*
2299   enable monitoring on a  node
2300  */
2301 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2302 {
2303         
2304         int ret;
2305
2306         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2307         if (ret != 0) {
2308                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2309                 return ret;
2310         }
2311         printf("Monitoring mode:%s\n","ACTIVE");
2312
2313         return 0;
2314 }
2315
2316 /*
2317   display remote list of keys/data for a db
2318  */
2319 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2320 {
2321         const char *db_name;
2322         struct ctdb_db_context *ctdb_db;
2323         int ret;
2324
2325         if (argc < 1) {
2326                 usage();
2327         }
2328
2329         db_name = argv[0];
2330
2331
2332         if (db_exists(ctdb, db_name)) {
2333                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2334                 return -1;
2335         }
2336
2337         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2338
2339         if (ctdb_db == NULL) {
2340                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2341                 return -1;
2342         }
2343
2344         /* traverse and dump the cluster tdb */
2345         ret = ctdb_dump_db(ctdb_db, stdout);
2346         if (ret == -1) {
2347                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2348                 return -1;
2349         }
2350         talloc_free(ctdb_db);
2351
2352         printf("Dumped %d records\n", ret);
2353         return 0;
2354 }
2355
2356
2357 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2358                              TDB_DATA data, void *private_data)
2359 {
2360         DEBUG(DEBUG_ERR,("Log data received\n"));
2361         if (data.dsize > 0) {
2362                 printf("%s", data.dptr);
2363         }
2364
2365         exit(0);
2366 }
2367
2368 /*
2369   display a list of log messages from the in memory ringbuffer
2370  */
2371 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2372 {
2373         int ret;
2374         int32_t res;
2375         struct ctdb_get_log_addr log_addr;
2376         TDB_DATA data;
2377         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2378         char *errmsg;
2379         struct timeval tv;
2380
2381         if (argc != 1) {
2382                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2383                 talloc_free(tmp_ctx);
2384                 return -1;
2385         }
2386
2387         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2388         log_addr.srvid = getpid();
2389         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2390                 log_addr.level = get_debug_by_desc(argv[0]);
2391         } else {
2392                 log_addr.level = strtol(argv[0], NULL, 0);
2393         }
2394
2395
2396         data.dptr = (unsigned char *)&log_addr;
2397         data.dsize = sizeof(log_addr);
2398
2399         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2400
2401         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2402         sleep(1);
2403
2404         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2405
2406         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2407                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2408         if (ret != 0 || res != 0) {
2409                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2410                 talloc_free(tmp_ctx);
2411                 return -1;
2412         }
2413
2414
2415         tv = timeval_current();
2416         /* this loop will terminate when we have received the reply */
2417         while (timeval_elapsed(&tv) < 3.0) {    
2418                 event_loop_once(ctdb->ev);
2419         }
2420
2421         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2422
2423         talloc_free(tmp_ctx);
2424         return 0;
2425 }
2426
2427 /*
2428   clear the in memory log area
2429  */
2430 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2431 {
2432         int ret;
2433         int32_t res;
2434         char *errmsg;
2435         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2436
2437         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2438                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2439         if (ret != 0 || res != 0) {
2440                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2441                 talloc_free(tmp_ctx);
2442                 return -1;
2443         }
2444
2445         talloc_free(tmp_ctx);
2446         return 0;
2447 }
2448
2449
2450
2451 /*
2452   display a list of the databases on a remote ctdb
2453  */
2454 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2455 {
2456         int i, ret;
2457         struct ctdb_dbid_map *dbmap=NULL;
2458
2459         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2460         if (ret != 0) {
2461                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2462                 return ret;
2463         }
2464
2465         printf("Number of databases:%d\n", dbmap->num);
2466         for(i=0;i<dbmap->num;i++){
2467                 const char *path;
2468                 const char *name;
2469                 bool persistent;
2470
2471                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2472                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2473                 persistent = dbmap->dbs[i].persistent;
2474                 printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name, 
2475                        path, persistent?"PERSISTENT":"");
2476         }
2477
2478         return 0;
2479 }
2480
2481 /*
2482   check if the local node is recmaster or not
2483   it will return 1 if this node is the recmaster and 0 if it is not
2484   or if the local ctdb daemon could not be contacted
2485  */
2486 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2487 {
2488         uint32_t mypnn, recmaster;
2489         int ret;
2490
2491         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2492         if (mypnn == -1) {
2493                 printf("Failed to get pnn of node\n");
2494                 return 1;
2495         }
2496
2497         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2498         if (ret != 0) {
2499                 printf("Failed to get the recmaster\n");
2500                 return 1;
2501         }
2502
2503         if (recmaster != mypnn) {
2504                 printf("this node is not the recmaster\n");
2505                 return 1;
2506         }
2507
2508         printf("this node is the recmaster\n");
2509         return 0;
2510 }
2511
2512 /*
2513   ping a node
2514  */
2515 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2516 {
2517         int ret;
2518         struct timeval tv = timeval_current();
2519         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2520         if (ret == -1) {
2521                 printf("Unable to get ping response from node %u\n", options.pnn);
2522                 return -1;
2523         } else {
2524                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2525                        options.pnn, timeval_elapsed(&tv), ret);
2526         }
2527         return 0;
2528 }
2529
2530
2531 /*
2532   get a tunable
2533  */
2534 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2535 {
2536         const char *name;
2537         uint32_t value;
2538         int ret;
2539
2540         if (argc < 1) {
2541                 usage();
2542         }
2543
2544         name = argv[0];
2545         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2546         if (ret == -1) {
2547                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2548                 return -1;
2549         }
2550
2551         printf("%-19s = %u\n", name, value);
2552         return 0;
2553 }
2554
2555 /*
2556   set a tunable
2557  */
2558 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2559 {
2560         const char *name;
2561         uint32_t value;
2562         int ret;
2563
2564         if (argc < 2) {
2565                 usage();
2566         }
2567
2568         name = argv[0];
2569         value = strtoul(argv[1], NULL, 0);
2570
2571         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2572         if (ret == -1) {
2573                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2574                 return -1;
2575         }
2576         return 0;
2577 }
2578
2579 /*
2580   list all tunables
2581  */
2582 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2583 {
2584         uint32_t count;
2585         const char **list;
2586         int ret, i;
2587
2588         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2589         if (ret == -1) {
2590                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2591                 return -1;
2592         }
2593
2594         for (i=0;i<count;i++) {
2595                 control_getvar(ctdb, 1, &list[i]);
2596         }
2597
2598         talloc_free(list);
2599         
2600         return 0;
2601 }
2602
2603 /*
2604   display debug level on a node
2605  */
2606 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2607 {
2608         int ret;
2609         int32_t level;
2610
2611         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2612         if (ret != 0) {
2613                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2614                 return ret;
2615         } else {
2616                 if (options.machinereadable){
2617                         printf(":Name:Level:\n");
2618                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2619                 } else {
2620                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2621                 }
2622         }
2623         return 0;
2624 }
2625
2626 /*
2627   display reclock file of a node
2628  */
2629 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2630 {
2631         int ret;
2632         const char *reclock;
2633
2634         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2635         if (ret != 0) {
2636                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2637                 return ret;
2638         } else {
2639                 if (options.machinereadable){
2640                         if (reclock != NULL) {
2641                                 printf("%s", reclock);
2642                         }
2643                 } else {
2644                         if (reclock == NULL) {
2645                                 printf("No reclock file used.\n");
2646                         } else {
2647                                 printf("Reclock file:%s\n", reclock);
2648                         }
2649                 }
2650         }
2651         return 0;
2652 }
2653
2654 /*
2655   set the reclock file of a node
2656  */
2657 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2658 {
2659         int ret;
2660         const char *reclock;
2661
2662         if (argc == 0) {
2663                 reclock = NULL;
2664         } else if (argc == 1) {
2665                 reclock = argv[0];
2666         } else {
2667                 usage();
2668         }
2669
2670         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2671         if (ret != 0) {
2672                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2673                 return ret;
2674         }
2675         return 0;
2676 }
2677
2678 /*
2679   set the natgw state on/off
2680  */
2681 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2682 {
2683         int ret;
2684         uint32_t natgwstate;
2685
2686         if (argc == 0) {
2687                 usage();
2688         }
2689
2690         if (!strcmp(argv[0], "on")) {
2691                 natgwstate = 1;
2692         } else if (!strcmp(argv[0], "off")) {
2693                 natgwstate = 0;
2694         } else {
2695                 usage();
2696         }
2697
2698         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2699         if (ret != 0) {
2700                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2701                 return ret;
2702         }
2703
2704         return 0;
2705 }
2706
2707 /*
2708   set the lmaster role on/off
2709  */
2710 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2711 {
2712         int ret;
2713         uint32_t lmasterrole;
2714
2715         if (argc == 0) {
2716                 usage();
2717         }
2718
2719         if (!strcmp(argv[0], "on")) {
2720                 lmasterrole = 1;
2721         } else if (!strcmp(argv[0], "off")) {
2722                 lmasterrole = 0;
2723         } else {
2724                 usage();
2725         }
2726
2727         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2728         if (ret != 0) {
2729                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2730                 return ret;
2731         }
2732
2733         return 0;
2734 }
2735
2736 /*
2737   set the recmaster role on/off
2738  */
2739 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2740 {
2741         int ret;
2742         uint32_t recmasterrole;
2743
2744         if (argc == 0) {
2745                 usage();
2746         }
2747
2748         if (!strcmp(argv[0], "on")) {
2749                 recmasterrole = 1;
2750         } else if (!strcmp(argv[0], "off")) {
2751                 recmasterrole = 0;
2752         } else {
2753                 usage();
2754         }
2755
2756         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2757         if (ret != 0) {
2758                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2759                 return ret;
2760         }
2761
2762         return 0;
2763 }
2764
2765 /*
2766   set debug level on a node or all nodes
2767  */
2768 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2769 {
2770         int i, ret;
2771         int32_t level;
2772
2773         if (argc == 0) {
2774                 printf("You must specify the debug level. Valid levels are:\n");
2775                 for (i=0; debug_levels[i].description != NULL; i++) {
2776                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2777                 }
2778
2779                 return 0;
2780         }
2781
2782         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2783                 level = get_debug_by_desc(argv[0]);
2784         } else {
2785                 level = strtol(argv[0], NULL, 0);
2786         }
2787
2788         for (i=0; debug_levels[i].description != NULL; i++) {
2789                 if (level == debug_levels[i].level) {
2790                         break;
2791                 }
2792         }
2793         if (debug_levels[i].description == NULL) {
2794                 printf("Invalid debug level, must be one of\n");
2795                 for (i=0; debug_levels[i].description != NULL; i++) {
2796                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2797                 }
2798                 return -1;
2799         }
2800
2801         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2802         if (ret != 0) {
2803                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2804         }
2805         return 0;
2806 }
2807
2808
2809 /*
2810   freeze a node
2811  */
2812 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2813 {
2814         int ret;
2815         uint32_t priority;
2816         
2817         if (argc == 1) {
2818                 priority = strtol(argv[0], NULL, 0);
2819         } else {
2820                 priority = 0;
2821         }
2822         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2823
2824         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2825         if (ret != 0) {
2826                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2827         }               
2828         return 0;
2829 }
2830
2831 /*
2832   thaw a node
2833  */
2834 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
2835 {
2836         int ret;
2837         uint32_t priority;
2838         
2839         if (argc == 1) {
2840                 priority = strtol(argv[0], NULL, 0);
2841         } else {
2842                 priority = 0;
2843         }
2844         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
2845
2846         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2847         if (ret != 0) {
2848                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
2849         }               
2850         return 0;
2851 }
2852
2853
2854 /*
2855   attach to a database
2856  */
2857 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
2858 {
2859         const char *db_name;
2860         struct ctdb_db_context *ctdb_db;
2861
2862         if (argc < 1) {
2863                 usage();
2864         }
2865         db_name = argv[0];
2866
2867         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2868         if (ctdb_db == NULL) {
2869                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2870                 return -1;
2871         }
2872
2873         return 0;
2874 }
2875
2876 /*
2877   set db priority
2878  */
2879 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2880 {
2881         struct ctdb_db_priority db_prio;
2882         int ret;
2883
2884         if (argc < 2) {
2885                 usage();
2886         }
2887
2888         db_prio.db_id    = strtoul(argv[0], NULL, 0);
2889         db_prio.priority = strtoul(argv[1], NULL, 0);
2890
2891         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
2892         if (ret != 0) {
2893                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
2894                 return -1;
2895         }
2896
2897         return 0;
2898 }
2899
2900 /*
2901   get db priority
2902  */
2903 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2904 {
2905         uint32_t db_id, priority;
2906         int ret;
2907
2908         if (argc < 1) {
2909                 usage();
2910         }
2911
2912         db_id = strtoul(argv[0], NULL, 0);
2913
2914         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
2915         if (ret != 0) {
2916                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
2917                 return -1;
2918         }
2919
2920         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
2921
2922         return 0;
2923 }
2924
2925 /*
2926   run an eventscript on a node
2927  */
2928 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
2929 {
2930         TDB_DATA data;
2931         int ret;
2932         int32_t res;
2933         char *errmsg;
2934         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2935
2936         if (argc != 1) {
2937                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2938                 return -1;
2939         }
2940
2941         data.dptr = (unsigned char *)discard_const(argv[0]);
2942         data.dsize = strlen((char *)data.dptr) + 1;
2943
2944         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
2945
2946         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
2947                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
2950                 talloc_free(tmp_ctx);
2951                 return -1;
2952         }
2953         talloc_free(tmp_ctx);
2954         return 0;
2955 }
2956
2957 #define DB_VERSION 1
2958 #define MAX_DB_NAME 64
2959 struct db_file_header {
2960         unsigned long version;
2961         time_t timestamp;
2962         unsigned long persistent;
2963         unsigned long size;
2964         const char name[MAX_DB_NAME];
2965 };
2966
2967 struct backup_data {
2968         struct ctdb_marshall_buffer *records;
2969         uint32_t len;
2970         uint32_t total;
2971         bool traverse_error;
2972 };
2973
2974 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
2975 {
2976         struct backup_data *bd = talloc_get_type(private, struct backup_data);
2977         struct ctdb_rec_data *rec;
2978
2979         /* add the record */
2980         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
2981         if (rec == NULL) {
2982                 bd->traverse_error = true;
2983                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
2984                 return -1;
2985         }
2986         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
2987         if (bd->records == NULL) {
2988                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
2989                 bd->traverse_error = true;
2990                 return -1;
2991         }
2992         bd->records->count++;
2993         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
2994         bd->len += rec->length;
2995         talloc_free(rec);
2996
2997         bd->total++;
2998         return 0;
2999 }
3000
3001 /*
3002  * backup a database to a file 
3003  */
3004 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3005 {
3006         int i, ret;
3007         struct ctdb_dbid_map *dbmap=NULL;
3008         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3009         struct db_file_header dbhdr;
3010         struct ctdb_db_context *ctdb_db;
3011         struct backup_data *bd;
3012         int fh = -1;
3013         int status = -1;
3014
3015         if (argc != 2) {
3016                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3017                 return -1;
3018         }
3019
3020         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3021         if (ret != 0) {
3022                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3023                 return ret;
3024         }
3025
3026         for(i=0;i<dbmap->num;i++){
3027                 const char *name;
3028
3029                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3030                 if(!strcmp(argv[0], name)){
3031                         talloc_free(discard_const(name));
3032                         break;
3033                 }
3034                 talloc_free(discard_const(name));
3035         }
3036         if (i == dbmap->num) {
3037                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3038                 talloc_free(tmp_ctx);
3039                 return -1;
3040         }
3041
3042
3043         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3044         if (ctdb_db == NULL) {
3045                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3046                 talloc_free(tmp_ctx);
3047                 return -1;
3048         }
3049
3050
3051         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3052         if (ret == -1) {
3053                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3054                 talloc_free(tmp_ctx);
3055                 return -1;
3056         }
3057
3058
3059         bd = talloc_zero(tmp_ctx, struct backup_data);
3060         if (bd == NULL) {
3061                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3062                 talloc_free(tmp_ctx);
3063                 return -1;
3064         }
3065
3066         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3067         if (bd->records == NULL) {
3068                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3069                 talloc_free(tmp_ctx);
3070                 return -1;
3071         }
3072
3073         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3074         bd->records->db_id = ctdb_db->db_id;
3075         /* traverse the database collecting all records */
3076         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3077             bd->traverse_error) {
3078                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3079                 talloc_free(tmp_ctx);
3080                 return -1;              
3081         }
3082
3083         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3084
3085
3086         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3087         if (fh == -1) {
3088                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3089                 talloc_free(tmp_ctx);
3090                 return -1;
3091         }
3092
3093         dbhdr.version = DB_VERSION;
3094         dbhdr.timestamp = time(NULL);
3095         dbhdr.persistent = dbmap->dbs[i].persistent;
3096         dbhdr.size = bd->len;
3097         if (strlen(argv[0]) >= MAX_DB_NAME) {
3098                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3099                 goto done;
3100         }
3101         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3102         ret = write(fh, &dbhdr, sizeof(dbhdr));
3103         if (ret == -1) {
3104                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3105                 goto done;
3106         }
3107         ret = write(fh, bd->records, bd->len);
3108         if (ret == -1) {
3109                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3110                 goto done;
3111         }
3112
3113         status = 0;
3114 done:
3115         if (fh != -1) {
3116                 ret = close(fh);
3117                 if (ret == -1) {
3118                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3119                 }
3120         }
3121         talloc_free(tmp_ctx);
3122         return status;
3123 }
3124
3125 /*
3126  * restore a database from a file 
3127  */
3128 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3129 {
3130         int ret;
3131         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3132         TDB_DATA outdata;
3133         TDB_DATA data;
3134         struct db_file_header dbhdr;
3135         struct ctdb_db_context *ctdb_db;
3136         struct ctdb_node_map *nodemap=NULL;
3137         struct ctdb_vnn_map *vnnmap=NULL;
3138         int i, fh;
3139         struct ctdb_control_wipe_database w;
3140         uint32_t *nodes;
3141         uint32_t generation;
3142         struct tm *tm;
3143         char tbuf[100];
3144
3145         if (argc != 1) {
3146                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3147                 return -1;
3148         }
3149
3150         fh = open(argv[0], O_RDONLY);
3151         if (fh == -1) {
3152                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3153                 talloc_free(tmp_ctx);
3154                 return -1;
3155         }
3156
3157         read(fh, &dbhdr, sizeof(dbhdr));
3158         if (dbhdr.version != DB_VERSION) {
3159                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3160                 talloc_free(tmp_ctx);
3161                 return -1;
3162         }
3163
3164         outdata.dsize = dbhdr.size;
3165         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3166         if (outdata.dptr == NULL) {
3167                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3168                 close(fh);
3169                 talloc_free(tmp_ctx);
3170                 return -1;
3171         }               
3172         read(fh, outdata.dptr, outdata.dsize);
3173         close(fh);
3174
3175         tm = localtime(&dbhdr.timestamp);
3176         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3177         printf("Restoring database '%s' from backup @ %s\n",
3178                 dbhdr.name, tbuf);
3179
3180
3181         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3182         if (ctdb_db == NULL) {
3183                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3184                 talloc_free(tmp_ctx);
3185                 return -1;
3186         }
3187
3188         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3189         if (ret != 0) {
3190                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3191                 talloc_free(tmp_ctx);
3192                 return ret;
3193         }
3194
3195
3196         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3197         if (ret != 0) {
3198                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3199                 talloc_free(tmp_ctx);
3200                 return ret;
3201         }
3202
3203         /* freeze all nodes */
3204         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3205         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3206                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3207                                         nodes, i,
3208                                         TIMELIMIT(),
3209                                         false, tdb_null,
3210                                         NULL, NULL,
3211                                         NULL) != 0) {
3212                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3213                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3214                         talloc_free(tmp_ctx);
3215                         return -1;
3216                 }
3217         }
3218
3219         generation = vnnmap->generation;
3220         data.dptr = (void *)&generation;
3221         data.dsize = sizeof(generation);
3222
3223         /* start a cluster wide transaction */
3224         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3225         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3226                                         nodes, 0,
3227                                         TIMELIMIT(), false, data,
3228                                         NULL, NULL,
3229                                         NULL) != 0) {
3230                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3231                 return -1;
3232         }
3233
3234
3235         w.db_id = ctdb_db->db_id;
3236         w.transaction_id = generation;
3237
3238         data.dptr = (void *)&w;
3239         data.dsize = sizeof(w);
3240
3241         /* wipe all the remote databases. */
3242         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3243         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3244                                         nodes, 0,
3245                                         TIMELIMIT(), false, data,
3246                                         NULL, NULL,
3247                                         NULL) != 0) {
3248                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3249                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3250                 talloc_free(tmp_ctx);
3251                 return -1;
3252         }
3253         
3254         /* push the database */
3255         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3256         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3257                                         nodes, 0,
3258                                         TIMELIMIT(), false, outdata,
3259                                         NULL, NULL,
3260                                         NULL) != 0) {
3261                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3262                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3263                 talloc_free(tmp_ctx);
3264                 return -1;
3265         }
3266
3267         data.dptr = (void *)&generation;
3268         data.dsize = sizeof(generation);
3269
3270         /* commit all the changes */
3271         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3272                                         nodes, 0,
3273                                         TIMELIMIT(), false, data,
3274                                         NULL, NULL,
3275                                         NULL) != 0) {
3276                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3277                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3278                 talloc_free(tmp_ctx);
3279                 return -1;
3280         }
3281
3282
3283         /* thaw all nodes */
3284         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3285         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3286                                         nodes, 0,
3287                                         TIMELIMIT(),
3288                                         false, tdb_null,
3289                                         NULL, NULL,
3290                                         NULL) != 0) {
3291                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3292                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3293                 talloc_free(tmp_ctx);
3294                 return -1;
3295         }
3296
3297
3298         talloc_free(tmp_ctx);
3299         return 0;
3300 }
3301
3302 /*
3303  * set flags of a node in the nodemap
3304  */
3305 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3306 {
3307         int ret;
3308         int32_t status;
3309         int node;
3310         int flags;
3311         TDB_DATA data;
3312         struct ctdb_node_flag_change c;
3313
3314         if (argc != 2) {
3315                 usage();
3316                 return -1;
3317         }
3318
3319         if (sscanf(argv[0], "%d", &node) != 1) {
3320                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3321                 usage();
3322                 return -1;
3323         }
3324         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3325                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3326                 usage();
3327                 return -1;
3328         }
3329
3330         c.pnn       = node;
3331         c.old_flags = 0;
3332         c.new_flags = flags;
3333
3334         data.dsize = sizeof(c);
3335         data.dptr = (unsigned char *)&c;
3336
3337         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3338                            data, NULL, NULL, &status, NULL, NULL);
3339         if (ret != 0 || status != 0) {
3340                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3341                 return -1;
3342         }
3343         return 0;
3344 }
3345
3346 /*
3347   dump memory usage
3348  */
3349 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3350 {
3351         TDB_DATA data;
3352         int ret;
3353         int32_t res;
3354         char *errmsg;
3355         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3356         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3357                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3358         if (ret != 0 || res != 0) {
3359                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3360                 talloc_free(tmp_ctx);
3361                 return -1;
3362         }
3363         write(1, data.dptr, data.dsize);
3364         talloc_free(tmp_ctx);
3365         return 0;
3366 }
3367
3368 /*
3369   handler for memory dumps
3370 */
3371 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3372                              TDB_DATA data, void *private_data)
3373 {
3374         write(1, data.dptr, data.dsize);
3375         exit(0);
3376 }
3377
3378 /*
3379   dump memory usage on the recovery daemon
3380  */
3381 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3382 {
3383         int ret;
3384         TDB_DATA data;
3385         struct rd_memdump_reply rd;
3386
3387         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3388         if (rd.pnn == -1) {
3389                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3390                 return -1;
3391         }
3392         rd.srvid = getpid();
3393
3394         /* register a message port for receiveing the reply so that we
3395            can receive the reply
3396         */
3397         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3398
3399
3400         data.dptr = (uint8_t *)&rd;
3401         data.dsize = sizeof(rd);
3402
3403         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3404         if (ret != 0) {
3405                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3406                 return -1;
3407         }
3408
3409         /* this loop will terminate when we have received the reply */
3410         while (1) {     
3411                 event_loop_once(ctdb->ev);
3412         }
3413
3414         return 0;
3415 }
3416
3417 /*
3418   list all nodes in the cluster
3419   if the daemon is running, we read the data from the daemon.
3420   if the daemon is not running we parse the nodes file directly
3421  */
3422 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3423 {
3424         int i, ret;
3425         struct ctdb_node_map *nodemap=NULL;
3426
3427         if (ctdb != NULL) {
3428                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3429                 if (ret != 0) {
3430                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3431                         return ret;
3432                 }
3433
3434                 for(i=0;i<nodemap->num;i++){
3435                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3436                                 continue;
3437                         }
3438                         if (options.machinereadable){
3439                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3440                         } else {
3441                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3442                         }
3443                 }
3444         } else {
3445                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3446                 struct pnn_node *pnn_nodes;
3447                 struct pnn_node *pnn_node;
3448         
3449                 pnn_nodes = read_nodes_file(mem_ctx);
3450                 if (pnn_nodes == NULL) {
3451                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3452                         talloc_free(mem_ctx);
3453                         return -1;
3454                 }
3455
3456                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3457                         ctdb_sock_addr addr;
3458
3459                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3460                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3461                                 talloc_free(mem_ctx);
3462                                 return -1;
3463                         }
3464
3465                         if (options.machinereadable){
3466                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3467                         } else {
3468                                 printf("%s\n", pnn_node->addr);
3469                         }
3470                 }
3471                 talloc_free(mem_ctx);
3472         }
3473
3474         return 0;
3475 }
3476
3477 /*
3478   reload the nodes file on the local node
3479  */
3480 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3481 {
3482         int i, ret;
3483         int mypnn;
3484         struct ctdb_node_map *nodemap=NULL;
3485
3486         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3487         if (mypnn == -1) {
3488                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3489                 return -1;
3490         }
3491
3492         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3493         if (ret != 0) {
3494                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3495                 return ret;
3496         }
3497
3498         /* reload the nodes file on all remote nodes */
3499         for (i=0;i<nodemap->num;i++) {
3500                 if (nodemap->nodes[i].pnn == mypnn) {
3501                         continue;
3502                 }
3503                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3504                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3505                         nodemap->nodes[i].pnn);
3506                 if (ret != 0) {
3507                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3508                 }
3509         }
3510
3511         /* reload the nodes file on the local node */
3512         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3513         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3514         if (ret != 0) {
3515                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3516         }
3517
3518         /* initiate a recovery */
3519         control_recover(ctdb, argc, argv);
3520
3521         return 0;
3522 }
3523
3524
3525 static const struct {
3526         const char *name;
3527         int (*fn)(struct ctdb_context *, int, const char **);
3528         bool auto_all;
3529         bool without_daemon; /* can be run without daemon running ? */
3530         const char *msg;
3531         const char *args;
3532 } ctdb_commands[] = {
3533 #ifdef CTDB_VERS
3534         { "version",         control_version,           true,   false,  "show version of ctdb" },
3535 #endif
3536         { "status",          control_status,            true,   false,  "show node status" },
3537         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
3538         { "ping",            control_ping,              true,   false,  "ping all nodes" },
3539         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
3540         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
3541         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
3542         { "statistics",      control_statistics,        false,  false, "show statistics" },
3543         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
3544         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
3545         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
3546         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
3547         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
3548         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
3549         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
3550         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
3551         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
3552         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
3553         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
3554         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
3555         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
3556         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
3557         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
3558         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
3559         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
3560         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
3561         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
3562         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
3563         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
3564         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
3565         { "stop",            control_stop,              true,   false,  "stop a node" },
3566         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
3567         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
3568         { "unban",           control_unban,             true,   false,  "unban a node" },
3569         { "showban",         control_showban,           true,   false,  "show ban information"},
3570         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
3571         { "recover",         control_recover,           true,   false,  "force recovery" },
3572         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
3573         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
3574         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
3575         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
3576         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
3577         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
3578         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
3579         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
3580
3581         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
3582         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
3583         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
3584         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
3585         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
3586         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
3587         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
3588         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
3589         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
3590         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
3591         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
3592         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
3593         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
3594         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
3595         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
3596         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
3597         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts"},
3598         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
3599         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
3600         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
3601         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
3602         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
3603         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
3604         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
3605         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
3606         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
3607         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
3608         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
3609 };
3610
3611 /*
3612   show usage message
3613  */
3614 static void usage(void)
3615 {
3616         int i;
3617         printf(
3618 "Usage: ctdb [options] <control>\n" \
3619 "Options:\n" \
3620 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
3621 "   -Y                 generate machinereadable output\n"
3622 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
3623         printf("Controls:\n");
3624         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3625                 printf("  %-15s %-27s  %s\n", 
3626                        ctdb_commands[i].name, 
3627                        ctdb_commands[i].args?ctdb_commands[i].args:"",
3628                        ctdb_commands[i].msg);
3629         }
3630         exit(1);
3631 }
3632
3633
3634 static void ctdb_alarm(int sig)
3635 {
3636         printf("Maximum runtime exceeded - exiting\n");
3637         _exit(ERR_TIMEOUT);
3638 }
3639
3640 /*
3641   main program
3642 */
3643 int main(int argc, const char *argv[])
3644 {
3645         struct ctdb_context *ctdb;
3646         char *nodestring = NULL;
3647         struct poptOption popt_options[] = {
3648                 POPT_AUTOHELP
3649                 POPT_CTDB_CMDLINE
3650                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
3651                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
3652                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
3653                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
3654                 POPT_TABLEEND
3655         };
3656         int opt;
3657         const char **extra_argv;
3658         int extra_argc = 0;
3659         int ret=-1, i;
3660         poptContext pc;
3661         struct event_context *ev;
3662         const char *control;
3663
3664         setlinebuf(stdout);
3665         
3666         /* set some defaults */
3667         options.maxruntime = 0;
3668         options.timelimit = 3;
3669         options.pnn = CTDB_CURRENT_NODE;
3670
3671         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
3672
3673         while ((opt = poptGetNextOpt(pc)) != -1) {
3674                 switch (opt) {
3675                 default:
3676                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
3677                                 poptBadOption(pc, 0), poptStrerror(opt)));
3678                         exit(1);
3679                 }
3680         }
3681
3682         /* setup the remaining options for the main program to use */
3683         extra_argv = poptGetArgs(pc);
3684         if (extra_argv) {
3685                 extra_argv++;
3686                 while (extra_argv[extra_argc]) extra_argc++;
3687         }
3688
3689         if (extra_argc < 1) {
3690                 usage();
3691         }
3692
3693         if (options.maxruntime == 0) {
3694                 const char *ctdb_timeout;
3695                 ctdb_timeout = getenv("CTDB_TIMEOUT");
3696                 if (ctdb_timeout != NULL) {
3697                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
3698                 } else {
3699                         /* default timeout is 120 seconds */
3700                         options.maxruntime = 120;
3701                 }
3702         }
3703
3704         signal(SIGALRM, ctdb_alarm);
3705         alarm(options.maxruntime);
3706
3707         /* setup the node number to contact */
3708         if (nodestring != NULL) {
3709                 if (strcmp(nodestring, "all") == 0) {
3710                         options.pnn = CTDB_BROADCAST_ALL;
3711                 } else {
3712                         options.pnn = strtoul(nodestring, NULL, 0);
3713                 }
3714         }
3715
3716         control = extra_argv[0];
3717
3718         ev = event_context_init(NULL);
3719
3720         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3721                 if (strcmp(control, ctdb_commands[i].name) == 0) {
3722                         int j;
3723
3724                         if (ctdb_commands[i].without_daemon == true) {
3725                                 close(2);
3726                         }
3727
3728                         /* initialise ctdb */
3729                         ctdb = ctdb_cmdline_client(ev);
3730
3731                         if (ctdb_commands[i].without_daemon == false) {
3732                                 if (ctdb == NULL) {
3733                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
3734                                         exit(1);
3735                                 }
3736
3737                                 /* verify the node exists */
3738                                 verify_node(ctdb);
3739
3740                                 if (options.pnn == CTDB_CURRENT_NODE) {
3741                                         int pnn;
3742                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
3743                                         if (pnn == -1) {
3744                                                 return -1;
3745                                         }
3746                                         options.pnn = pnn;
3747                                 }
3748                         }
3749
3750                         if (ctdb_commands[i].auto_all && 
3751                             options.pnn == CTDB_BROADCAST_ALL) {
3752                                 uint32_t *nodes;
3753                                 uint32_t num_nodes;
3754                                 ret = 0;
3755
3756                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
3757                                 CTDB_NO_MEMORY(ctdb, nodes);
3758         
3759                                 for (j=0;j<num_nodes;j++) {
3760                                         options.pnn = nodes[j];
3761                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
3762                                 }
3763                                 talloc_free(nodes);
3764                         } else {
3765                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
3766                         }
3767                         break;
3768                 }
3769         }
3770
3771         if (i == ARRAY_SIZE(ctdb_commands)) {
3772                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
3773                 exit(1);
3774         }
3775
3776         return ret;
3777 }