ae6438e85c6dee5a443988f6a6b41b0a66f59884
[rusty/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
517                 }
518                 return 0;
519         }
520
521         printf("Number of nodes:%d\n", nodemap->num);
522         for(i=0;i<nodemap->num;i++){
523                 static const struct {
524                         uint32_t flag;
525                         const char *name;
526                 } flag_names[] = {
527                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
528                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
529                         { NODE_FLAGS_BANNED,                "BANNED" },
530                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
531                         { NODE_FLAGS_DELETED,               "DELETED" },
532                         { NODE_FLAGS_STOPPED,               "STOPPED" },
533                 };
534                 char *flags_str = NULL;
535                 int j;
536
537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
538                         continue;
539                 }
540                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
541                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
542                                 if (flags_str == NULL) {
543                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
544                                 } else {
545                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
546                                                                            flag_names[j].name);
547                                 }
548                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
549                         }
550                 }
551                 if (flags_str == NULL) {
552                         flags_str = talloc_strdup(ctdb, "OK");
553                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
554                 }
555                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
556                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
557                        flags_str,
558                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
559                 talloc_free(flags_str);
560         }
561
562         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
563         if (ret != 0) {
564                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
565                 return ret;
566         }
567         if (vnnmap->generation == INVALID_GENERATION) {
568                 printf("Generation:INVALID\n");
569         } else {
570                 printf("Generation:%d\n",vnnmap->generation);
571         }
572         printf("Size:%d\n",vnnmap->size);
573         for(i=0;i<vnnmap->size;i++){
574                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
575         }
576
577         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
580                 return ret;
581         }
582         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
583
584         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
585         if (ret != 0) {
586                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
587                 return ret;
588         }
589         printf("Recovery master:%d\n",recmaster);
590
591         return 0;
592 }
593
594
595 struct natgw_node {
596         struct natgw_node *next;
597         const char *addr;
598 };
599
600 /*
601   display the list of nodes belonging to this natgw configuration
602  */
603 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
604 {
605         int i, ret;
606         const char *natgw_list;
607         int nlines;
608         char **lines;
609         struct natgw_node *natgw_nodes = NULL;
610         struct natgw_node *natgw_node;
611         struct ctdb_node_map *nodemap=NULL;
612
613
614         /* read the natgw nodes file into a linked list */
615         natgw_list = getenv("NATGW_NODES");
616         if (natgw_list == NULL) {
617                 natgw_list = "/etc/ctdb/natgw_nodes";
618         }
619         lines = file_lines_load(natgw_list, &nlines, ctdb);
620         if (lines == NULL) {
621                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
622                 return -1;
623         }
624         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
625                 nlines--;
626         }
627         for (i=0;i<nlines;i++) {
628                 char *node;
629
630                 node = lines[i];
631                 /* strip leading spaces */
632                 while((*node == ' ') || (*node == '\t')) {
633                         node++;
634                 }
635                 if (*node == '#') {
636                         continue;
637                 }
638                 if (strcmp(node, "") == 0) {
639                         continue;
640                 }
641                 natgw_node = talloc(ctdb, struct natgw_node);
642                 natgw_node->addr = talloc_strdup(natgw_node, node);
643                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
644                 natgw_node->next = natgw_nodes;
645                 natgw_nodes = natgw_node;
646         }
647
648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
651                 return ret;
652         }
653
654         i=0;
655         while(i<nodemap->num) {
656                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
657                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
658                                 break;
659                         }
660                 }
661
662                 /* this node was not in the natgw so we just remove it from
663                  * the list
664                  */
665                 if ((natgw_node == NULL) 
666                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
667                         int j;
668
669                         for (j=i+1; j<nodemap->num; j++) {
670                                 nodemap->nodes[j-1] = nodemap->nodes[j];
671                         }
672                         nodemap->num--;
673                         continue;
674                 }
675
676                 i++;
677         }               
678
679         /* pick a node to be natgwmaster
680          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
681          */
682         for(i=0;i<nodemap->num;i++){
683                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
684                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
685                         break;
686                 }
687         }
688         /* we couldnt find any healthy node, try unhealthy ones */
689         if (i == nodemap->num) {
690                 for(i=0;i<nodemap->num;i++){
691                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
692                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
693                                 break;
694                         }
695                 }
696         }
697         /* unless all nodes are STOPPED, when we pick one anyway */
698         if (i == nodemap->num) {
699                 for(i=0;i<nodemap->num;i++){
700                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
701                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
702                                 break;
703                         }
704                 }
705                 /* or if we still can not find any */
706                 if (i == nodemap->num) {
707                         printf("-1 0.0.0.0\n");
708                 }
709         }
710
711         /* print the pruned list of nodes belonging to this natgw list */
712         for(i=0;i<nodemap->num;i++){
713                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
714                         continue;
715                 }
716                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
717                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
718                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
723         }
724
725         return 0;
726 }
727
728
729 /*
730   display the status of the monitoring scripts
731  */
732 static int control_scriptstatus(struct ctdb_context *ctdb, int argc, const char **argv)
733 {
734         int i, ret;
735         struct ctdb_scripts_wire *script_status;
736
737         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, &script_status);
738         if (ret != 0) {
739                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
740                 return ret;
741         }
742
743         printf("%d scripts were executed last monitoring cycle\n", script_status->num_scripts);
744         for (i=0; i<script_status->num_scripts; i++) {
745                 const char *status = NULL;
746
747                 switch (script_status->scripts[i].status) {
748                 case -ETIME:
749                         status = "TIMEDOUT";
750                         break;
751                 case -ENOEXEC:
752                         status = "DISABLED";
753                         break;
754                 case 0:
755                         status = "OK";
756                         break;
757                 default:
758                         if (script_status->scripts[i].status > 0)
759                                 status = "ERROR";
760                         break;
761                 }
762                 if (status)
763                         printf("%-20s Status:%s    ",
764                                script_status->scripts[i].name, status);
765                 else
766                         /* Some other error, eg from stat. */
767                         printf("%-20s Status:CANNOT RUN (%s)",
768                                script_status->scripts[i].name,
769                                strerror(-script_status->scripts[i].status));
770
771                 if (script_status->scripts[i].status >= 0) {
772                         printf("Duration:%.3lf ",
773                         timeval_delta(&script_status->scripts[i].finished,
774                               &script_status->scripts[i].start));
775                 }
776                 if (script_status->scripts[i].status != -ENOEXEC) {
777                         printf("%s",
778                                ctime(&script_status->scripts[i].start.tv_sec));
779                 }
780                 if (script_status->scripts[i].status != 0) {
781                         printf("   OUTPUT:%s\n",
782                                 script_status->scripts[i].output);
783                 }
784         }
785
786         return 0;
787 }
788         
789
790 /*
791   enable an eventscript
792  */
793 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
794 {
795         int ret;
796
797         if (argc < 1) {
798                 usage();
799         }
800
801         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
802         if (ret != 0) {
803           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
804                 return ret;
805         }
806
807         return 0;
808 }
809
810 /*
811   disable an eventscript
812  */
813 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
814 {
815         int ret;
816
817         if (argc < 1) {
818                 usage();
819         }
820
821         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
822         if (ret != 0) {
823           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
824                 return ret;
825         }
826
827         return 0;
828 }
829
830 /*
831   display the pnn of the recovery master
832  */
833 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
834 {
835         int ret;
836         uint32_t recmaster;
837
838         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
839         if (ret != 0) {
840                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
841                 return ret;
842         }
843         printf("%d\n",recmaster);
844
845         return 0;
846 }
847
848 /*
849   get a list of all tickles for this pnn
850  */
851 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
852 {
853         struct ctdb_control_tcp_tickle_list *list;
854         ctdb_sock_addr addr;
855         int i, ret;
856
857         if (argc < 1) {
858                 usage();
859         }
860
861         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
862                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
863                 return -1;
864         }
865
866         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
867         if (ret == -1) {
868                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
869                 return -1;
870         }
871
872         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
873         printf("Num tickles:%u\n", list->tickles.num);
874         for (i=0;i<list->tickles.num;i++) {
875                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
876                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
877         }
878
879         talloc_free(list);
880         
881         return 0;
882 }
883
884
885
886 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
887 {
888         struct ctdb_all_public_ips *ips;
889         struct ctdb_public_ip ip;
890         int i, ret;
891         uint32_t *nodes;
892         uint32_t disable_time;
893         TDB_DATA data;
894         struct ctdb_node_map *nodemap=NULL;
895         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
896
897         disable_time = 30;
898         data.dptr  = (uint8_t*)&disable_time;
899         data.dsize = sizeof(disable_time);
900         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
901         if (ret != 0) {
902                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
903                 return -1;
904         }
905
906
907
908         /* read the public ip list from the node */
909         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
910         if (ret != 0) {
911                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
912                 talloc_free(tmp_ctx);
913                 return -1;
914         }
915
916         for (i=0;i<ips->num;i++) {
917                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
918                         break;
919                 }
920         }
921         if (i==ips->num) {
922                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
923                         pnn, ctdb_addr_to_str(addr)));
924                 talloc_free(tmp_ctx);
925                 return -1;
926         }
927
928         ip.pnn  = pnn;
929         ip.addr = *addr;
930
931         data.dptr  = (uint8_t *)&ip;
932         data.dsize = sizeof(ip);
933
934         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
935         if (ret != 0) {
936                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
937                 talloc_free(tmp_ctx);
938                 return ret;
939         }
940
941         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
942         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
943                                         nodes, 0,
944                                         TIMELIMIT(),
945                                         false, data,
946                                         NULL, NULL,
947                                         NULL);
948         if (ret != 0) {
949                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
950                 talloc_free(tmp_ctx);
951                 return -1;
952         }
953
954         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
957                 talloc_free(tmp_ctx);
958                 return -1;
959         }
960
961         talloc_free(tmp_ctx);
962         return 0;
963 }
964
965 /*
966   move/failover an ip address to a specific node
967  */
968 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
969 {
970         uint32_t pnn;
971         ctdb_sock_addr addr;
972
973         if (argc < 2) {
974                 usage();
975                 return -1;
976         }
977
978         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
979                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
980                 return -1;
981         }
982
983
984         if (sscanf(argv[1], "%u", &pnn) != 1) {
985                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
986                 return -1;
987         }
988
989         if (move_ip(ctdb, &addr, pnn) != 0) {
990                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
991                 return -1;
992         }
993
994         return 0;
995 }
996
997 void getips_store_callback(void *param, void *data)
998 {
999         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1000         struct ctdb_all_public_ips *ips = param;
1001         int i;
1002
1003         i = ips->num++;
1004         ips->ips[i].pnn  = node_ip->pnn;
1005         ips->ips[i].addr = node_ip->addr;
1006 }
1007
1008 void getips_count_callback(void *param, void *data)
1009 {
1010         uint32_t *count = param;
1011
1012         (*count)++;
1013 }
1014
1015 #define IP_KEYLEN       4
1016 static uint32_t *ip_key(ctdb_sock_addr *ip)
1017 {
1018         static uint32_t key[IP_KEYLEN];
1019
1020         bzero(key, sizeof(key));
1021
1022         switch (ip->sa.sa_family) {
1023         case AF_INET:
1024                 key[0]  = ip->ip.sin_addr.s_addr;
1025                 break;
1026         case AF_INET6:
1027                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1028                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1029                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1030                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1031                 break;
1032         default:
1033                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1034                 return key;
1035         }
1036
1037         return key;
1038 }
1039
1040 static void *add_ip_callback(void *parm, void *data)
1041 {
1042         return parm;
1043 }
1044
1045 static int
1046 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1047 {
1048         struct ctdb_all_public_ips *tmp_ips;
1049         struct ctdb_node_map *nodemap=NULL;
1050         trbt_tree_t *ip_tree;
1051         int i, j, len, ret;
1052         uint32_t count;
1053
1054         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1055         if (ret != 0) {
1056                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1057                 return ret;
1058         }
1059
1060         ip_tree = trbt_create(tmp_ctx, 0);
1061
1062         for(i=0;i<nodemap->num;i++){
1063                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1064                         continue;
1065                 }
1066                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1067                         continue;
1068                 }
1069
1070                 /* read the public ip list from this node */
1071                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1072                 if (ret != 0) {
1073                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1074                         return -1;
1075                 }
1076         
1077                 for (j=0; j<tmp_ips->num;j++) {
1078                         struct ctdb_public_ip *node_ip;
1079
1080                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1081                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1082                         node_ip->addr = tmp_ips->ips[j].addr;
1083
1084                         trbt_insertarray32_callback(ip_tree,
1085                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1086                                 add_ip_callback,
1087                                 node_ip);
1088                 }
1089                 talloc_free(tmp_ips);
1090         }
1091
1092         /* traverse */
1093         count = 0;
1094         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1095
1096         len = offsetof(struct ctdb_all_public_ips, ips) + 
1097                 count*sizeof(struct ctdb_public_ip);
1098         tmp_ips = talloc_zero_size(tmp_ctx, len);
1099         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1100
1101         *ips = tmp_ips;
1102
1103         return 0;
1104 }
1105
1106
1107 /* 
1108  * scans all other nodes and returns a pnn for another node that can host this 
1109  * ip address or -1
1110  */
1111 static int
1112 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1113 {
1114         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1115         struct ctdb_all_public_ips *ips;
1116         struct ctdb_node_map *nodemap=NULL;
1117         int i, j, ret;
1118
1119         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1120         if (ret != 0) {
1121                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1122                 talloc_free(tmp_ctx);
1123                 return ret;
1124         }
1125
1126         for(i=0;i<nodemap->num;i++){
1127                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1128                         continue;
1129                 }
1130                 if (nodemap->nodes[i].pnn == options.pnn) {
1131                         continue;
1132                 }
1133
1134                 /* read the public ip list from this node */
1135                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1136                 if (ret != 0) {
1137                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1138                         return -1;
1139                 }
1140
1141                 for (j=0;j<ips->num;j++) {
1142                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1143                                 talloc_free(tmp_ctx);
1144                                 return nodemap->nodes[i].pnn;
1145                         }
1146                 }
1147                 talloc_free(ips);
1148         }
1149
1150         talloc_free(tmp_ctx);
1151         return -1;
1152 }
1153
1154 /*
1155   add a public ip address to a node
1156  */
1157 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1158 {
1159         int i, ret;
1160         int len;
1161         uint32_t pnn;
1162         unsigned mask;
1163         ctdb_sock_addr addr;
1164         struct ctdb_control_ip_iface *pub;
1165         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1166         struct ctdb_all_public_ips *ips;
1167
1168
1169         if (argc != 2) {
1170                 talloc_free(tmp_ctx);
1171                 usage();
1172         }
1173
1174         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1175                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1176                 talloc_free(tmp_ctx);
1177                 return -1;
1178         }
1179
1180         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1181         if (ret != 0) {
1182                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1183                 talloc_free(tmp_ctx);
1184                 return ret;
1185         }
1186
1187
1188         /* check if some other node is already serving this ip, if not,
1189          * we will claim it
1190          */
1191         for (i=0;i<ips->num;i++) {
1192                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1193                         break;
1194                 }
1195         }
1196
1197         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1198         pub = talloc_size(tmp_ctx, len); 
1199         CTDB_NO_MEMORY(ctdb, pub);
1200
1201         pub->addr  = addr;
1202         pub->mask  = mask;
1203         pub->len   = strlen(argv[1])+1;
1204         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1205
1206         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1207         if (ret != 0) {
1208                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1209                 talloc_free(tmp_ctx);
1210                 return ret;
1211         }
1212
1213         if (i == ips->num) {
1214                 /* no one has this ip so we claim it */
1215                 pnn  = options.pnn;
1216         } else {
1217                 pnn  = ips->ips[i].pnn;
1218         }
1219
1220         if (move_ip(ctdb, &addr, pnn) != 0) {
1221                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1222                 return -1;
1223         }
1224
1225         talloc_free(tmp_ctx);
1226         return 0;
1227 }
1228
1229 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1230
1231 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1232 {
1233         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1234         struct ctdb_node_map *nodemap=NULL;
1235         struct ctdb_all_public_ips *ips;
1236         int ret, i, j;
1237
1238         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1239         if (ret != 0) {
1240                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1241                 return ret;
1242         }
1243
1244         /* remove it from the nodes that are not hosting the ip currently */
1245         for(i=0;i<nodemap->num;i++){
1246                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1247                         continue;
1248                 }
1249                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1250                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1251                         continue;
1252                 }
1253
1254                 for (j=0;j<ips->num;j++) {
1255                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1256                                 break;
1257                         }
1258                 }
1259                 if (j==ips->num) {
1260                         continue;
1261                 }
1262
1263                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1264                         continue;
1265                 }
1266
1267                 options.pnn = nodemap->nodes[i].pnn;
1268                 control_delip(ctdb, argc, argv);
1269         }
1270
1271
1272         /* remove it from every node (also the one hosting it) */
1273         for(i=0;i<nodemap->num;i++){
1274                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1275                         continue;
1276                 }
1277                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1278                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1279                         continue;
1280                 }
1281
1282                 for (j=0;j<ips->num;j++) {
1283                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1284                                 break;
1285                         }
1286                 }
1287                 if (j==ips->num) {
1288                         continue;
1289                 }
1290
1291                 options.pnn = nodemap->nodes[i].pnn;
1292                 control_delip(ctdb, argc, argv);
1293         }
1294
1295         talloc_free(tmp_ctx);
1296         return 0;
1297 }
1298         
1299 /*
1300   delete a public ip address from a node
1301  */
1302 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1303 {
1304         int i, ret;
1305         ctdb_sock_addr addr;
1306         struct ctdb_control_ip_iface pub;
1307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1308         struct ctdb_all_public_ips *ips;
1309
1310         if (argc != 1) {
1311                 talloc_free(tmp_ctx);
1312                 usage();
1313         }
1314
1315         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1316                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1317                 return -1;
1318         }
1319
1320         if (options.pnn == CTDB_BROADCAST_ALL) {
1321                 return control_delip_all(ctdb, argc, argv, &addr);
1322         }
1323
1324         pub.addr  = addr;
1325         pub.mask  = 0;
1326         pub.len   = 0;
1327
1328         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1329         if (ret != 0) {
1330                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1331                 talloc_free(tmp_ctx);
1332                 return ret;
1333         }
1334         
1335         for (i=0;i<ips->num;i++) {
1336                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1337                         break;
1338                 }
1339         }
1340
1341         if (i==ips->num) {
1342                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1343                         ctdb_addr_to_str(&addr)));
1344                 talloc_free(tmp_ctx);
1345                 return -1;
1346         }
1347
1348         if (ips->ips[i].pnn == options.pnn) {
1349                 ret = find_other_host_for_public_ip(ctdb, &addr);
1350                 if (ret != -1) {
1351                         if (move_ip(ctdb, &addr, ret) != 0) {
1352                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1353                                 return -1;
1354                         }
1355                 }
1356         }
1357
1358         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1359         if (ret != 0) {
1360                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1361                 talloc_free(tmp_ctx);
1362                 return ret;
1363         }
1364
1365         talloc_free(tmp_ctx);
1366         return 0;
1367 }
1368
1369 /*
1370   kill a tcp connection
1371  */
1372 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1373 {
1374         int ret;
1375         struct ctdb_control_killtcp killtcp;
1376
1377         if (argc < 2) {
1378                 usage();
1379         }
1380
1381         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1382                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1383                 return -1;
1384         }
1385
1386         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1387                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1388                 return -1;
1389         }
1390
1391         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1392         if (ret != 0) {
1393                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1394                 return ret;
1395         }
1396
1397         return 0;
1398 }
1399
1400
1401 /*
1402   send a gratious arp
1403  */
1404 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1405 {
1406         int ret;
1407         ctdb_sock_addr addr;
1408
1409         if (argc < 2) {
1410                 usage();
1411         }
1412
1413         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1414                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1415                 return -1;
1416         }
1417
1418         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1419         if (ret != 0) {
1420                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1421                 return ret;
1422         }
1423
1424         return 0;
1425 }
1426
1427 /*
1428   register a server id
1429  */
1430 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1431 {
1432         int ret;
1433         struct ctdb_server_id server_id;
1434
1435         if (argc < 3) {
1436                 usage();
1437         }
1438
1439         server_id.pnn       = strtoul(argv[0], NULL, 0);
1440         server_id.type      = strtoul(argv[1], NULL, 0);
1441         server_id.server_id = strtoul(argv[2], NULL, 0);
1442
1443         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1446                 return ret;
1447         }
1448         return -1;
1449 }
1450
1451 /*
1452   unregister a server id
1453  */
1454 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1455 {
1456         int ret;
1457         struct ctdb_server_id server_id;
1458
1459         if (argc < 3) {
1460                 usage();
1461         }
1462
1463         server_id.pnn       = strtoul(argv[0], NULL, 0);
1464         server_id.type      = strtoul(argv[1], NULL, 0);
1465         server_id.server_id = strtoul(argv[2], NULL, 0);
1466
1467         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1468         if (ret != 0) {
1469                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1470                 return ret;
1471         }
1472         return -1;
1473 }
1474
1475 /*
1476   check if a server id exists
1477  */
1478 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1479 {
1480         uint32_t status;
1481         int ret;
1482         struct ctdb_server_id server_id;
1483
1484         if (argc < 3) {
1485                 usage();
1486         }
1487
1488         server_id.pnn       = strtoul(argv[0], NULL, 0);
1489         server_id.type      = strtoul(argv[1], NULL, 0);
1490         server_id.server_id = strtoul(argv[2], NULL, 0);
1491
1492         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1493         if (ret != 0) {
1494                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1495                 return ret;
1496         }
1497
1498         if (status) {
1499                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1500         } else {
1501                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1502         }
1503         return 0;
1504 }
1505
1506 /*
1507   get a list of all server ids that are registered on a node
1508  */
1509 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1510 {
1511         int i, ret;
1512         struct ctdb_server_id_list *server_ids;
1513
1514         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1515         if (ret != 0) {
1516                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1517                 return ret;
1518         }
1519
1520         for (i=0; i<server_ids->num; i++) {
1521                 printf("Server id %d:%d:%d\n", 
1522                         server_ids->server_ids[i].pnn, 
1523                         server_ids->server_ids[i].type, 
1524                         server_ids->server_ids[i].server_id); 
1525         }
1526
1527         return -1;
1528 }
1529
1530 /*
1531   send a tcp tickle ack
1532  */
1533 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1534 {
1535         int ret;
1536         ctdb_sock_addr  src, dst;
1537
1538         if (argc < 2) {
1539                 usage();
1540         }
1541
1542         if (!parse_ip_port(argv[0], &src)) {
1543                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1544                 return -1;
1545         }
1546
1547         if (!parse_ip_port(argv[1], &dst)) {
1548                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1549                 return -1;
1550         }
1551
1552         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1553         if (ret==0) {
1554                 return 0;
1555         }
1556         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1557
1558         return -1;
1559 }
1560
1561
1562 /*
1563   display public ip status
1564  */
1565 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1566 {
1567         int i, ret;
1568         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1569         struct ctdb_all_public_ips *ips;
1570
1571         if (options.pnn == CTDB_BROADCAST_ALL) {
1572                 /* read the list of public ips from all nodes */
1573                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1574         } else {
1575                 /* read the public ip list from this node */
1576                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1577         }
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1580                 talloc_free(tmp_ctx);
1581                 return ret;
1582         }
1583
1584         if (options.machinereadable){
1585                 printf(":Public IP:Node:\n");
1586         } else {
1587                 if (options.pnn == CTDB_BROADCAST_ALL) {
1588                         printf("Public IPs on ALL nodes\n");
1589                 } else {
1590                         printf("Public IPs on node %u\n", options.pnn);
1591                 }
1592         }
1593
1594         for (i=1;i<=ips->num;i++) {
1595                 if (options.machinereadable){
1596                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1597                 } else {
1598                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1599                 }
1600         }
1601
1602         talloc_free(tmp_ctx);
1603         return 0;
1604 }
1605
1606 /*
1607   display pid of a ctdb daemon
1608  */
1609 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1610 {
1611         uint32_t pid;
1612         int ret;
1613
1614         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1615         if (ret != 0) {
1616                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1617                 return ret;
1618         }
1619         printf("Pid:%d\n", pid);
1620
1621         return 0;
1622 }
1623
1624 /*
1625   handler for receiving the response to ipreallocate
1626 */
1627 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1628                              TDB_DATA data, void *private_data)
1629 {
1630         exit(0);
1631 }
1632
1633 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1634 {
1635         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1636
1637         event_add_timed(ctdb->ev, ctdb, 
1638                                 timeval_current_ofs(1, 0),
1639                                 ctdb_every_second, ctdb);
1640 }
1641
1642 /*
1643   ask the recovery daemon on the recovery master to perform a ip reallocation
1644  */
1645 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1646 {
1647         int i, ret;
1648         TDB_DATA data;
1649         struct takeover_run_reply rd;
1650         uint32_t recmaster;
1651         struct ctdb_node_map *nodemap=NULL;
1652         int retries=0;
1653         struct timeval tv = timeval_current();
1654
1655         /* we need some events to trigger so we can timeout and restart
1656            the loop
1657         */
1658         event_add_timed(ctdb->ev, ctdb, 
1659                                 timeval_current_ofs(1, 0),
1660                                 ctdb_every_second, ctdb);
1661
1662         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1663         if (rd.pnn == -1) {
1664                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1665                 return -1;
1666         }
1667         rd.srvid = getpid();
1668
1669         /* register a message port for receiveing the reply so that we
1670            can receive the reply
1671         */
1672         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1673
1674         data.dptr = (uint8_t *)&rd;
1675         data.dsize = sizeof(rd);
1676
1677 again:
1678         if (retries>5) {
1679                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1680                 exit(10);
1681         }
1682
1683         /* check that there are valid nodes available */
1684         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1685                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1686                 exit(10);
1687         }
1688         for (i=0; i<nodemap->num;i++) {
1689                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1690                         break;
1691                 }
1692         }
1693         if (i==nodemap->num) {
1694                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1695                 return 0;
1696         }
1697
1698
1699         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1700         if (ret != 0) {
1701                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1702                 return ret;
1703         }
1704
1705         /* verify the node exists */
1706         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1707                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1708                 exit(10);
1709         }
1710
1711
1712         /* check tha there are nodes available that can act as a recmaster */
1713         for (i=0; i<nodemap->num; i++) {
1714                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1715                         continue;
1716                 }
1717                 break;
1718         }
1719         if (i == nodemap->num) {
1720                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1721                 return 0;
1722         }
1723
1724         /* verify the recovery master is not STOPPED, nor BANNED */
1725         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1726                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1727                 retries++;
1728                 sleep(1);
1729                 goto again;
1730         } 
1731
1732         
1733         /* verify the recovery master is not STOPPED, nor BANNED */
1734         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1735                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1736                 retries++;
1737                 sleep(1);
1738                 goto again;
1739         } 
1740
1741         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1742         if (ret != 0) {
1743                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1744                 return -1;
1745         }
1746
1747         tv = timeval_current();
1748         /* this loop will terminate when we have received the reply */
1749         while (timeval_elapsed(&tv) < 3.0) {    
1750                 event_loop_once(ctdb->ev);
1751         }
1752
1753         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1754         retries++;
1755         sleep(1);
1756         goto again;
1757
1758         return 0;
1759 }
1760
1761
1762 /*
1763   disable a remote node
1764  */
1765 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1766 {
1767         int ret;
1768         struct ctdb_node_map *nodemap=NULL;
1769
1770         do {
1771                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1772                 if (ret != 0) {
1773                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1774                         return ret;
1775                 }
1776
1777                 sleep(1);
1778
1779                 /* read the nodemap and verify the change took effect */
1780                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1781                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1782                         exit(10);
1783                 }
1784
1785         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1786         ret = control_ipreallocate(ctdb, argc, argv);
1787         if (ret != 0) {
1788                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1789                 return ret;
1790         }
1791
1792         return 0;
1793 }
1794
1795 /*
1796   enable a disabled remote node
1797  */
1798 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1799 {
1800         int ret;
1801
1802         struct ctdb_node_map *nodemap=NULL;
1803
1804         do {
1805                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1806                 if (ret != 0) {
1807                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1808                         return ret;
1809                 }
1810
1811                 sleep(1);
1812
1813                 /* read the nodemap and verify the change took effect */
1814                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1815                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1816                         exit(10);
1817                 }
1818
1819         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1820         ret = control_ipreallocate(ctdb, argc, argv);
1821         if (ret != 0) {
1822                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1823                 return ret;
1824         }
1825
1826         return 0;
1827 }
1828
1829 /*
1830   stop a remote node
1831  */
1832 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1833 {
1834         int ret;
1835         struct ctdb_node_map *nodemap=NULL;
1836
1837         do {
1838                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1839                 if (ret != 0) {
1840                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1841                 }
1842         
1843                 sleep(1);
1844
1845                 /* read the nodemap and verify the change took effect */
1846                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1847                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1848                         exit(10);
1849                 }
1850
1851         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1852         ret = control_ipreallocate(ctdb, argc, argv);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1855                 return ret;
1856         }
1857
1858         return 0;
1859 }
1860
1861 /*
1862   restart a stopped remote node
1863  */
1864 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1865 {
1866         int ret;
1867
1868         struct ctdb_node_map *nodemap=NULL;
1869
1870         do {
1871                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1872                 if (ret != 0) {
1873                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1874                         return ret;
1875                 }
1876         
1877                 sleep(1);
1878
1879                 /* read the nodemap and verify the change took effect */
1880                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1881                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1882                         exit(10);
1883                 }
1884
1885         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1886         ret = control_ipreallocate(ctdb, argc, argv);
1887         if (ret != 0) {
1888                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1889                 return ret;
1890         }
1891
1892         return 0;
1893 }
1894
1895 static uint32_t get_generation(struct ctdb_context *ctdb)
1896 {
1897         struct ctdb_vnn_map *vnnmap=NULL;
1898         int ret;
1899
1900         /* wait until the recmaster is not in recovery mode */
1901         while (1) {
1902                 uint32_t recmode, recmaster;
1903                 
1904                 if (vnnmap != NULL) {
1905                         talloc_free(vnnmap);
1906                         vnnmap = NULL;
1907                 }
1908
1909                 /* get the recmaster */
1910                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1911                 if (ret != 0) {
1912                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1913                         exit(10);
1914                 }
1915
1916                 /* get recovery mode */
1917                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1918                 if (ret != 0) {
1919                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1920                         exit(10);
1921                 }
1922
1923                 /* get the current generation number */
1924                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
1925                 if (ret != 0) {
1926                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
1927                         exit(10);
1928                 }
1929
1930                 if ((recmode == CTDB_RECOVERY_NORMAL)
1931                 &&  (vnnmap->generation != 1)){
1932                         return vnnmap->generation;
1933                 }
1934                 sleep(1);
1935         }
1936 }
1937
1938 /*
1939   ban a node from the cluster
1940  */
1941 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
1942 {
1943         int ret;
1944         struct ctdb_node_map *nodemap=NULL;
1945         struct ctdb_ban_time bantime;
1946
1947         if (argc < 1) {
1948                 usage();
1949         }
1950         
1951         /* verify the node exists */
1952         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1953         if (ret != 0) {
1954                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1955                 return ret;
1956         }
1957
1958         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
1959                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
1960                 return -1;
1961         }
1962
1963         bantime.pnn  = options.pnn;
1964         bantime.time = strtoul(argv[0], NULL, 0);
1965
1966         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
1967         if (ret != 0) {
1968                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
1969                 return -1;
1970         }       
1971
1972         ret = control_ipreallocate(ctdb, argc, argv);
1973         if (ret != 0) {
1974                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1975                 return ret;
1976         }
1977
1978         return 0;
1979 }
1980
1981
1982 /*
1983   unban a node from the cluster
1984  */
1985 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
1986 {
1987         int ret;
1988         struct ctdb_node_map *nodemap=NULL;
1989         struct ctdb_ban_time bantime;
1990
1991         /* verify the node exists */
1992         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1993         if (ret != 0) {
1994                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1995                 return ret;
1996         }
1997
1998         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
1999                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2000                 return -1;
2001         }
2002
2003         bantime.pnn  = options.pnn;
2004         bantime.time = 0;
2005
2006         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2007         if (ret != 0) {
2008                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2009                 return -1;
2010         }       
2011
2012         ret = control_ipreallocate(ctdb, argc, argv);
2013         if (ret != 0) {
2014                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2015                 return ret;
2016         }
2017
2018         return 0;
2019 }
2020
2021
2022 /*
2023   show ban information for a node
2024  */
2025 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2026 {
2027         int ret;
2028         struct ctdb_node_map *nodemap=NULL;
2029         struct ctdb_ban_time *bantime;
2030
2031         /* verify the node exists */
2032         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2033         if (ret != 0) {
2034                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2035                 return ret;
2036         }
2037
2038         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2041                 return -1;
2042         }       
2043
2044         if (bantime->time == 0) {
2045                 printf("Node %u is not banned\n", bantime->pnn);
2046         } else {
2047                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2048         }
2049
2050         return 0;
2051 }
2052
2053 /*
2054   shutdown a daemon
2055  */
2056 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2057 {
2058         int ret;
2059
2060         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2061         if (ret != 0) {
2062                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2063                 return ret;
2064         }
2065
2066         return 0;
2067 }
2068
2069 /*
2070   trigger a recovery
2071  */
2072 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2073 {
2074         int ret;
2075         uint32_t generation, next_generation;
2076
2077         /* record the current generation number */
2078         generation = get_generation(ctdb);
2079
2080         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2083                 return ret;
2084         }
2085
2086         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2087         if (ret != 0) {
2088                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2089                 return ret;
2090         }
2091
2092         /* wait until we are in a new generation */
2093         while (1) {
2094                 next_generation = get_generation(ctdb);
2095                 if (next_generation != generation) {
2096                         return 0;
2097                 }
2098                 sleep(1);
2099         }
2100
2101         return 0;
2102 }
2103
2104
2105 /*
2106   display monitoring mode of a remote node
2107  */
2108 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2109 {
2110         uint32_t monmode;
2111         int ret;
2112
2113         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2114         if (ret != 0) {
2115                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2116                 return ret;
2117         }
2118         if (!options.machinereadable){
2119                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2120         } else {
2121                 printf(":mode:\n");
2122                 printf(":%d:\n",monmode);
2123         }
2124         return 0;
2125 }
2126
2127
2128 /*
2129   display capabilities of a remote node
2130  */
2131 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2132 {
2133         uint32_t capabilities;
2134         int ret;
2135
2136         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2139                 return ret;
2140         }
2141         
2142         if (!options.machinereadable){
2143                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2144                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2145                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2146                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2147         } else {
2148                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2149                 printf(":%d:%d:%d:%d:\n",
2150                         !!(capabilities&CTDB_CAP_RECMASTER),
2151                         !!(capabilities&CTDB_CAP_LMASTER),
2152                         !!(capabilities&CTDB_CAP_LVS),
2153                         !!(capabilities&CTDB_CAP_NATGW));
2154         }
2155         return 0;
2156 }
2157
2158 /*
2159   display lvs configuration
2160  */
2161 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2162 {
2163         uint32_t *capabilities;
2164         struct ctdb_node_map *nodemap=NULL;
2165         int i, ret;
2166         int healthy_count = 0;
2167
2168         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2169         if (ret != 0) {
2170                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2171                 return ret;
2172         }
2173
2174         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2175         CTDB_NO_MEMORY(ctdb, capabilities);
2176         
2177         /* collect capabilities for all connected nodes */
2178         for (i=0; i<nodemap->num; i++) {
2179                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2180                         continue;
2181                 }
2182                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2183                         continue;
2184                 }
2185         
2186                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2187                 if (ret != 0) {
2188                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2189                         return ret;
2190                 }
2191
2192                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2193                         continue;
2194                 }
2195
2196                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2197                         healthy_count++;
2198                 }
2199         }
2200
2201         /* Print all LVS nodes */
2202         for (i=0; i<nodemap->num; i++) {
2203                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2204                         continue;
2205                 }
2206                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2207                         continue;
2208                 }
2209                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2210                         continue;
2211                 }
2212
2213                 if (healthy_count != 0) {
2214                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2215                                 continue;
2216                         }
2217                 }
2218
2219                 printf("%d:%s\n", i, 
2220                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2221         }
2222
2223         return 0;
2224 }
2225
2226 /*
2227   display who is the lvs master
2228  */
2229 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2230 {
2231         uint32_t *capabilities;
2232         struct ctdb_node_map *nodemap=NULL;
2233         int i, ret;
2234         int healthy_count = 0;
2235
2236         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2237         if (ret != 0) {
2238                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2239                 return ret;
2240         }
2241
2242         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2243         CTDB_NO_MEMORY(ctdb, capabilities);
2244         
2245         /* collect capabilities for all connected nodes */
2246         for (i=0; i<nodemap->num; i++) {
2247                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2248                         continue;
2249                 }
2250                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2251                         continue;
2252                 }
2253         
2254                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2255                 if (ret != 0) {
2256                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2257                         return ret;
2258                 }
2259
2260                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2261                         continue;
2262                 }
2263
2264                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2265                         healthy_count++;
2266                 }
2267         }
2268
2269         /* find and show the lvsmaster */
2270         for (i=0; i<nodemap->num; i++) {
2271                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2272                         continue;
2273                 }
2274                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2275                         continue;
2276                 }
2277                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2278                         continue;
2279                 }
2280
2281                 if (healthy_count != 0) {
2282                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2283                                 continue;
2284                         }
2285                 }
2286
2287                 if (options.machinereadable){
2288                         printf("%d\n", i);
2289                 } else {
2290                         printf("Node %d is LVS master\n", i);
2291                 }
2292                 return 0;
2293         }
2294
2295         printf("There is no LVS master\n");
2296         return -1;
2297 }
2298
2299 /*
2300   disable monitoring on a  node
2301  */
2302 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2303 {
2304         
2305         int ret;
2306
2307         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2308         if (ret != 0) {
2309                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2310                 return ret;
2311         }
2312         printf("Monitoring mode:%s\n","DISABLED");
2313
2314         return 0;
2315 }
2316
2317 /*
2318   enable monitoring on a  node
2319  */
2320 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2321 {
2322         
2323         int ret;
2324
2325         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2326         if (ret != 0) {
2327                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2328                 return ret;
2329         }
2330         printf("Monitoring mode:%s\n","ACTIVE");
2331
2332         return 0;
2333 }
2334
2335 /*
2336   display remote list of keys/data for a db
2337  */
2338 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2339 {
2340         const char *db_name;
2341         struct ctdb_db_context *ctdb_db;
2342         int ret;
2343
2344         if (argc < 1) {
2345                 usage();
2346         }
2347
2348         db_name = argv[0];
2349
2350
2351         if (db_exists(ctdb, db_name)) {
2352                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2353                 return -1;
2354         }
2355
2356         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2357
2358         if (ctdb_db == NULL) {
2359                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2360                 return -1;
2361         }
2362
2363         /* traverse and dump the cluster tdb */
2364         ret = ctdb_dump_db(ctdb_db, stdout);
2365         if (ret == -1) {
2366                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2367                 return -1;
2368         }
2369         talloc_free(ctdb_db);
2370
2371         printf("Dumped %d records\n", ret);
2372         return 0;
2373 }
2374
2375
2376 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2377                              TDB_DATA data, void *private_data)
2378 {
2379         DEBUG(DEBUG_ERR,("Log data received\n"));
2380         if (data.dsize > 0) {
2381                 printf("%s", data.dptr);
2382         }
2383
2384         exit(0);
2385 }
2386
2387 /*
2388   display a list of log messages from the in memory ringbuffer
2389  */
2390 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2391 {
2392         int ret;
2393         int32_t res;
2394         struct ctdb_get_log_addr log_addr;
2395         TDB_DATA data;
2396         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2397         char *errmsg;
2398         struct timeval tv;
2399
2400         if (argc != 1) {
2401                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2402                 talloc_free(tmp_ctx);
2403                 return -1;
2404         }
2405
2406         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2407         log_addr.srvid = getpid();
2408         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2409                 log_addr.level = get_debug_by_desc(argv[0]);
2410         } else {
2411                 log_addr.level = strtol(argv[0], NULL, 0);
2412         }
2413
2414
2415         data.dptr = (unsigned char *)&log_addr;
2416         data.dsize = sizeof(log_addr);
2417
2418         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2419
2420         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2421         sleep(1);
2422
2423         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2424
2425         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2426                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2427         if (ret != 0 || res != 0) {
2428                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2429                 talloc_free(tmp_ctx);
2430                 return -1;
2431         }
2432
2433
2434         tv = timeval_current();
2435         /* this loop will terminate when we have received the reply */
2436         while (timeval_elapsed(&tv) < 3.0) {    
2437                 event_loop_once(ctdb->ev);
2438         }
2439
2440         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2441
2442         talloc_free(tmp_ctx);
2443         return 0;
2444 }
2445
2446 /*
2447   clear the in memory log area
2448  */
2449 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2450 {
2451         int ret;
2452         int32_t res;
2453         char *errmsg;
2454         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2455
2456         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2457                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2458         if (ret != 0 || res != 0) {
2459                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2460                 talloc_free(tmp_ctx);
2461                 return -1;
2462         }
2463
2464         talloc_free(tmp_ctx);
2465         return 0;
2466 }
2467
2468
2469
2470 /*
2471   display a list of the databases on a remote ctdb
2472  */
2473 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2474 {
2475         int i, ret;
2476         struct ctdb_dbid_map *dbmap=NULL;
2477
2478         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2479         if (ret != 0) {
2480                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2481                 return ret;
2482         }
2483
2484         printf("Number of databases:%d\n", dbmap->num);
2485         for(i=0;i<dbmap->num;i++){
2486                 const char *path;
2487                 const char *name;
2488                 bool persistent;
2489
2490                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2491                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2492                 persistent = dbmap->dbs[i].persistent;
2493                 printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name, 
2494                        path, persistent?"PERSISTENT":"");
2495         }
2496
2497         return 0;
2498 }
2499
2500 /*
2501   check if the local node is recmaster or not
2502   it will return 1 if this node is the recmaster and 0 if it is not
2503   or if the local ctdb daemon could not be contacted
2504  */
2505 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2506 {
2507         uint32_t mypnn, recmaster;
2508         int ret;
2509
2510         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2511         if (mypnn == -1) {
2512                 printf("Failed to get pnn of node\n");
2513                 return 1;
2514         }
2515
2516         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2517         if (ret != 0) {
2518                 printf("Failed to get the recmaster\n");
2519                 return 1;
2520         }
2521
2522         if (recmaster != mypnn) {
2523                 printf("this node is not the recmaster\n");
2524                 return 1;
2525         }
2526
2527         printf("this node is the recmaster\n");
2528         return 0;
2529 }
2530
2531 /*
2532   ping a node
2533  */
2534 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2535 {
2536         int ret;
2537         struct timeval tv = timeval_current();
2538         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2539         if (ret == -1) {
2540                 printf("Unable to get ping response from node %u\n", options.pnn);
2541                 return -1;
2542         } else {
2543                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2544                        options.pnn, timeval_elapsed(&tv), ret);
2545         }
2546         return 0;
2547 }
2548
2549
2550 /*
2551   get a tunable
2552  */
2553 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2554 {
2555         const char *name;
2556         uint32_t value;
2557         int ret;
2558
2559         if (argc < 1) {
2560                 usage();
2561         }
2562
2563         name = argv[0];
2564         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2565         if (ret == -1) {
2566                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2567                 return -1;
2568         }
2569
2570         printf("%-19s = %u\n", name, value);
2571         return 0;
2572 }
2573
2574 /*
2575   set a tunable
2576  */
2577 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2578 {
2579         const char *name;
2580         uint32_t value;
2581         int ret;
2582
2583         if (argc < 2) {
2584                 usage();
2585         }
2586
2587         name = argv[0];
2588         value = strtoul(argv[1], NULL, 0);
2589
2590         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2591         if (ret == -1) {
2592                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2593                 return -1;
2594         }
2595         return 0;
2596 }
2597
2598 /*
2599   list all tunables
2600  */
2601 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2602 {
2603         uint32_t count;
2604         const char **list;
2605         int ret, i;
2606
2607         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2608         if (ret == -1) {
2609                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2610                 return -1;
2611         }
2612
2613         for (i=0;i<count;i++) {
2614                 control_getvar(ctdb, 1, &list[i]);
2615         }
2616
2617         talloc_free(list);
2618         
2619         return 0;
2620 }
2621
2622 /*
2623   display debug level on a node
2624  */
2625 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2626 {
2627         int ret;
2628         int32_t level;
2629
2630         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2631         if (ret != 0) {
2632                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2633                 return ret;
2634         } else {
2635                 if (options.machinereadable){
2636                         printf(":Name:Level:\n");
2637                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2638                 } else {
2639                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2640                 }
2641         }
2642         return 0;
2643 }
2644
2645 /*
2646   display reclock file of a node
2647  */
2648 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2649 {
2650         int ret;
2651         const char *reclock;
2652
2653         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2654         if (ret != 0) {
2655                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2656                 return ret;
2657         } else {
2658                 if (options.machinereadable){
2659                         if (reclock != NULL) {
2660                                 printf("%s", reclock);
2661                         }
2662                 } else {
2663                         if (reclock == NULL) {
2664                                 printf("No reclock file used.\n");
2665                         } else {
2666                                 printf("Reclock file:%s\n", reclock);
2667                         }
2668                 }
2669         }
2670         return 0;
2671 }
2672
2673 /*
2674   set the reclock file of a node
2675  */
2676 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2677 {
2678         int ret;
2679         const char *reclock;
2680
2681         if (argc == 0) {
2682                 reclock = NULL;
2683         } else if (argc == 1) {
2684                 reclock = argv[0];
2685         } else {
2686                 usage();
2687         }
2688
2689         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2690         if (ret != 0) {
2691                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2692                 return ret;
2693         }
2694         return 0;
2695 }
2696
2697 /*
2698   set the natgw state on/off
2699  */
2700 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2701 {
2702         int ret;
2703         uint32_t natgwstate;
2704
2705         if (argc == 0) {
2706                 usage();
2707         }
2708
2709         if (!strcmp(argv[0], "on")) {
2710                 natgwstate = 1;
2711         } else if (!strcmp(argv[0], "off")) {
2712                 natgwstate = 0;
2713         } else {
2714                 usage();
2715         }
2716
2717         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2718         if (ret != 0) {
2719                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2720                 return ret;
2721         }
2722
2723         return 0;
2724 }
2725
2726 /*
2727   set the lmaster role on/off
2728  */
2729 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2730 {
2731         int ret;
2732         uint32_t lmasterrole;
2733
2734         if (argc == 0) {
2735                 usage();
2736         }
2737
2738         if (!strcmp(argv[0], "on")) {
2739                 lmasterrole = 1;
2740         } else if (!strcmp(argv[0], "off")) {
2741                 lmasterrole = 0;
2742         } else {
2743                 usage();
2744         }
2745
2746         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2747         if (ret != 0) {
2748                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2749                 return ret;
2750         }
2751
2752         return 0;
2753 }
2754
2755 /*
2756   set the recmaster role on/off
2757  */
2758 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2759 {
2760         int ret;
2761         uint32_t recmasterrole;
2762
2763         if (argc == 0) {
2764                 usage();
2765         }
2766
2767         if (!strcmp(argv[0], "on")) {
2768                 recmasterrole = 1;
2769         } else if (!strcmp(argv[0], "off")) {
2770                 recmasterrole = 0;
2771         } else {
2772                 usage();
2773         }
2774
2775         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2776         if (ret != 0) {
2777                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2778                 return ret;
2779         }
2780
2781         return 0;
2782 }
2783
2784 /*
2785   set debug level on a node or all nodes
2786  */
2787 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2788 {
2789         int i, ret;
2790         int32_t level;
2791
2792         if (argc == 0) {
2793                 printf("You must specify the debug level. Valid levels are:\n");
2794                 for (i=0; debug_levels[i].description != NULL; i++) {
2795                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2796                 }
2797
2798                 return 0;
2799         }
2800
2801         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2802                 level = get_debug_by_desc(argv[0]);
2803         } else {
2804                 level = strtol(argv[0], NULL, 0);
2805         }
2806
2807         for (i=0; debug_levels[i].description != NULL; i++) {
2808                 if (level == debug_levels[i].level) {
2809                         break;
2810                 }
2811         }
2812         if (debug_levels[i].description == NULL) {
2813                 printf("Invalid debug level, must be one of\n");
2814                 for (i=0; debug_levels[i].description != NULL; i++) {
2815                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2816                 }
2817                 return -1;
2818         }
2819
2820         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2821         if (ret != 0) {
2822                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2823         }
2824         return 0;
2825 }
2826
2827
2828 /*
2829   freeze a node
2830  */
2831 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2832 {
2833         int ret;
2834         uint32_t priority;
2835         
2836         if (argc == 1) {
2837                 priority = strtol(argv[0], NULL, 0);
2838         } else {
2839                 priority = 0;
2840         }
2841         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2842
2843         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2844         if (ret != 0) {
2845                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2846         }               
2847         return 0;
2848 }
2849
2850 /*
2851   thaw a node
2852  */
2853 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
2854 {
2855         int ret;
2856         uint32_t priority;
2857         
2858         if (argc == 1) {
2859                 priority = strtol(argv[0], NULL, 0);
2860         } else {
2861                 priority = 0;
2862         }
2863         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
2864
2865         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2866         if (ret != 0) {
2867                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
2868         }               
2869         return 0;
2870 }
2871
2872
2873 /*
2874   attach to a database
2875  */
2876 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
2877 {
2878         const char *db_name;
2879         struct ctdb_db_context *ctdb_db;
2880
2881         if (argc < 1) {
2882                 usage();
2883         }
2884         db_name = argv[0];
2885
2886         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2887         if (ctdb_db == NULL) {
2888                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2889                 return -1;
2890         }
2891
2892         return 0;
2893 }
2894
2895 /*
2896   set db priority
2897  */
2898 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2899 {
2900         struct ctdb_db_priority db_prio;
2901         int ret;
2902
2903         if (argc < 2) {
2904                 usage();
2905         }
2906
2907         db_prio.db_id    = strtoul(argv[0], NULL, 0);
2908         db_prio.priority = strtoul(argv[1], NULL, 0);
2909
2910         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
2913                 return -1;
2914         }
2915
2916         return 0;
2917 }
2918
2919 /*
2920   get db priority
2921  */
2922 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2923 {
2924         uint32_t db_id, priority;
2925         int ret;
2926
2927         if (argc < 1) {
2928                 usage();
2929         }
2930
2931         db_id = strtoul(argv[0], NULL, 0);
2932
2933         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
2934         if (ret != 0) {
2935                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
2936                 return -1;
2937         }
2938
2939         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
2940
2941         return 0;
2942 }
2943
2944 /*
2945   run an eventscript on a node
2946  */
2947 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
2948 {
2949         TDB_DATA data;
2950         int ret;
2951         int32_t res;
2952         char *errmsg;
2953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2954
2955         if (argc != 1) {
2956                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2957                 return -1;
2958         }
2959
2960         data.dptr = (unsigned char *)discard_const(argv[0]);
2961         data.dsize = strlen((char *)data.dptr) + 1;
2962
2963         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
2964
2965         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
2966                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2967         if (ret != 0 || res != 0) {
2968                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
2969                 talloc_free(tmp_ctx);
2970                 return -1;
2971         }
2972         talloc_free(tmp_ctx);
2973         return 0;
2974 }
2975
2976 #define DB_VERSION 1
2977 #define MAX_DB_NAME 64
2978 struct db_file_header {
2979         unsigned long version;
2980         time_t timestamp;
2981         unsigned long persistent;
2982         unsigned long size;
2983         const char name[MAX_DB_NAME];
2984 };
2985
2986 struct backup_data {
2987         struct ctdb_marshall_buffer *records;
2988         uint32_t len;
2989         uint32_t total;
2990         bool traverse_error;
2991 };
2992
2993 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
2994 {
2995         struct backup_data *bd = talloc_get_type(private, struct backup_data);
2996         struct ctdb_rec_data *rec;
2997
2998         /* add the record */
2999         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3000         if (rec == NULL) {
3001                 bd->traverse_error = true;
3002                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3003                 return -1;
3004         }
3005         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3006         if (bd->records == NULL) {
3007                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3008                 bd->traverse_error = true;
3009                 return -1;
3010         }
3011         bd->records->count++;
3012         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3013         bd->len += rec->length;
3014         talloc_free(rec);
3015
3016         bd->total++;
3017         return 0;
3018 }
3019
3020 /*
3021  * backup a database to a file 
3022  */
3023 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3024 {
3025         int i, ret;
3026         struct ctdb_dbid_map *dbmap=NULL;
3027         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3028         struct db_file_header dbhdr;
3029         struct ctdb_db_context *ctdb_db;
3030         struct backup_data *bd;
3031         int fh = -1;
3032         int status = -1;
3033
3034         if (argc != 2) {
3035                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3036                 return -1;
3037         }
3038
3039         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3040         if (ret != 0) {
3041                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3042                 return ret;
3043         }
3044
3045         for(i=0;i<dbmap->num;i++){
3046                 const char *name;
3047
3048                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3049                 if(!strcmp(argv[0], name)){
3050                         talloc_free(discard_const(name));
3051                         break;
3052                 }
3053                 talloc_free(discard_const(name));
3054         }
3055         if (i == dbmap->num) {
3056                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3057                 talloc_free(tmp_ctx);
3058                 return -1;
3059         }
3060
3061
3062         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3063         if (ctdb_db == NULL) {
3064                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3065                 talloc_free(tmp_ctx);
3066                 return -1;
3067         }
3068
3069
3070         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3071         if (ret == -1) {
3072                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3073                 talloc_free(tmp_ctx);
3074                 return -1;
3075         }
3076
3077
3078         bd = talloc_zero(tmp_ctx, struct backup_data);
3079         if (bd == NULL) {
3080                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3081                 talloc_free(tmp_ctx);
3082                 return -1;
3083         }
3084
3085         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3086         if (bd->records == NULL) {
3087                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3088                 talloc_free(tmp_ctx);
3089                 return -1;
3090         }
3091
3092         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3093         bd->records->db_id = ctdb_db->db_id;
3094         /* traverse the database collecting all records */
3095         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3096             bd->traverse_error) {
3097                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3098                 talloc_free(tmp_ctx);
3099                 return -1;              
3100         }
3101
3102         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3103
3104
3105         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3106         if (fh == -1) {
3107                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3108                 talloc_free(tmp_ctx);
3109                 return -1;
3110         }
3111
3112         dbhdr.version = DB_VERSION;
3113         dbhdr.timestamp = time(NULL);
3114         dbhdr.persistent = dbmap->dbs[i].persistent;
3115         dbhdr.size = bd->len;
3116         if (strlen(argv[0]) >= MAX_DB_NAME) {
3117                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3118                 goto done;
3119         }
3120         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3121         ret = write(fh, &dbhdr, sizeof(dbhdr));
3122         if (ret == -1) {
3123                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3124                 goto done;
3125         }
3126         ret = write(fh, bd->records, bd->len);
3127         if (ret == -1) {
3128                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3129                 goto done;
3130         }
3131
3132         status = 0;
3133 done:
3134         if (fh != -1) {
3135                 ret = close(fh);
3136                 if (ret == -1) {
3137                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3138                 }
3139         }
3140         talloc_free(tmp_ctx);
3141         return status;
3142 }
3143
3144 /*
3145  * restore a database from a file 
3146  */
3147 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3148 {
3149         int ret;
3150         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3151         TDB_DATA outdata;
3152         TDB_DATA data;
3153         struct db_file_header dbhdr;
3154         struct ctdb_db_context *ctdb_db;
3155         struct ctdb_node_map *nodemap=NULL;
3156         struct ctdb_vnn_map *vnnmap=NULL;
3157         int i, fh;
3158         struct ctdb_control_wipe_database w;
3159         uint32_t *nodes;
3160         uint32_t generation;
3161         struct tm *tm;
3162         char tbuf[100];
3163
3164         if (argc != 1) {
3165                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3166                 return -1;
3167         }
3168
3169         fh = open(argv[0], O_RDONLY);
3170         if (fh == -1) {
3171                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3172                 talloc_free(tmp_ctx);
3173                 return -1;
3174         }
3175
3176         read(fh, &dbhdr, sizeof(dbhdr));
3177         if (dbhdr.version != DB_VERSION) {
3178                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3179                 talloc_free(tmp_ctx);
3180                 return -1;
3181         }
3182
3183         outdata.dsize = dbhdr.size;
3184         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3185         if (outdata.dptr == NULL) {
3186                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3187                 close(fh);
3188                 talloc_free(tmp_ctx);
3189                 return -1;
3190         }               
3191         read(fh, outdata.dptr, outdata.dsize);
3192         close(fh);
3193
3194         tm = localtime(&dbhdr.timestamp);
3195         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3196         printf("Restoring database '%s' from backup @ %s\n",
3197                 dbhdr.name, tbuf);
3198
3199
3200         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3201         if (ctdb_db == NULL) {
3202                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3203                 talloc_free(tmp_ctx);
3204                 return -1;
3205         }
3206
3207         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3208         if (ret != 0) {
3209                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3210                 talloc_free(tmp_ctx);
3211                 return ret;
3212         }
3213
3214
3215         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3216         if (ret != 0) {
3217                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3218                 talloc_free(tmp_ctx);
3219                 return ret;
3220         }
3221
3222         /* freeze all nodes */
3223         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3224         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3225                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3226                                         nodes, i,
3227                                         TIMELIMIT(),
3228                                         false, tdb_null,
3229                                         NULL, NULL,
3230                                         NULL) != 0) {
3231                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3232                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3233                         talloc_free(tmp_ctx);
3234                         return -1;
3235                 }
3236         }
3237
3238         generation = vnnmap->generation;
3239         data.dptr = (void *)&generation;
3240         data.dsize = sizeof(generation);
3241
3242         /* start a cluster wide transaction */
3243         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3244         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3245                                         nodes, 0,
3246                                         TIMELIMIT(), false, data,
3247                                         NULL, NULL,
3248                                         NULL) != 0) {
3249                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3250                 return -1;
3251         }
3252
3253
3254         w.db_id = ctdb_db->db_id;
3255         w.transaction_id = generation;
3256
3257         data.dptr = (void *)&w;
3258         data.dsize = sizeof(w);
3259
3260         /* wipe all the remote databases. */
3261         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3262         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3263                                         nodes, 0,
3264                                         TIMELIMIT(), false, data,
3265                                         NULL, NULL,
3266                                         NULL) != 0) {
3267                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3268                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3269                 talloc_free(tmp_ctx);
3270                 return -1;
3271         }
3272         
3273         /* push the database */
3274         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3275         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3276                                         nodes, 0,
3277                                         TIMELIMIT(), false, outdata,
3278                                         NULL, NULL,
3279                                         NULL) != 0) {
3280                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3281                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3282                 talloc_free(tmp_ctx);
3283                 return -1;
3284         }
3285
3286         data.dptr = (void *)&generation;
3287         data.dsize = sizeof(generation);
3288
3289         /* commit all the changes */
3290         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3291                                         nodes, 0,
3292                                         TIMELIMIT(), false, data,
3293                                         NULL, NULL,
3294                                         NULL) != 0) {
3295                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3296                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3297                 talloc_free(tmp_ctx);
3298                 return -1;
3299         }
3300
3301
3302         /* thaw all nodes */
3303         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3304         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3305                                         nodes, 0,
3306                                         TIMELIMIT(),
3307                                         false, tdb_null,
3308                                         NULL, NULL,
3309                                         NULL) != 0) {
3310                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3311                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3312                 talloc_free(tmp_ctx);
3313                 return -1;
3314         }
3315
3316
3317         talloc_free(tmp_ctx);
3318         return 0;
3319 }
3320
3321 /*
3322  * wipe a database from a file
3323  */
3324 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3325                           const char **argv)
3326 {
3327         int ret;
3328         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3329         TDB_DATA data;
3330         struct ctdb_db_context *ctdb_db;
3331         struct ctdb_node_map *nodemap = NULL;
3332         struct ctdb_vnn_map *vnnmap = NULL;
3333         int i;
3334         struct ctdb_control_wipe_database w;
3335         uint32_t *nodes;
3336         uint32_t generation;
3337         struct ctdb_dbid_map *dbmap = NULL;
3338
3339         if (argc != 1) {
3340                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3341                 return -1;
3342         }
3343
3344         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3345                                  &dbmap);
3346         if (ret != 0) {
3347                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3348                                   options.pnn));
3349                 return ret;
3350         }
3351
3352         for(i=0;i<dbmap->num;i++){
3353                 const char *name;
3354
3355                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3356                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3357                 if(!strcmp(argv[0], name)){
3358                         talloc_free(discard_const(name));
3359                         break;
3360                 }
3361                 talloc_free(discard_const(name));
3362         }
3363         if (i == dbmap->num) {
3364                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3365                                   argv[0]));
3366                 talloc_free(tmp_ctx);
3367                 return -1;
3368         }
3369
3370         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3371         if (ctdb_db == NULL) {
3372                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3373                                   argv[0]));
3374                 talloc_free(tmp_ctx);
3375                 return -1;
3376         }
3377
3378         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3379                                    &nodemap);
3380         if (ret != 0) {
3381                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3382                                   options.pnn));
3383                 talloc_free(tmp_ctx);
3384                 return ret;
3385         }
3386
3387         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3388                                   &vnnmap);
3389         if (ret != 0) {
3390                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3391                                   options.pnn));
3392                 talloc_free(tmp_ctx);
3393                 return ret;
3394         }
3395
3396         /* freeze all nodes */
3397         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3398         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3399                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3400                                                 nodes, i,
3401                                                 TIMELIMIT(),
3402                                                 false, tdb_null,
3403                                                 NULL, NULL,
3404                                                 NULL);
3405                 if (ret != 0) {
3406                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3407                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3408                                              CTDB_RECOVERY_ACTIVE);
3409                         talloc_free(tmp_ctx);
3410                         return -1;
3411                 }
3412         }
3413
3414         generation = vnnmap->generation;
3415         data.dptr = (void *)&generation;
3416         data.dsize = sizeof(generation);
3417
3418         /* start a cluster wide transaction */
3419         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3420         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3421                                         nodes, 0,
3422                                         TIMELIMIT(), false, data,
3423                                         NULL, NULL,
3424                                         NULL);
3425         if (ret!= 0) {
3426                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3427                                   "transactions.\n"));
3428                 return -1;
3429         }
3430
3431         w.db_id = ctdb_db->db_id;
3432         w.transaction_id = generation;
3433
3434         data.dptr = (void *)&w;
3435         data.dsize = sizeof(w);
3436
3437         /* wipe all the remote databases. */
3438         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3439         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3440                                         nodes, 0,
3441                                         TIMELIMIT(), false, data,
3442                                         NULL, NULL,
3443                                         NULL) != 0) {
3444                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3445                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3446                 talloc_free(tmp_ctx);
3447                 return -1;
3448         }
3449
3450         data.dptr = (void *)&generation;
3451         data.dsize = sizeof(generation);
3452
3453         /* commit all the changes */
3454         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3455                                         nodes, 0,
3456                                         TIMELIMIT(), false, data,
3457                                         NULL, NULL,
3458                                         NULL) != 0) {
3459                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3460                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3461                 talloc_free(tmp_ctx);
3462                 return -1;
3463         }
3464
3465         /* thaw all nodes */
3466         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3467         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3468                                         nodes, 0,
3469                                         TIMELIMIT(),
3470                                         false, tdb_null,
3471                                         NULL, NULL,
3472                                         NULL) != 0) {
3473                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3474                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3475                 talloc_free(tmp_ctx);
3476                 return -1;
3477         }
3478
3479         talloc_free(tmp_ctx);
3480         return 0;
3481 }
3482
3483 /*
3484  * set flags of a node in the nodemap
3485  */
3486 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3487 {
3488         int ret;
3489         int32_t status;
3490         int node;
3491         int flags;
3492         TDB_DATA data;
3493         struct ctdb_node_flag_change c;
3494
3495         if (argc != 2) {
3496                 usage();
3497                 return -1;
3498         }
3499
3500         if (sscanf(argv[0], "%d", &node) != 1) {
3501                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3502                 usage();
3503                 return -1;
3504         }
3505         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3506                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3507                 usage();
3508                 return -1;
3509         }
3510
3511         c.pnn       = node;
3512         c.old_flags = 0;
3513         c.new_flags = flags;
3514
3515         data.dsize = sizeof(c);
3516         data.dptr = (unsigned char *)&c;
3517
3518         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3519                            data, NULL, NULL, &status, NULL, NULL);
3520         if (ret != 0 || status != 0) {
3521                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3522                 return -1;
3523         }
3524         return 0;
3525 }
3526
3527 /*
3528   dump memory usage
3529  */
3530 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3531 {
3532         TDB_DATA data;
3533         int ret;
3534         int32_t res;
3535         char *errmsg;
3536         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3537         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3538                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3539         if (ret != 0 || res != 0) {
3540                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3541                 talloc_free(tmp_ctx);
3542                 return -1;
3543         }
3544         write(1, data.dptr, data.dsize);
3545         talloc_free(tmp_ctx);
3546         return 0;
3547 }
3548
3549 /*
3550   handler for memory dumps
3551 */
3552 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3553                              TDB_DATA data, void *private_data)
3554 {
3555         write(1, data.dptr, data.dsize);
3556         exit(0);
3557 }
3558
3559 /*
3560   dump memory usage on the recovery daemon
3561  */
3562 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3563 {
3564         int ret;
3565         TDB_DATA data;
3566         struct rd_memdump_reply rd;
3567
3568         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3569         if (rd.pnn == -1) {
3570                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3571                 return -1;
3572         }
3573         rd.srvid = getpid();
3574
3575         /* register a message port for receiveing the reply so that we
3576            can receive the reply
3577         */
3578         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3579
3580
3581         data.dptr = (uint8_t *)&rd;
3582         data.dsize = sizeof(rd);
3583
3584         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3585         if (ret != 0) {
3586                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3587                 return -1;
3588         }
3589
3590         /* this loop will terminate when we have received the reply */
3591         while (1) {     
3592                 event_loop_once(ctdb->ev);
3593         }
3594
3595         return 0;
3596 }
3597
3598 /*
3599   list all nodes in the cluster
3600   if the daemon is running, we read the data from the daemon.
3601   if the daemon is not running we parse the nodes file directly
3602  */
3603 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3604 {
3605         int i, ret;
3606         struct ctdb_node_map *nodemap=NULL;
3607
3608         if (ctdb != NULL) {
3609                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3610                 if (ret != 0) {
3611                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3612                         return ret;
3613                 }
3614
3615                 for(i=0;i<nodemap->num;i++){
3616                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3617                                 continue;
3618                         }
3619                         if (options.machinereadable){
3620                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3621                         } else {
3622                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3623                         }
3624                 }
3625         } else {
3626                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3627                 struct pnn_node *pnn_nodes;
3628                 struct pnn_node *pnn_node;
3629         
3630                 pnn_nodes = read_nodes_file(mem_ctx);
3631                 if (pnn_nodes == NULL) {
3632                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3633                         talloc_free(mem_ctx);
3634                         return -1;
3635                 }
3636
3637                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3638                         ctdb_sock_addr addr;
3639
3640                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3641                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3642                                 talloc_free(mem_ctx);
3643                                 return -1;
3644                         }
3645
3646                         if (options.machinereadable){
3647                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3648                         } else {
3649                                 printf("%s\n", pnn_node->addr);
3650                         }
3651                 }
3652                 talloc_free(mem_ctx);
3653         }
3654
3655         return 0;
3656 }
3657
3658 /*
3659   reload the nodes file on the local node
3660  */
3661 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3662 {
3663         int i, ret;
3664         int mypnn;
3665         struct ctdb_node_map *nodemap=NULL;
3666
3667         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3668         if (mypnn == -1) {
3669                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3670                 return -1;
3671         }
3672
3673         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3674         if (ret != 0) {
3675                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3676                 return ret;
3677         }
3678
3679         /* reload the nodes file on all remote nodes */
3680         for (i=0;i<nodemap->num;i++) {
3681                 if (nodemap->nodes[i].pnn == mypnn) {
3682                         continue;
3683                 }
3684                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3685                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3686                         nodemap->nodes[i].pnn);
3687                 if (ret != 0) {
3688                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3689                 }
3690         }
3691
3692         /* reload the nodes file on the local node */
3693         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3694         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3695         if (ret != 0) {
3696                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3697         }
3698
3699         /* initiate a recovery */
3700         control_recover(ctdb, argc, argv);
3701
3702         return 0;
3703 }
3704
3705
3706 static const struct {
3707         const char *name;
3708         int (*fn)(struct ctdb_context *, int, const char **);
3709         bool auto_all;
3710         bool without_daemon; /* can be run without daemon running ? */
3711         const char *msg;
3712         const char *args;
3713 } ctdb_commands[] = {
3714 #ifdef CTDB_VERS
3715         { "version",         control_version,           true,   false,  "show version of ctdb" },
3716 #endif
3717         { "status",          control_status,            true,   false,  "show node status" },
3718         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
3719         { "ping",            control_ping,              true,   false,  "ping all nodes" },
3720         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
3721         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
3722         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
3723         { "statistics",      control_statistics,        false,  false, "show statistics" },
3724         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
3725         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
3726         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
3727         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
3728         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
3729         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
3730         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
3731         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
3732         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
3733         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
3734         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
3735         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
3736         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
3737         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
3738         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
3739         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
3740         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
3741         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
3742         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
3743         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
3744         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
3745         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
3746         { "stop",            control_stop,              true,   false,  "stop a node" },
3747         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
3748         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
3749         { "unban",           control_unban,             true,   false,  "unban a node" },
3750         { "showban",         control_showban,           true,   false,  "show ban information"},
3751         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
3752         { "recover",         control_recover,           true,   false,  "force recovery" },
3753         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
3754         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
3755         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
3756         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
3757         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
3758         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
3759         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
3760         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
3761
3762         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
3763         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
3764         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
3765         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
3766         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
3767         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
3768         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
3769         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
3770         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
3771         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
3772         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
3773         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
3774         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
3775         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
3776         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
3777         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
3778         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
3779         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts"},
3780         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
3781         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
3782         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
3783         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
3784         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
3785         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
3786         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
3787         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
3788         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
3789         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
3790         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
3791 };
3792
3793 /*
3794   show usage message
3795  */
3796 static void usage(void)
3797 {
3798         int i;
3799         printf(
3800 "Usage: ctdb [options] <control>\n" \
3801 "Options:\n" \
3802 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
3803 "   -Y                 generate machinereadable output\n"
3804 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
3805         printf("Controls:\n");
3806         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3807                 printf("  %-15s %-27s  %s\n", 
3808                        ctdb_commands[i].name, 
3809                        ctdb_commands[i].args?ctdb_commands[i].args:"",
3810                        ctdb_commands[i].msg);
3811         }
3812         exit(1);
3813 }
3814
3815
3816 static void ctdb_alarm(int sig)
3817 {
3818         printf("Maximum runtime exceeded - exiting\n");
3819         _exit(ERR_TIMEOUT);
3820 }
3821
3822 /*
3823   main program
3824 */
3825 int main(int argc, const char *argv[])
3826 {
3827         struct ctdb_context *ctdb;
3828         char *nodestring = NULL;
3829         struct poptOption popt_options[] = {
3830                 POPT_AUTOHELP
3831                 POPT_CTDB_CMDLINE
3832                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
3833                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
3834                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
3835                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
3836                 POPT_TABLEEND
3837         };
3838         int opt;
3839         const char **extra_argv;
3840         int extra_argc = 0;
3841         int ret=-1, i;
3842         poptContext pc;
3843         struct event_context *ev;
3844         const char *control;
3845
3846         setlinebuf(stdout);
3847         
3848         /* set some defaults */
3849         options.maxruntime = 0;
3850         options.timelimit = 3;
3851         options.pnn = CTDB_CURRENT_NODE;
3852
3853         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
3854
3855         while ((opt = poptGetNextOpt(pc)) != -1) {
3856                 switch (opt) {
3857                 default:
3858                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
3859                                 poptBadOption(pc, 0), poptStrerror(opt)));
3860                         exit(1);
3861                 }
3862         }
3863
3864         /* setup the remaining options for the main program to use */
3865         extra_argv = poptGetArgs(pc);
3866         if (extra_argv) {
3867                 extra_argv++;
3868                 while (extra_argv[extra_argc]) extra_argc++;
3869         }
3870
3871         if (extra_argc < 1) {
3872                 usage();
3873         }
3874
3875         if (options.maxruntime == 0) {
3876                 const char *ctdb_timeout;
3877                 ctdb_timeout = getenv("CTDB_TIMEOUT");
3878                 if (ctdb_timeout != NULL) {
3879                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
3880                 } else {
3881                         /* default timeout is 120 seconds */
3882                         options.maxruntime = 120;
3883                 }
3884         }
3885
3886         signal(SIGALRM, ctdb_alarm);
3887         alarm(options.maxruntime);
3888
3889         /* setup the node number to contact */
3890         if (nodestring != NULL) {
3891                 if (strcmp(nodestring, "all") == 0) {
3892                         options.pnn = CTDB_BROADCAST_ALL;
3893                 } else {
3894                         options.pnn = strtoul(nodestring, NULL, 0);
3895                 }
3896         }
3897
3898         control = extra_argv[0];
3899
3900         ev = event_context_init(NULL);
3901
3902         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3903                 if (strcmp(control, ctdb_commands[i].name) == 0) {
3904                         int j;
3905
3906                         if (ctdb_commands[i].without_daemon == true) {
3907                                 close(2);
3908                         }
3909
3910                         /* initialise ctdb */
3911                         ctdb = ctdb_cmdline_client(ev);
3912
3913                         if (ctdb_commands[i].without_daemon == false) {
3914                                 if (ctdb == NULL) {
3915                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
3916                                         exit(1);
3917                                 }
3918
3919                                 /* verify the node exists */
3920                                 verify_node(ctdb);
3921
3922                                 if (options.pnn == CTDB_CURRENT_NODE) {
3923                                         int pnn;
3924                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
3925                                         if (pnn == -1) {
3926                                                 return -1;
3927                                         }
3928                                         options.pnn = pnn;
3929                                 }
3930                         }
3931
3932                         if (ctdb_commands[i].auto_all && 
3933                             options.pnn == CTDB_BROADCAST_ALL) {
3934                                 uint32_t *nodes;
3935                                 uint32_t num_nodes;
3936                                 ret = 0;
3937
3938                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
3939                                 CTDB_NO_MEMORY(ctdb, nodes);
3940         
3941                                 for (j=0;j<num_nodes;j++) {
3942                                         options.pnn = nodes[j];
3943                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
3944                                 }
3945                                 talloc_free(nodes);
3946                         } else {
3947                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
3948                         }
3949                         break;
3950                 }
3951         }
3952
3953         if (i == ARRAY_SIZE(ctdb_commands)) {
3954                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
3955                 exit(1);
3956         }
3957
3958         return ret;
3959 }