tools/ctdb: add "ctdb getdbstatus <dbname>"
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
517                 }
518                 return 0;
519         }
520
521         printf("Number of nodes:%d\n", nodemap->num);
522         for(i=0;i<nodemap->num;i++){
523                 static const struct {
524                         uint32_t flag;
525                         const char *name;
526                 } flag_names[] = {
527                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
528                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
529                         { NODE_FLAGS_BANNED,                "BANNED" },
530                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
531                         { NODE_FLAGS_DELETED,               "DELETED" },
532                         { NODE_FLAGS_STOPPED,               "STOPPED" },
533                 };
534                 char *flags_str = NULL;
535                 int j;
536
537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
538                         continue;
539                 }
540                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
541                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
542                                 if (flags_str == NULL) {
543                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
544                                 } else {
545                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
546                                                                            flag_names[j].name);
547                                 }
548                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
549                         }
550                 }
551                 if (flags_str == NULL) {
552                         flags_str = talloc_strdup(ctdb, "OK");
553                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
554                 }
555                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
556                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
557                        flags_str,
558                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
559                 talloc_free(flags_str);
560         }
561
562         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
563         if (ret != 0) {
564                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
565                 return ret;
566         }
567         if (vnnmap->generation == INVALID_GENERATION) {
568                 printf("Generation:INVALID\n");
569         } else {
570                 printf("Generation:%d\n",vnnmap->generation);
571         }
572         printf("Size:%d\n",vnnmap->size);
573         for(i=0;i<vnnmap->size;i++){
574                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
575         }
576
577         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
580                 return ret;
581         }
582         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
583
584         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
585         if (ret != 0) {
586                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
587                 return ret;
588         }
589         printf("Recovery master:%d\n",recmaster);
590
591         return 0;
592 }
593
594
595 struct natgw_node {
596         struct natgw_node *next;
597         const char *addr;
598 };
599
600 /*
601   display the list of nodes belonging to this natgw configuration
602  */
603 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
604 {
605         int i, ret;
606         const char *natgw_list;
607         int nlines;
608         char **lines;
609         struct natgw_node *natgw_nodes = NULL;
610         struct natgw_node *natgw_node;
611         struct ctdb_node_map *nodemap=NULL;
612
613
614         /* read the natgw nodes file into a linked list */
615         natgw_list = getenv("NATGW_NODES");
616         if (natgw_list == NULL) {
617                 natgw_list = "/etc/ctdb/natgw_nodes";
618         }
619         lines = file_lines_load(natgw_list, &nlines, ctdb);
620         if (lines == NULL) {
621                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
622                 return -1;
623         }
624         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
625                 nlines--;
626         }
627         for (i=0;i<nlines;i++) {
628                 char *node;
629
630                 node = lines[i];
631                 /* strip leading spaces */
632                 while((*node == ' ') || (*node == '\t')) {
633                         node++;
634                 }
635                 if (*node == '#') {
636                         continue;
637                 }
638                 if (strcmp(node, "") == 0) {
639                         continue;
640                 }
641                 natgw_node = talloc(ctdb, struct natgw_node);
642                 natgw_node->addr = talloc_strdup(natgw_node, node);
643                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
644                 natgw_node->next = natgw_nodes;
645                 natgw_nodes = natgw_node;
646         }
647
648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
651                 return ret;
652         }
653
654         i=0;
655         while(i<nodemap->num) {
656                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
657                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
658                                 break;
659                         }
660                 }
661
662                 /* this node was not in the natgw so we just remove it from
663                  * the list
664                  */
665                 if ((natgw_node == NULL) 
666                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
667                         int j;
668
669                         for (j=i+1; j<nodemap->num; j++) {
670                                 nodemap->nodes[j-1] = nodemap->nodes[j];
671                         }
672                         nodemap->num--;
673                         continue;
674                 }
675
676                 i++;
677         }               
678
679         /* pick a node to be natgwmaster
680          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
681          */
682         for(i=0;i<nodemap->num;i++){
683                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
684                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
685                         break;
686                 }
687         }
688         /* we couldnt find any healthy node, try unhealthy ones */
689         if (i == nodemap->num) {
690                 for(i=0;i<nodemap->num;i++){
691                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
692                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
693                                 break;
694                         }
695                 }
696         }
697         /* unless all nodes are STOPPED, when we pick one anyway */
698         if (i == nodemap->num) {
699                 for(i=0;i<nodemap->num;i++){
700                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
701                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
702                                 break;
703                         }
704                 }
705                 /* or if we still can not find any */
706                 if (i == nodemap->num) {
707                         printf("-1 0.0.0.0\n");
708                 }
709         }
710
711         /* print the pruned list of nodes belonging to this natgw list */
712         for(i=0;i<nodemap->num;i++){
713                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
714                         continue;
715                 }
716                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
717                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
718                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
723         }
724
725         return 0;
726 }
727
728 /*
729   display the status of the scripts for monitoring (or other events)
730  */
731 static int control_one_scriptstatus(struct ctdb_context *ctdb,
732                                     enum ctdb_eventscript_call type)
733 {
734         struct ctdb_scripts_wire *script_status;
735         int ret, i;
736
737         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
738         if (ret != 0) {
739                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
740                 return ret;
741         }
742
743         if (script_status == NULL) {
744                 if (!options.machinereadable) {
745                         printf("%s cycle never run\n",
746                                ctdb_eventscript_call_names[type]);
747                 }
748                 return 0;
749         }
750
751         if (!options.machinereadable) {
752                 printf("%d scripts were executed last %s cycle\n",
753                        script_status->num_scripts,
754                        ctdb_eventscript_call_names[type]);
755         }
756         for (i=0; i<script_status->num_scripts; i++) {
757                 const char *status = NULL;
758
759                 switch (script_status->scripts[i].status) {
760                 case -ETIME:
761                         status = "TIMEDOUT";
762                         break;
763                 case -ENOEXEC:
764                         status = "DISABLED";
765                         break;
766                 case 0:
767                         status = "OK";
768                         break;
769                 default:
770                         if (script_status->scripts[i].status > 0)
771                                 status = "ERROR";
772                         break;
773                 }
774                 if (options.machinereadable) {
775                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
776                                ctdb_eventscript_call_names[type],
777                                script_status->scripts[i].name,
778                                script_status->scripts[i].status,
779                                status,
780                                (long)script_status->scripts[i].start.tv_sec,
781                                (long)script_status->scripts[i].start.tv_usec,
782                                (long)script_status->scripts[i].finished.tv_sec,
783                                (long)script_status->scripts[i].finished.tv_usec,
784                                script_status->scripts[i].output);
785                         continue;
786                 }
787                 if (status)
788                         printf("%-20s Status:%s    ",
789                                script_status->scripts[i].name, status);
790                 else
791                         /* Some other error, eg from stat. */
792                         printf("%-20s Status:CANNOT RUN (%s)",
793                                script_status->scripts[i].name,
794                                strerror(-script_status->scripts[i].status));
795
796                 if (script_status->scripts[i].status >= 0) {
797                         printf("Duration:%.3lf ",
798                         timeval_delta(&script_status->scripts[i].finished,
799                               &script_status->scripts[i].start));
800                 }
801                 if (script_status->scripts[i].status != -ENOEXEC) {
802                         printf("%s",
803                                ctime(&script_status->scripts[i].start.tv_sec));
804                         if (script_status->scripts[i].status != 0) {
805                                 printf("   OUTPUT:%s\n",
806                                        script_status->scripts[i].output);
807                         }
808                 } else {
809                         printf("\n");
810                 }
811         }
812         return 0;
813 }
814
815
816 static int control_scriptstatus(struct ctdb_context *ctdb,
817                                 int argc, const char **argv)
818 {
819         int ret;
820         enum ctdb_eventscript_call type, min, max;
821         const char *arg;
822
823         if (argc > 1) {
824                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
825                 return -1;
826         }
827
828         if (argc == 0)
829                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
830         else
831                 arg = argv[0];
832
833         for (type = 0; type < CTDB_EVENT_MAX; type++) {
834                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
835                         min = type;
836                         max = type+1;
837                         break;
838                 }
839         }
840         if (type == CTDB_EVENT_MAX) {
841                 if (strcmp(arg, "all") == 0) {
842                         min = 0;
843                         max = CTDB_EVENT_MAX;
844                 } else {
845                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
846                         return -1;
847                 }
848         }
849
850         if (options.machinereadable) {
851                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
852         }
853
854         for (type = min; type < max; type++) {
855                 ret = control_one_scriptstatus(ctdb, type);
856                 if (ret != 0) {
857                         return ret;
858                 }
859         }
860
861         return 0;
862 }
863
864 /*
865   enable an eventscript
866  */
867 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
868 {
869         int ret;
870
871         if (argc < 1) {
872                 usage();
873         }
874
875         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
876         if (ret != 0) {
877           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
878                 return ret;
879         }
880
881         return 0;
882 }
883
884 /*
885   disable an eventscript
886  */
887 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
888 {
889         int ret;
890
891         if (argc < 1) {
892                 usage();
893         }
894
895         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
896         if (ret != 0) {
897           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
898                 return ret;
899         }
900
901         return 0;
902 }
903
904 /*
905   display the pnn of the recovery master
906  */
907 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
908 {
909         int ret;
910         uint32_t recmaster;
911
912         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
913         if (ret != 0) {
914                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
915                 return ret;
916         }
917         printf("%d\n",recmaster);
918
919         return 0;
920 }
921
922 /*
923   get a list of all tickles for this pnn
924  */
925 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
926 {
927         struct ctdb_control_tcp_tickle_list *list;
928         ctdb_sock_addr addr;
929         int i, ret;
930
931         if (argc < 1) {
932                 usage();
933         }
934
935         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
936                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
937                 return -1;
938         }
939
940         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
941         if (ret == -1) {
942                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
943                 return -1;
944         }
945
946         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
947         printf("Num tickles:%u\n", list->tickles.num);
948         for (i=0;i<list->tickles.num;i++) {
949                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
950                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
951         }
952
953         talloc_free(list);
954         
955         return 0;
956 }
957
958
959
960 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
961 {
962         struct ctdb_all_public_ips *ips;
963         struct ctdb_public_ip ip;
964         int i, ret;
965         uint32_t *nodes;
966         uint32_t disable_time;
967         TDB_DATA data;
968         struct ctdb_node_map *nodemap=NULL;
969         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
970
971         disable_time = 30;
972         data.dptr  = (uint8_t*)&disable_time;
973         data.dsize = sizeof(disable_time);
974         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
975         if (ret != 0) {
976                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
977                 return -1;
978         }
979
980
981
982         /* read the public ip list from the node */
983         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
984         if (ret != 0) {
985                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
986                 talloc_free(tmp_ctx);
987                 return -1;
988         }
989
990         for (i=0;i<ips->num;i++) {
991                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
992                         break;
993                 }
994         }
995         if (i==ips->num) {
996                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
997                         pnn, ctdb_addr_to_str(addr)));
998                 talloc_free(tmp_ctx);
999                 return -1;
1000         }
1001
1002         ip.pnn  = pnn;
1003         ip.addr = *addr;
1004
1005         data.dptr  = (uint8_t *)&ip;
1006         data.dsize = sizeof(ip);
1007
1008         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1009         if (ret != 0) {
1010                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1011                 talloc_free(tmp_ctx);
1012                 return ret;
1013         }
1014
1015         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1016         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1017                                         nodes, 0,
1018                                         TIMELIMIT(),
1019                                         false, data,
1020                                         NULL, NULL,
1021                                         NULL);
1022         if (ret != 0) {
1023                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1024                 talloc_free(tmp_ctx);
1025                 return -1;
1026         }
1027
1028         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1029         if (ret != 0) {
1030                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1031                 talloc_free(tmp_ctx);
1032                 return -1;
1033         }
1034
1035         talloc_free(tmp_ctx);
1036         return 0;
1037 }
1038
1039 /*
1040   move/failover an ip address to a specific node
1041  */
1042 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1043 {
1044         uint32_t pnn;
1045         ctdb_sock_addr addr;
1046
1047         if (argc < 2) {
1048                 usage();
1049                 return -1;
1050         }
1051
1052         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1053                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1054                 return -1;
1055         }
1056
1057
1058         if (sscanf(argv[1], "%u", &pnn) != 1) {
1059                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1060                 return -1;
1061         }
1062
1063         if (move_ip(ctdb, &addr, pnn) != 0) {
1064                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1065                 return -1;
1066         }
1067
1068         return 0;
1069 }
1070
1071 void getips_store_callback(void *param, void *data)
1072 {
1073         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1074         struct ctdb_all_public_ips *ips = param;
1075         int i;
1076
1077         i = ips->num++;
1078         ips->ips[i].pnn  = node_ip->pnn;
1079         ips->ips[i].addr = node_ip->addr;
1080 }
1081
1082 void getips_count_callback(void *param, void *data)
1083 {
1084         uint32_t *count = param;
1085
1086         (*count)++;
1087 }
1088
1089 #define IP_KEYLEN       4
1090 static uint32_t *ip_key(ctdb_sock_addr *ip)
1091 {
1092         static uint32_t key[IP_KEYLEN];
1093
1094         bzero(key, sizeof(key));
1095
1096         switch (ip->sa.sa_family) {
1097         case AF_INET:
1098                 key[0]  = ip->ip.sin_addr.s_addr;
1099                 break;
1100         case AF_INET6:
1101                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1102                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1103                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1104                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1105                 break;
1106         default:
1107                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1108                 return key;
1109         }
1110
1111         return key;
1112 }
1113
1114 static void *add_ip_callback(void *parm, void *data)
1115 {
1116         return parm;
1117 }
1118
1119 static int
1120 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1121 {
1122         struct ctdb_all_public_ips *tmp_ips;
1123         struct ctdb_node_map *nodemap=NULL;
1124         trbt_tree_t *ip_tree;
1125         int i, j, len, ret;
1126         uint32_t count;
1127
1128         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1129         if (ret != 0) {
1130                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1131                 return ret;
1132         }
1133
1134         ip_tree = trbt_create(tmp_ctx, 0);
1135
1136         for(i=0;i<nodemap->num;i++){
1137                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1138                         continue;
1139                 }
1140                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1141                         continue;
1142                 }
1143
1144                 /* read the public ip list from this node */
1145                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1146                 if (ret != 0) {
1147                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1148                         return -1;
1149                 }
1150         
1151                 for (j=0; j<tmp_ips->num;j++) {
1152                         struct ctdb_public_ip *node_ip;
1153
1154                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1155                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1156                         node_ip->addr = tmp_ips->ips[j].addr;
1157
1158                         trbt_insertarray32_callback(ip_tree,
1159                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1160                                 add_ip_callback,
1161                                 node_ip);
1162                 }
1163                 talloc_free(tmp_ips);
1164         }
1165
1166         /* traverse */
1167         count = 0;
1168         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1169
1170         len = offsetof(struct ctdb_all_public_ips, ips) + 
1171                 count*sizeof(struct ctdb_public_ip);
1172         tmp_ips = talloc_zero_size(tmp_ctx, len);
1173         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1174
1175         *ips = tmp_ips;
1176
1177         return 0;
1178 }
1179
1180
1181 /* 
1182  * scans all other nodes and returns a pnn for another node that can host this 
1183  * ip address or -1
1184  */
1185 static int
1186 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1187 {
1188         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1189         struct ctdb_all_public_ips *ips;
1190         struct ctdb_node_map *nodemap=NULL;
1191         int i, j, ret;
1192
1193         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1194         if (ret != 0) {
1195                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1196                 talloc_free(tmp_ctx);
1197                 return ret;
1198         }
1199
1200         for(i=0;i<nodemap->num;i++){
1201                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1202                         continue;
1203                 }
1204                 if (nodemap->nodes[i].pnn == options.pnn) {
1205                         continue;
1206                 }
1207
1208                 /* read the public ip list from this node */
1209                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1210                 if (ret != 0) {
1211                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1212                         return -1;
1213                 }
1214
1215                 for (j=0;j<ips->num;j++) {
1216                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1217                                 talloc_free(tmp_ctx);
1218                                 return nodemap->nodes[i].pnn;
1219                         }
1220                 }
1221                 talloc_free(ips);
1222         }
1223
1224         talloc_free(tmp_ctx);
1225         return -1;
1226 }
1227
1228 /*
1229   add a public ip address to a node
1230  */
1231 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1232 {
1233         int i, ret;
1234         int len;
1235         uint32_t pnn;
1236         unsigned mask;
1237         ctdb_sock_addr addr;
1238         struct ctdb_control_ip_iface *pub;
1239         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1240         struct ctdb_all_public_ips *ips;
1241
1242
1243         if (argc != 2) {
1244                 talloc_free(tmp_ctx);
1245                 usage();
1246         }
1247
1248         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1249                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1250                 talloc_free(tmp_ctx);
1251                 return -1;
1252         }
1253
1254         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1255         if (ret != 0) {
1256                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1257                 talloc_free(tmp_ctx);
1258                 return ret;
1259         }
1260
1261
1262         /* check if some other node is already serving this ip, if not,
1263          * we will claim it
1264          */
1265         for (i=0;i<ips->num;i++) {
1266                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1267                         break;
1268                 }
1269         }
1270
1271         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1272         pub = talloc_size(tmp_ctx, len); 
1273         CTDB_NO_MEMORY(ctdb, pub);
1274
1275         pub->addr  = addr;
1276         pub->mask  = mask;
1277         pub->len   = strlen(argv[1])+1;
1278         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1279
1280         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1281         if (ret != 0) {
1282                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1283                 talloc_free(tmp_ctx);
1284                 return ret;
1285         }
1286
1287         if (i == ips->num) {
1288                 /* no one has this ip so we claim it */
1289                 pnn  = options.pnn;
1290         } else {
1291                 pnn  = ips->ips[i].pnn;
1292         }
1293
1294         if (move_ip(ctdb, &addr, pnn) != 0) {
1295                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1296                 return -1;
1297         }
1298
1299         talloc_free(tmp_ctx);
1300         return 0;
1301 }
1302
1303 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1304
1305 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1306 {
1307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1308         struct ctdb_node_map *nodemap=NULL;
1309         struct ctdb_all_public_ips *ips;
1310         int ret, i, j;
1311
1312         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1313         if (ret != 0) {
1314                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1315                 return ret;
1316         }
1317
1318         /* remove it from the nodes that are not hosting the ip currently */
1319         for(i=0;i<nodemap->num;i++){
1320                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1321                         continue;
1322                 }
1323                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1324                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1325                         continue;
1326                 }
1327
1328                 for (j=0;j<ips->num;j++) {
1329                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1330                                 break;
1331                         }
1332                 }
1333                 if (j==ips->num) {
1334                         continue;
1335                 }
1336
1337                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1338                         continue;
1339                 }
1340
1341                 options.pnn = nodemap->nodes[i].pnn;
1342                 control_delip(ctdb, argc, argv);
1343         }
1344
1345
1346         /* remove it from every node (also the one hosting it) */
1347         for(i=0;i<nodemap->num;i++){
1348                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1349                         continue;
1350                 }
1351                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1352                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1353                         continue;
1354                 }
1355
1356                 for (j=0;j<ips->num;j++) {
1357                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1358                                 break;
1359                         }
1360                 }
1361                 if (j==ips->num) {
1362                         continue;
1363                 }
1364
1365                 options.pnn = nodemap->nodes[i].pnn;
1366                 control_delip(ctdb, argc, argv);
1367         }
1368
1369         talloc_free(tmp_ctx);
1370         return 0;
1371 }
1372         
1373 /*
1374   delete a public ip address from a node
1375  */
1376 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1377 {
1378         int i, ret;
1379         ctdb_sock_addr addr;
1380         struct ctdb_control_ip_iface pub;
1381         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1382         struct ctdb_all_public_ips *ips;
1383
1384         if (argc != 1) {
1385                 talloc_free(tmp_ctx);
1386                 usage();
1387         }
1388
1389         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1390                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1391                 return -1;
1392         }
1393
1394         if (options.pnn == CTDB_BROADCAST_ALL) {
1395                 return control_delip_all(ctdb, argc, argv, &addr);
1396         }
1397
1398         pub.addr  = addr;
1399         pub.mask  = 0;
1400         pub.len   = 0;
1401
1402         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1405                 talloc_free(tmp_ctx);
1406                 return ret;
1407         }
1408         
1409         for (i=0;i<ips->num;i++) {
1410                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1411                         break;
1412                 }
1413         }
1414
1415         if (i==ips->num) {
1416                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1417                         ctdb_addr_to_str(&addr)));
1418                 talloc_free(tmp_ctx);
1419                 return -1;
1420         }
1421
1422         if (ips->ips[i].pnn == options.pnn) {
1423                 ret = find_other_host_for_public_ip(ctdb, &addr);
1424                 if (ret != -1) {
1425                         if (move_ip(ctdb, &addr, ret) != 0) {
1426                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1427                                 return -1;
1428                         }
1429                 }
1430         }
1431
1432         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1433         if (ret != 0) {
1434                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1435                 talloc_free(tmp_ctx);
1436                 return ret;
1437         }
1438
1439         talloc_free(tmp_ctx);
1440         return 0;
1441 }
1442
1443 /*
1444   kill a tcp connection
1445  */
1446 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1447 {
1448         int ret;
1449         struct ctdb_control_killtcp killtcp;
1450
1451         if (argc < 2) {
1452                 usage();
1453         }
1454
1455         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1456                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1457                 return -1;
1458         }
1459
1460         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1461                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1462                 return -1;
1463         }
1464
1465         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1466         if (ret != 0) {
1467                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1468                 return ret;
1469         }
1470
1471         return 0;
1472 }
1473
1474
1475 /*
1476   send a gratious arp
1477  */
1478 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1479 {
1480         int ret;
1481         ctdb_sock_addr addr;
1482
1483         if (argc < 2) {
1484                 usage();
1485         }
1486
1487         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1488                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1489                 return -1;
1490         }
1491
1492         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1493         if (ret != 0) {
1494                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1495                 return ret;
1496         }
1497
1498         return 0;
1499 }
1500
1501 /*
1502   register a server id
1503  */
1504 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1505 {
1506         int ret;
1507         struct ctdb_server_id server_id;
1508
1509         if (argc < 3) {
1510                 usage();
1511         }
1512
1513         server_id.pnn       = strtoul(argv[0], NULL, 0);
1514         server_id.type      = strtoul(argv[1], NULL, 0);
1515         server_id.server_id = strtoul(argv[2], NULL, 0);
1516
1517         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1518         if (ret != 0) {
1519                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1520                 return ret;
1521         }
1522         return -1;
1523 }
1524
1525 /*
1526   unregister a server id
1527  */
1528 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1529 {
1530         int ret;
1531         struct ctdb_server_id server_id;
1532
1533         if (argc < 3) {
1534                 usage();
1535         }
1536
1537         server_id.pnn       = strtoul(argv[0], NULL, 0);
1538         server_id.type      = strtoul(argv[1], NULL, 0);
1539         server_id.server_id = strtoul(argv[2], NULL, 0);
1540
1541         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1542         if (ret != 0) {
1543                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1544                 return ret;
1545         }
1546         return -1;
1547 }
1548
1549 /*
1550   check if a server id exists
1551  */
1552 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1553 {
1554         uint32_t status;
1555         int ret;
1556         struct ctdb_server_id server_id;
1557
1558         if (argc < 3) {
1559                 usage();
1560         }
1561
1562         server_id.pnn       = strtoul(argv[0], NULL, 0);
1563         server_id.type      = strtoul(argv[1], NULL, 0);
1564         server_id.server_id = strtoul(argv[2], NULL, 0);
1565
1566         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1569                 return ret;
1570         }
1571
1572         if (status) {
1573                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1574         } else {
1575                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1576         }
1577         return 0;
1578 }
1579
1580 /*
1581   get a list of all server ids that are registered on a node
1582  */
1583 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1584 {
1585         int i, ret;
1586         struct ctdb_server_id_list *server_ids;
1587
1588         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1589         if (ret != 0) {
1590                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1591                 return ret;
1592         }
1593
1594         for (i=0; i<server_ids->num; i++) {
1595                 printf("Server id %d:%d:%d\n", 
1596                         server_ids->server_ids[i].pnn, 
1597                         server_ids->server_ids[i].type, 
1598                         server_ids->server_ids[i].server_id); 
1599         }
1600
1601         return -1;
1602 }
1603
1604 /*
1605   send a tcp tickle ack
1606  */
1607 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1608 {
1609         int ret;
1610         ctdb_sock_addr  src, dst;
1611
1612         if (argc < 2) {
1613                 usage();
1614         }
1615
1616         if (!parse_ip_port(argv[0], &src)) {
1617                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1618                 return -1;
1619         }
1620
1621         if (!parse_ip_port(argv[1], &dst)) {
1622                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1623                 return -1;
1624         }
1625
1626         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1627         if (ret==0) {
1628                 return 0;
1629         }
1630         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1631
1632         return -1;
1633 }
1634
1635
1636 /*
1637   display public ip status
1638  */
1639 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1640 {
1641         int i, ret;
1642         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1643         struct ctdb_all_public_ips *ips;
1644
1645         if (options.pnn == CTDB_BROADCAST_ALL) {
1646                 /* read the list of public ips from all nodes */
1647                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1648         } else {
1649                 /* read the public ip list from this node */
1650                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1651         }
1652         if (ret != 0) {
1653                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1654                 talloc_free(tmp_ctx);
1655                 return ret;
1656         }
1657
1658         if (options.machinereadable){
1659                 printf(":Public IP:Node:\n");
1660         } else {
1661                 if (options.pnn == CTDB_BROADCAST_ALL) {
1662                         printf("Public IPs on ALL nodes\n");
1663                 } else {
1664                         printf("Public IPs on node %u\n", options.pnn);
1665                 }
1666         }
1667
1668         for (i=1;i<=ips->num;i++) {
1669                 if (options.machinereadable){
1670                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1671                 } else {
1672                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1673                 }
1674         }
1675
1676         talloc_free(tmp_ctx);
1677         return 0;
1678 }
1679
1680 /*
1681   display pid of a ctdb daemon
1682  */
1683 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1684 {
1685         uint32_t pid;
1686         int ret;
1687
1688         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1689         if (ret != 0) {
1690                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1691                 return ret;
1692         }
1693         printf("Pid:%d\n", pid);
1694
1695         return 0;
1696 }
1697
1698 /*
1699   handler for receiving the response to ipreallocate
1700 */
1701 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1702                              TDB_DATA data, void *private_data)
1703 {
1704         exit(0);
1705 }
1706
1707 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1708 {
1709         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1710
1711         event_add_timed(ctdb->ev, ctdb, 
1712                                 timeval_current_ofs(1, 0),
1713                                 ctdb_every_second, ctdb);
1714 }
1715
1716 /*
1717   ask the recovery daemon on the recovery master to perform a ip reallocation
1718  */
1719 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1720 {
1721         int i, ret;
1722         TDB_DATA data;
1723         struct takeover_run_reply rd;
1724         uint32_t recmaster;
1725         struct ctdb_node_map *nodemap=NULL;
1726         int retries=0;
1727         struct timeval tv = timeval_current();
1728
1729         /* we need some events to trigger so we can timeout and restart
1730            the loop
1731         */
1732         event_add_timed(ctdb->ev, ctdb, 
1733                                 timeval_current_ofs(1, 0),
1734                                 ctdb_every_second, ctdb);
1735
1736         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1737         if (rd.pnn == -1) {
1738                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1739                 return -1;
1740         }
1741         rd.srvid = getpid();
1742
1743         /* register a message port for receiveing the reply so that we
1744            can receive the reply
1745         */
1746         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1747
1748         data.dptr = (uint8_t *)&rd;
1749         data.dsize = sizeof(rd);
1750
1751 again:
1752         if (retries>5) {
1753                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1754                 exit(10);
1755         }
1756
1757         /* check that there are valid nodes available */
1758         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1759                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1760                 exit(10);
1761         }
1762         for (i=0; i<nodemap->num;i++) {
1763                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1764                         break;
1765                 }
1766         }
1767         if (i==nodemap->num) {
1768                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1769                 return 0;
1770         }
1771
1772
1773         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1774         if (ret != 0) {
1775                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1776                 return ret;
1777         }
1778
1779         /* verify the node exists */
1780         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1781                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1782                 exit(10);
1783         }
1784
1785
1786         /* check tha there are nodes available that can act as a recmaster */
1787         for (i=0; i<nodemap->num; i++) {
1788                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1789                         continue;
1790                 }
1791                 break;
1792         }
1793         if (i == nodemap->num) {
1794                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1795                 return 0;
1796         }
1797
1798         /* verify the recovery master is not STOPPED, nor BANNED */
1799         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1800                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1801                 retries++;
1802                 sleep(1);
1803                 goto again;
1804         } 
1805
1806         
1807         /* verify the recovery master is not STOPPED, nor BANNED */
1808         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1809                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1810                 retries++;
1811                 sleep(1);
1812                 goto again;
1813         } 
1814
1815         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1816         if (ret != 0) {
1817                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1818                 return -1;
1819         }
1820
1821         tv = timeval_current();
1822         /* this loop will terminate when we have received the reply */
1823         while (timeval_elapsed(&tv) < 3.0) {    
1824                 event_loop_once(ctdb->ev);
1825         }
1826
1827         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1828         retries++;
1829         sleep(1);
1830         goto again;
1831
1832         return 0;
1833 }
1834
1835
1836 /*
1837   disable a remote node
1838  */
1839 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1840 {
1841         int ret;
1842         struct ctdb_node_map *nodemap=NULL;
1843
1844         do {
1845                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1846                 if (ret != 0) {
1847                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1848                         return ret;
1849                 }
1850
1851                 sleep(1);
1852
1853                 /* read the nodemap and verify the change took effect */
1854                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1855                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1856                         exit(10);
1857                 }
1858
1859         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1860         ret = control_ipreallocate(ctdb, argc, argv);
1861         if (ret != 0) {
1862                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1863                 return ret;
1864         }
1865
1866         return 0;
1867 }
1868
1869 /*
1870   enable a disabled remote node
1871  */
1872 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1873 {
1874         int ret;
1875
1876         struct ctdb_node_map *nodemap=NULL;
1877
1878         do {
1879                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1880                 if (ret != 0) {
1881                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1882                         return ret;
1883                 }
1884
1885                 sleep(1);
1886
1887                 /* read the nodemap and verify the change took effect */
1888                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1889                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1890                         exit(10);
1891                 }
1892
1893         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1894         ret = control_ipreallocate(ctdb, argc, argv);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1897                 return ret;
1898         }
1899
1900         return 0;
1901 }
1902
1903 /*
1904   stop a remote node
1905  */
1906 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1907 {
1908         int ret;
1909         struct ctdb_node_map *nodemap=NULL;
1910
1911         do {
1912                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1913                 if (ret != 0) {
1914                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1915                 }
1916         
1917                 sleep(1);
1918
1919                 /* read the nodemap and verify the change took effect */
1920                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1921                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1922                         exit(10);
1923                 }
1924
1925         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1926         ret = control_ipreallocate(ctdb, argc, argv);
1927         if (ret != 0) {
1928                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1929                 return ret;
1930         }
1931
1932         return 0;
1933 }
1934
1935 /*
1936   restart a stopped remote node
1937  */
1938 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1939 {
1940         int ret;
1941
1942         struct ctdb_node_map *nodemap=NULL;
1943
1944         do {
1945                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1946                 if (ret != 0) {
1947                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1948                         return ret;
1949                 }
1950         
1951                 sleep(1);
1952
1953                 /* read the nodemap and verify the change took effect */
1954                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1955                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1956                         exit(10);
1957                 }
1958
1959         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1960         ret = control_ipreallocate(ctdb, argc, argv);
1961         if (ret != 0) {
1962                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1963                 return ret;
1964         }
1965
1966         return 0;
1967 }
1968
1969 static uint32_t get_generation(struct ctdb_context *ctdb)
1970 {
1971         struct ctdb_vnn_map *vnnmap=NULL;
1972         int ret;
1973
1974         /* wait until the recmaster is not in recovery mode */
1975         while (1) {
1976                 uint32_t recmode, recmaster;
1977                 
1978                 if (vnnmap != NULL) {
1979                         talloc_free(vnnmap);
1980                         vnnmap = NULL;
1981                 }
1982
1983                 /* get the recmaster */
1984                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1985                 if (ret != 0) {
1986                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1987                         exit(10);
1988                 }
1989
1990                 /* get recovery mode */
1991                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1992                 if (ret != 0) {
1993                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1994                         exit(10);
1995                 }
1996
1997                 /* get the current generation number */
1998                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
1999                 if (ret != 0) {
2000                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2001                         exit(10);
2002                 }
2003
2004                 if ((recmode == CTDB_RECOVERY_NORMAL)
2005                 &&  (vnnmap->generation != 1)){
2006                         return vnnmap->generation;
2007                 }
2008                 sleep(1);
2009         }
2010 }
2011
2012 /*
2013   ban a node from the cluster
2014  */
2015 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2016 {
2017         int ret;
2018         struct ctdb_node_map *nodemap=NULL;
2019         struct ctdb_ban_time bantime;
2020
2021         if (argc < 1) {
2022                 usage();
2023         }
2024         
2025         /* verify the node exists */
2026         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2029                 return ret;
2030         }
2031
2032         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2033                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2034                 return -1;
2035         }
2036
2037         bantime.pnn  = options.pnn;
2038         bantime.time = strtoul(argv[0], NULL, 0);
2039
2040         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2041         if (ret != 0) {
2042                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2043                 return -1;
2044         }       
2045
2046         ret = control_ipreallocate(ctdb, argc, argv);
2047         if (ret != 0) {
2048                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2049                 return ret;
2050         }
2051
2052         return 0;
2053 }
2054
2055
2056 /*
2057   unban a node from the cluster
2058  */
2059 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2060 {
2061         int ret;
2062         struct ctdb_node_map *nodemap=NULL;
2063         struct ctdb_ban_time bantime;
2064
2065         /* verify the node exists */
2066         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2069                 return ret;
2070         }
2071
2072         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2073                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2074                 return -1;
2075         }
2076
2077         bantime.pnn  = options.pnn;
2078         bantime.time = 0;
2079
2080         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2083                 return -1;
2084         }       
2085
2086         ret = control_ipreallocate(ctdb, argc, argv);
2087         if (ret != 0) {
2088                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2089                 return ret;
2090         }
2091
2092         return 0;
2093 }
2094
2095
2096 /*
2097   show ban information for a node
2098  */
2099 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2100 {
2101         int ret;
2102         struct ctdb_node_map *nodemap=NULL;
2103         struct ctdb_ban_time *bantime;
2104
2105         /* verify the node exists */
2106         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2109                 return ret;
2110         }
2111
2112         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2113         if (ret != 0) {
2114                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2115                 return -1;
2116         }       
2117
2118         if (bantime->time == 0) {
2119                 printf("Node %u is not banned\n", bantime->pnn);
2120         } else {
2121                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2122         }
2123
2124         return 0;
2125 }
2126
2127 /*
2128   shutdown a daemon
2129  */
2130 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2131 {
2132         int ret;
2133
2134         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2135         if (ret != 0) {
2136                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2137                 return ret;
2138         }
2139
2140         return 0;
2141 }
2142
2143 /*
2144   trigger a recovery
2145  */
2146 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2147 {
2148         int ret;
2149         uint32_t generation, next_generation;
2150
2151         /* record the current generation number */
2152         generation = get_generation(ctdb);
2153
2154         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2155         if (ret != 0) {
2156                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2157                 return ret;
2158         }
2159
2160         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2161         if (ret != 0) {
2162                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2163                 return ret;
2164         }
2165
2166         /* wait until we are in a new generation */
2167         while (1) {
2168                 next_generation = get_generation(ctdb);
2169                 if (next_generation != generation) {
2170                         return 0;
2171                 }
2172                 sleep(1);
2173         }
2174
2175         return 0;
2176 }
2177
2178
2179 /*
2180   display monitoring mode of a remote node
2181  */
2182 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2183 {
2184         uint32_t monmode;
2185         int ret;
2186
2187         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2188         if (ret != 0) {
2189                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2190                 return ret;
2191         }
2192         if (!options.machinereadable){
2193                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2194         } else {
2195                 printf(":mode:\n");
2196                 printf(":%d:\n",monmode);
2197         }
2198         return 0;
2199 }
2200
2201
2202 /*
2203   display capabilities of a remote node
2204  */
2205 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2206 {
2207         uint32_t capabilities;
2208         int ret;
2209
2210         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2211         if (ret != 0) {
2212                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2213                 return ret;
2214         }
2215         
2216         if (!options.machinereadable){
2217                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2218                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2219                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2220                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2221         } else {
2222                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2223                 printf(":%d:%d:%d:%d:\n",
2224                         !!(capabilities&CTDB_CAP_RECMASTER),
2225                         !!(capabilities&CTDB_CAP_LMASTER),
2226                         !!(capabilities&CTDB_CAP_LVS),
2227                         !!(capabilities&CTDB_CAP_NATGW));
2228         }
2229         return 0;
2230 }
2231
2232 /*
2233   display lvs configuration
2234  */
2235 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2236 {
2237         uint32_t *capabilities;
2238         struct ctdb_node_map *nodemap=NULL;
2239         int i, ret;
2240         int healthy_count = 0;
2241
2242         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2243         if (ret != 0) {
2244                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2245                 return ret;
2246         }
2247
2248         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2249         CTDB_NO_MEMORY(ctdb, capabilities);
2250         
2251         /* collect capabilities for all connected nodes */
2252         for (i=0; i<nodemap->num; i++) {
2253                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2254                         continue;
2255                 }
2256                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2257                         continue;
2258                 }
2259         
2260                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2261                 if (ret != 0) {
2262                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2263                         return ret;
2264                 }
2265
2266                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2267                         continue;
2268                 }
2269
2270                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2271                         healthy_count++;
2272                 }
2273         }
2274
2275         /* Print all LVS nodes */
2276         for (i=0; i<nodemap->num; i++) {
2277                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2278                         continue;
2279                 }
2280                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2281                         continue;
2282                 }
2283                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2284                         continue;
2285                 }
2286
2287                 if (healthy_count != 0) {
2288                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2289                                 continue;
2290                         }
2291                 }
2292
2293                 printf("%d:%s\n", i, 
2294                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2295         }
2296
2297         return 0;
2298 }
2299
2300 /*
2301   display who is the lvs master
2302  */
2303 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2304 {
2305         uint32_t *capabilities;
2306         struct ctdb_node_map *nodemap=NULL;
2307         int i, ret;
2308         int healthy_count = 0;
2309
2310         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2313                 return ret;
2314         }
2315
2316         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2317         CTDB_NO_MEMORY(ctdb, capabilities);
2318         
2319         /* collect capabilities for all connected nodes */
2320         for (i=0; i<nodemap->num; i++) {
2321                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2322                         continue;
2323                 }
2324                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2325                         continue;
2326                 }
2327         
2328                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2329                 if (ret != 0) {
2330                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2331                         return ret;
2332                 }
2333
2334                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2335                         continue;
2336                 }
2337
2338                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2339                         healthy_count++;
2340                 }
2341         }
2342
2343         /* find and show the lvsmaster */
2344         for (i=0; i<nodemap->num; i++) {
2345                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2346                         continue;
2347                 }
2348                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2349                         continue;
2350                 }
2351                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2352                         continue;
2353                 }
2354
2355                 if (healthy_count != 0) {
2356                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2357                                 continue;
2358                         }
2359                 }
2360
2361                 if (options.machinereadable){
2362                         printf("%d\n", i);
2363                 } else {
2364                         printf("Node %d is LVS master\n", i);
2365                 }
2366                 return 0;
2367         }
2368
2369         printf("There is no LVS master\n");
2370         return -1;
2371 }
2372
2373 /*
2374   disable monitoring on a  node
2375  */
2376 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2377 {
2378         
2379         int ret;
2380
2381         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2382         if (ret != 0) {
2383                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2384                 return ret;
2385         }
2386         printf("Monitoring mode:%s\n","DISABLED");
2387
2388         return 0;
2389 }
2390
2391 /*
2392   enable monitoring on a  node
2393  */
2394 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2395 {
2396         
2397         int ret;
2398
2399         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2400         if (ret != 0) {
2401                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2402                 return ret;
2403         }
2404         printf("Monitoring mode:%s\n","ACTIVE");
2405
2406         return 0;
2407 }
2408
2409 /*
2410   display remote list of keys/data for a db
2411  */
2412 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2413 {
2414         const char *db_name;
2415         struct ctdb_db_context *ctdb_db;
2416         int ret;
2417
2418         if (argc < 1) {
2419                 usage();
2420         }
2421
2422         db_name = argv[0];
2423
2424
2425         if (db_exists(ctdb, db_name)) {
2426                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2427                 return -1;
2428         }
2429
2430         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2431
2432         if (ctdb_db == NULL) {
2433                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2434                 return -1;
2435         }
2436
2437         /* traverse and dump the cluster tdb */
2438         ret = ctdb_dump_db(ctdb_db, stdout);
2439         if (ret == -1) {
2440                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2441                 return -1;
2442         }
2443         talloc_free(ctdb_db);
2444
2445         printf("Dumped %d records\n", ret);
2446         return 0;
2447 }
2448
2449
2450 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2451                              TDB_DATA data, void *private_data)
2452 {
2453         DEBUG(DEBUG_ERR,("Log data received\n"));
2454         if (data.dsize > 0) {
2455                 printf("%s", data.dptr);
2456         }
2457
2458         exit(0);
2459 }
2460
2461 /*
2462   display a list of log messages from the in memory ringbuffer
2463  */
2464 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2465 {
2466         int ret;
2467         int32_t res;
2468         struct ctdb_get_log_addr log_addr;
2469         TDB_DATA data;
2470         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2471         char *errmsg;
2472         struct timeval tv;
2473
2474         if (argc != 1) {
2475                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2476                 talloc_free(tmp_ctx);
2477                 return -1;
2478         }
2479
2480         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2481         log_addr.srvid = getpid();
2482         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2483                 log_addr.level = get_debug_by_desc(argv[0]);
2484         } else {
2485                 log_addr.level = strtol(argv[0], NULL, 0);
2486         }
2487
2488
2489         data.dptr = (unsigned char *)&log_addr;
2490         data.dsize = sizeof(log_addr);
2491
2492         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2493
2494         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2495         sleep(1);
2496
2497         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2498
2499         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2500                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2501         if (ret != 0 || res != 0) {
2502                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2503                 talloc_free(tmp_ctx);
2504                 return -1;
2505         }
2506
2507
2508         tv = timeval_current();
2509         /* this loop will terminate when we have received the reply */
2510         while (timeval_elapsed(&tv) < 3.0) {    
2511                 event_loop_once(ctdb->ev);
2512         }
2513
2514         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2515
2516         talloc_free(tmp_ctx);
2517         return 0;
2518 }
2519
2520 /*
2521   clear the in memory log area
2522  */
2523 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2524 {
2525         int ret;
2526         int32_t res;
2527         char *errmsg;
2528         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2529
2530         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2531                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2532         if (ret != 0 || res != 0) {
2533                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2534                 talloc_free(tmp_ctx);
2535                 return -1;
2536         }
2537
2538         talloc_free(tmp_ctx);
2539         return 0;
2540 }
2541
2542
2543
2544 /*
2545   display a list of the databases on a remote ctdb
2546  */
2547 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2548 {
2549         int i, ret;
2550         struct ctdb_dbid_map *dbmap=NULL;
2551
2552         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2555                 return ret;
2556         }
2557
2558         printf("Number of databases:%d\n", dbmap->num);
2559         for(i=0;i<dbmap->num;i++){
2560                 const char *path;
2561                 const char *name;
2562                 const char *health;
2563                 bool persistent;
2564
2565                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2566                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2567                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2568                 persistent = dbmap->dbs[i].persistent;
2569                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2570                        dbmap->dbs[i].dbid, name, path,
2571                        persistent?" PERSISTENT":"",
2572                        health?" UNHEALTHY":"");
2573         }
2574
2575         return 0;
2576 }
2577
2578 /*
2579   display the status of a database on a remote ctdb
2580  */
2581 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2582 {
2583         int i, ret;
2584         struct ctdb_dbid_map *dbmap=NULL;
2585         const char *db_name;
2586
2587         if (argc < 1) {
2588                 usage();
2589         }
2590
2591         db_name = argv[0];
2592
2593         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2594         if (ret != 0) {
2595                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2596                 return ret;
2597         }
2598
2599         for(i=0;i<dbmap->num;i++){
2600                 const char *path;
2601                 const char *name;
2602                 const char *health;
2603                 bool persistent;
2604
2605                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2606                 if (strcmp(name, db_name) != 0) {
2607                         continue;
2608                 }
2609
2610                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2611                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2612                 persistent = dbmap->dbs[i].persistent;
2613                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2614                        dbmap->dbs[i].dbid, name, path,
2615                        persistent?"yes":"no",
2616                        health?health:"OK");
2617                 return 0;
2618         }
2619
2620         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2621         return 0;
2622 }
2623
2624 /*
2625   check if the local node is recmaster or not
2626   it will return 1 if this node is the recmaster and 0 if it is not
2627   or if the local ctdb daemon could not be contacted
2628  */
2629 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2630 {
2631         uint32_t mypnn, recmaster;
2632         int ret;
2633
2634         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2635         if (mypnn == -1) {
2636                 printf("Failed to get pnn of node\n");
2637                 return 1;
2638         }
2639
2640         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2641         if (ret != 0) {
2642                 printf("Failed to get the recmaster\n");
2643                 return 1;
2644         }
2645
2646         if (recmaster != mypnn) {
2647                 printf("this node is not the recmaster\n");
2648                 return 1;
2649         }
2650
2651         printf("this node is the recmaster\n");
2652         return 0;
2653 }
2654
2655 /*
2656   ping a node
2657  */
2658 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2659 {
2660         int ret;
2661         struct timeval tv = timeval_current();
2662         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2663         if (ret == -1) {
2664                 printf("Unable to get ping response from node %u\n", options.pnn);
2665                 return -1;
2666         } else {
2667                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2668                        options.pnn, timeval_elapsed(&tv), ret);
2669         }
2670         return 0;
2671 }
2672
2673
2674 /*
2675   get a tunable
2676  */
2677 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2678 {
2679         const char *name;
2680         uint32_t value;
2681         int ret;
2682
2683         if (argc < 1) {
2684                 usage();
2685         }
2686
2687         name = argv[0];
2688         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2689         if (ret == -1) {
2690                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2691                 return -1;
2692         }
2693
2694         printf("%-19s = %u\n", name, value);
2695         return 0;
2696 }
2697
2698 /*
2699   set a tunable
2700  */
2701 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2702 {
2703         const char *name;
2704         uint32_t value;
2705         int ret;
2706
2707         if (argc < 2) {
2708                 usage();
2709         }
2710
2711         name = argv[0];
2712         value = strtoul(argv[1], NULL, 0);
2713
2714         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2715         if (ret == -1) {
2716                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2717                 return -1;
2718         }
2719         return 0;
2720 }
2721
2722 /*
2723   list all tunables
2724  */
2725 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2726 {
2727         uint32_t count;
2728         const char **list;
2729         int ret, i;
2730
2731         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2732         if (ret == -1) {
2733                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2734                 return -1;
2735         }
2736
2737         for (i=0;i<count;i++) {
2738                 control_getvar(ctdb, 1, &list[i]);
2739         }
2740
2741         talloc_free(list);
2742         
2743         return 0;
2744 }
2745
2746 /*
2747   display debug level on a node
2748  */
2749 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2750 {
2751         int ret;
2752         int32_t level;
2753
2754         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2755         if (ret != 0) {
2756                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2757                 return ret;
2758         } else {
2759                 if (options.machinereadable){
2760                         printf(":Name:Level:\n");
2761                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2762                 } else {
2763                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2764                 }
2765         }
2766         return 0;
2767 }
2768
2769 /*
2770   display reclock file of a node
2771  */
2772 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2773 {
2774         int ret;
2775         const char *reclock;
2776
2777         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2778         if (ret != 0) {
2779                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2780                 return ret;
2781         } else {
2782                 if (options.machinereadable){
2783                         if (reclock != NULL) {
2784                                 printf("%s", reclock);
2785                         }
2786                 } else {
2787                         if (reclock == NULL) {
2788                                 printf("No reclock file used.\n");
2789                         } else {
2790                                 printf("Reclock file:%s\n", reclock);
2791                         }
2792                 }
2793         }
2794         return 0;
2795 }
2796
2797 /*
2798   set the reclock file of a node
2799  */
2800 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2801 {
2802         int ret;
2803         const char *reclock;
2804
2805         if (argc == 0) {
2806                 reclock = NULL;
2807         } else if (argc == 1) {
2808                 reclock = argv[0];
2809         } else {
2810                 usage();
2811         }
2812
2813         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2814         if (ret != 0) {
2815                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2816                 return ret;
2817         }
2818         return 0;
2819 }
2820
2821 /*
2822   set the natgw state on/off
2823  */
2824 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2825 {
2826         int ret;
2827         uint32_t natgwstate;
2828
2829         if (argc == 0) {
2830                 usage();
2831         }
2832
2833         if (!strcmp(argv[0], "on")) {
2834                 natgwstate = 1;
2835         } else if (!strcmp(argv[0], "off")) {
2836                 natgwstate = 0;
2837         } else {
2838                 usage();
2839         }
2840
2841         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2842         if (ret != 0) {
2843                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2844                 return ret;
2845         }
2846
2847         return 0;
2848 }
2849
2850 /*
2851   set the lmaster role on/off
2852  */
2853 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2854 {
2855         int ret;
2856         uint32_t lmasterrole;
2857
2858         if (argc == 0) {
2859                 usage();
2860         }
2861
2862         if (!strcmp(argv[0], "on")) {
2863                 lmasterrole = 1;
2864         } else if (!strcmp(argv[0], "off")) {
2865                 lmasterrole = 0;
2866         } else {
2867                 usage();
2868         }
2869
2870         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2871         if (ret != 0) {
2872                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2873                 return ret;
2874         }
2875
2876         return 0;
2877 }
2878
2879 /*
2880   set the recmaster role on/off
2881  */
2882 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2883 {
2884         int ret;
2885         uint32_t recmasterrole;
2886
2887         if (argc == 0) {
2888                 usage();
2889         }
2890
2891         if (!strcmp(argv[0], "on")) {
2892                 recmasterrole = 1;
2893         } else if (!strcmp(argv[0], "off")) {
2894                 recmasterrole = 0;
2895         } else {
2896                 usage();
2897         }
2898
2899         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2900         if (ret != 0) {
2901                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2902                 return ret;
2903         }
2904
2905         return 0;
2906 }
2907
2908 /*
2909   set debug level on a node or all nodes
2910  */
2911 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2912 {
2913         int i, ret;
2914         int32_t level;
2915
2916         if (argc == 0) {
2917                 printf("You must specify the debug level. Valid levels are:\n");
2918                 for (i=0; debug_levels[i].description != NULL; i++) {
2919                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2920                 }
2921
2922                 return 0;
2923         }
2924
2925         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2926                 level = get_debug_by_desc(argv[0]);
2927         } else {
2928                 level = strtol(argv[0], NULL, 0);
2929         }
2930
2931         for (i=0; debug_levels[i].description != NULL; i++) {
2932                 if (level == debug_levels[i].level) {
2933                         break;
2934                 }
2935         }
2936         if (debug_levels[i].description == NULL) {
2937                 printf("Invalid debug level, must be one of\n");
2938                 for (i=0; debug_levels[i].description != NULL; i++) {
2939                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2940                 }
2941                 return -1;
2942         }
2943
2944         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2945         if (ret != 0) {
2946                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2947         }
2948         return 0;
2949 }
2950
2951
2952 /*
2953   freeze a node
2954  */
2955 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2956 {
2957         int ret;
2958         uint32_t priority;
2959         
2960         if (argc == 1) {
2961                 priority = strtol(argv[0], NULL, 0);
2962         } else {
2963                 priority = 0;
2964         }
2965         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2966
2967         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2968         if (ret != 0) {
2969                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2970         }               
2971         return 0;
2972 }
2973
2974 /*
2975   thaw a node
2976  */
2977 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
2978 {
2979         int ret;
2980         uint32_t priority;
2981         
2982         if (argc == 1) {
2983                 priority = strtol(argv[0], NULL, 0);
2984         } else {
2985                 priority = 0;
2986         }
2987         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
2988
2989         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2990         if (ret != 0) {
2991                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
2992         }               
2993         return 0;
2994 }
2995
2996
2997 /*
2998   attach to a database
2999  */
3000 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3001 {
3002         const char *db_name;
3003         struct ctdb_db_context *ctdb_db;
3004
3005         if (argc < 1) {
3006                 usage();
3007         }
3008         db_name = argv[0];
3009
3010         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3011         if (ctdb_db == NULL) {
3012                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3013                 return -1;
3014         }
3015
3016         return 0;
3017 }
3018
3019 /*
3020   set db priority
3021  */
3022 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3023 {
3024         struct ctdb_db_priority db_prio;
3025         int ret;
3026
3027         if (argc < 2) {
3028                 usage();
3029         }
3030
3031         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3032         db_prio.priority = strtoul(argv[1], NULL, 0);
3033
3034         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3035         if (ret != 0) {
3036                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3037                 return -1;
3038         }
3039
3040         return 0;
3041 }
3042
3043 /*
3044   get db priority
3045  */
3046 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3047 {
3048         uint32_t db_id, priority;
3049         int ret;
3050
3051         if (argc < 1) {
3052                 usage();
3053         }
3054
3055         db_id = strtoul(argv[0], NULL, 0);
3056
3057         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3058         if (ret != 0) {
3059                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3060                 return -1;
3061         }
3062
3063         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3064
3065         return 0;
3066 }
3067
3068 /*
3069   run an eventscript on a node
3070  */
3071 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3072 {
3073         TDB_DATA data;
3074         int ret;
3075         int32_t res;
3076         char *errmsg;
3077         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3078
3079         if (argc != 1) {
3080                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3081                 return -1;
3082         }
3083
3084         data.dptr = (unsigned char *)discard_const(argv[0]);
3085         data.dsize = strlen((char *)data.dptr) + 1;
3086
3087         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3088
3089         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3090                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3091         if (ret != 0 || res != 0) {
3092                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3093                 talloc_free(tmp_ctx);
3094                 return -1;
3095         }
3096         talloc_free(tmp_ctx);
3097         return 0;
3098 }
3099
3100 #define DB_VERSION 1
3101 #define MAX_DB_NAME 64
3102 struct db_file_header {
3103         unsigned long version;
3104         time_t timestamp;
3105         unsigned long persistent;
3106         unsigned long size;
3107         const char name[MAX_DB_NAME];
3108 };
3109
3110 struct backup_data {
3111         struct ctdb_marshall_buffer *records;
3112         uint32_t len;
3113         uint32_t total;
3114         bool traverse_error;
3115 };
3116
3117 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3118 {
3119         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3120         struct ctdb_rec_data *rec;
3121
3122         /* add the record */
3123         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3124         if (rec == NULL) {
3125                 bd->traverse_error = true;
3126                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3127                 return -1;
3128         }
3129         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3130         if (bd->records == NULL) {
3131                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3132                 bd->traverse_error = true;
3133                 return -1;
3134         }
3135         bd->records->count++;
3136         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3137         bd->len += rec->length;
3138         talloc_free(rec);
3139
3140         bd->total++;
3141         return 0;
3142 }
3143
3144 /*
3145  * backup a database to a file 
3146  */
3147 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3148 {
3149         int i, ret;
3150         struct ctdb_dbid_map *dbmap=NULL;
3151         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3152         struct db_file_header dbhdr;
3153         struct ctdb_db_context *ctdb_db;
3154         struct backup_data *bd;
3155         int fh = -1;
3156         int status = -1;
3157
3158         if (argc != 2) {
3159                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3160                 return -1;
3161         }
3162
3163         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3164         if (ret != 0) {
3165                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3166                 return ret;
3167         }
3168
3169         for(i=0;i<dbmap->num;i++){
3170                 const char *name;
3171
3172                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3173                 if(!strcmp(argv[0], name)){
3174                         talloc_free(discard_const(name));
3175                         break;
3176                 }
3177                 talloc_free(discard_const(name));
3178         }
3179         if (i == dbmap->num) {
3180                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3181                 talloc_free(tmp_ctx);
3182                 return -1;
3183         }
3184
3185
3186         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3187         if (ctdb_db == NULL) {
3188                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3189                 talloc_free(tmp_ctx);
3190                 return -1;
3191         }
3192
3193
3194         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3195         if (ret == -1) {
3196                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3197                 talloc_free(tmp_ctx);
3198                 return -1;
3199         }
3200
3201
3202         bd = talloc_zero(tmp_ctx, struct backup_data);
3203         if (bd == NULL) {
3204                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3205                 talloc_free(tmp_ctx);
3206                 return -1;
3207         }
3208
3209         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3210         if (bd->records == NULL) {
3211                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3212                 talloc_free(tmp_ctx);
3213                 return -1;
3214         }
3215
3216         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3217         bd->records->db_id = ctdb_db->db_id;
3218         /* traverse the database collecting all records */
3219         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3220             bd->traverse_error) {
3221                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3222                 talloc_free(tmp_ctx);
3223                 return -1;              
3224         }
3225
3226         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3227
3228
3229         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3230         if (fh == -1) {
3231                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3232                 talloc_free(tmp_ctx);
3233                 return -1;
3234         }
3235
3236         dbhdr.version = DB_VERSION;
3237         dbhdr.timestamp = time(NULL);
3238         dbhdr.persistent = dbmap->dbs[i].persistent;
3239         dbhdr.size = bd->len;
3240         if (strlen(argv[0]) >= MAX_DB_NAME) {
3241                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3242                 goto done;
3243         }
3244         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3245         ret = write(fh, &dbhdr, sizeof(dbhdr));
3246         if (ret == -1) {
3247                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3248                 goto done;
3249         }
3250         ret = write(fh, bd->records, bd->len);
3251         if (ret == -1) {
3252                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3253                 goto done;
3254         }
3255
3256         status = 0;
3257 done:
3258         if (fh != -1) {
3259                 ret = close(fh);
3260                 if (ret == -1) {
3261                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3262                 }
3263         }
3264         talloc_free(tmp_ctx);
3265         return status;
3266 }
3267
3268 /*
3269  * restore a database from a file 
3270  */
3271 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3272 {
3273         int ret;
3274         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3275         TDB_DATA outdata;
3276         TDB_DATA data;
3277         struct db_file_header dbhdr;
3278         struct ctdb_db_context *ctdb_db;
3279         struct ctdb_node_map *nodemap=NULL;
3280         struct ctdb_vnn_map *vnnmap=NULL;
3281         int i, fh;
3282         struct ctdb_control_wipe_database w;
3283         uint32_t *nodes;
3284         uint32_t generation;
3285         struct tm *tm;
3286         char tbuf[100];
3287
3288         if (argc != 1) {
3289                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3290                 return -1;
3291         }
3292
3293         fh = open(argv[0], O_RDONLY);
3294         if (fh == -1) {
3295                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3296                 talloc_free(tmp_ctx);
3297                 return -1;
3298         }
3299
3300         read(fh, &dbhdr, sizeof(dbhdr));
3301         if (dbhdr.version != DB_VERSION) {
3302                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3303                 talloc_free(tmp_ctx);
3304                 return -1;
3305         }
3306
3307         outdata.dsize = dbhdr.size;
3308         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3309         if (outdata.dptr == NULL) {
3310                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3311                 close(fh);
3312                 talloc_free(tmp_ctx);
3313                 return -1;
3314         }               
3315         read(fh, outdata.dptr, outdata.dsize);
3316         close(fh);
3317
3318         tm = localtime(&dbhdr.timestamp);
3319         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3320         printf("Restoring database '%s' from backup @ %s\n",
3321                 dbhdr.name, tbuf);
3322
3323
3324         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3325         if (ctdb_db == NULL) {
3326                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3327                 talloc_free(tmp_ctx);
3328                 return -1;
3329         }
3330
3331         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3332         if (ret != 0) {
3333                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3334                 talloc_free(tmp_ctx);
3335                 return ret;
3336         }
3337
3338
3339         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3340         if (ret != 0) {
3341                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3342                 talloc_free(tmp_ctx);
3343                 return ret;
3344         }
3345
3346         /* freeze all nodes */
3347         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3348         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3349                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3350                                         nodes, i,
3351                                         TIMELIMIT(),
3352                                         false, tdb_null,
3353                                         NULL, NULL,
3354                                         NULL) != 0) {
3355                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3356                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3357                         talloc_free(tmp_ctx);
3358                         return -1;
3359                 }
3360         }
3361
3362         generation = vnnmap->generation;
3363         data.dptr = (void *)&generation;
3364         data.dsize = sizeof(generation);
3365
3366         /* start a cluster wide transaction */
3367         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3368         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3369                                         nodes, 0,
3370                                         TIMELIMIT(), false, data,
3371                                         NULL, NULL,
3372                                         NULL) != 0) {
3373                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3374                 return -1;
3375         }
3376
3377
3378         w.db_id = ctdb_db->db_id;
3379         w.transaction_id = generation;
3380
3381         data.dptr = (void *)&w;
3382         data.dsize = sizeof(w);
3383
3384         /* wipe all the remote databases. */
3385         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3386         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3387                                         nodes, 0,
3388                                         TIMELIMIT(), false, data,
3389                                         NULL, NULL,
3390                                         NULL) != 0) {
3391                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3392                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3393                 talloc_free(tmp_ctx);
3394                 return -1;
3395         }
3396         
3397         /* push the database */
3398         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3399         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3400                                         nodes, 0,
3401                                         TIMELIMIT(), false, outdata,
3402                                         NULL, NULL,
3403                                         NULL) != 0) {
3404                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3405                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3406                 talloc_free(tmp_ctx);
3407                 return -1;
3408         }
3409
3410         data.dptr = (void *)&generation;
3411         data.dsize = sizeof(generation);
3412
3413         /* commit all the changes */
3414         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3415                                         nodes, 0,
3416                                         TIMELIMIT(), false, data,
3417                                         NULL, NULL,
3418                                         NULL) != 0) {
3419                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3420                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3421                 talloc_free(tmp_ctx);
3422                 return -1;
3423         }
3424
3425
3426         /* thaw all nodes */
3427         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3428         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3429                                         nodes, 0,
3430                                         TIMELIMIT(),
3431                                         false, tdb_null,
3432                                         NULL, NULL,
3433                                         NULL) != 0) {
3434                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3435                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3436                 talloc_free(tmp_ctx);
3437                 return -1;
3438         }
3439
3440
3441         talloc_free(tmp_ctx);
3442         return 0;
3443 }
3444
3445 /*
3446  * wipe a database from a file
3447  */
3448 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3449                           const char **argv)
3450 {
3451         int ret;
3452         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3453         TDB_DATA data;
3454         struct ctdb_db_context *ctdb_db;
3455         struct ctdb_node_map *nodemap = NULL;
3456         struct ctdb_vnn_map *vnnmap = NULL;
3457         int i;
3458         struct ctdb_control_wipe_database w;
3459         uint32_t *nodes;
3460         uint32_t generation;
3461         struct ctdb_dbid_map *dbmap = NULL;
3462
3463         if (argc != 1) {
3464                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3465                 return -1;
3466         }
3467
3468         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3469                                  &dbmap);
3470         if (ret != 0) {
3471                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3472                                   options.pnn));
3473                 return ret;
3474         }
3475
3476         for(i=0;i<dbmap->num;i++){
3477                 const char *name;
3478
3479                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3480                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3481                 if(!strcmp(argv[0], name)){
3482                         talloc_free(discard_const(name));
3483                         break;
3484                 }
3485                 talloc_free(discard_const(name));
3486         }
3487         if (i == dbmap->num) {
3488                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3489                                   argv[0]));
3490                 talloc_free(tmp_ctx);
3491                 return -1;
3492         }
3493
3494         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3495         if (ctdb_db == NULL) {
3496                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3497                                   argv[0]));
3498                 talloc_free(tmp_ctx);
3499                 return -1;
3500         }
3501
3502         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3503                                    &nodemap);
3504         if (ret != 0) {
3505                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3506                                   options.pnn));
3507                 talloc_free(tmp_ctx);
3508                 return ret;
3509         }
3510
3511         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3512                                   &vnnmap);
3513         if (ret != 0) {
3514                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3515                                   options.pnn));
3516                 talloc_free(tmp_ctx);
3517                 return ret;
3518         }
3519
3520         /* freeze all nodes */
3521         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3522         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3523                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3524                                                 nodes, i,
3525                                                 TIMELIMIT(),
3526                                                 false, tdb_null,
3527                                                 NULL, NULL,
3528                                                 NULL);
3529                 if (ret != 0) {
3530                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3531                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3532                                              CTDB_RECOVERY_ACTIVE);
3533                         talloc_free(tmp_ctx);
3534                         return -1;
3535                 }
3536         }
3537
3538         generation = vnnmap->generation;
3539         data.dptr = (void *)&generation;
3540         data.dsize = sizeof(generation);
3541
3542         /* start a cluster wide transaction */
3543         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3544         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3545                                         nodes, 0,
3546                                         TIMELIMIT(), false, data,
3547                                         NULL, NULL,
3548                                         NULL);
3549         if (ret!= 0) {
3550                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3551                                   "transactions.\n"));
3552                 return -1;
3553         }
3554
3555         w.db_id = ctdb_db->db_id;
3556         w.transaction_id = generation;
3557
3558         data.dptr = (void *)&w;
3559         data.dsize = sizeof(w);
3560
3561         /* wipe all the remote databases. */
3562         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3563         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3564                                         nodes, 0,
3565                                         TIMELIMIT(), false, data,
3566                                         NULL, NULL,
3567                                         NULL) != 0) {
3568                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3569                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3570                 talloc_free(tmp_ctx);
3571                 return -1;
3572         }
3573
3574         data.dptr = (void *)&generation;
3575         data.dsize = sizeof(generation);
3576
3577         /* commit all the changes */
3578         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3579                                         nodes, 0,
3580                                         TIMELIMIT(), false, data,
3581                                         NULL, NULL,
3582                                         NULL) != 0) {
3583                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3584                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3585                 talloc_free(tmp_ctx);
3586                 return -1;
3587         }
3588
3589         /* thaw all nodes */
3590         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3591         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3592                                         nodes, 0,
3593                                         TIMELIMIT(),
3594                                         false, tdb_null,
3595                                         NULL, NULL,
3596                                         NULL) != 0) {
3597                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3598                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3599                 talloc_free(tmp_ctx);
3600                 return -1;
3601         }
3602
3603         talloc_free(tmp_ctx);
3604         return 0;
3605 }
3606
3607 /*
3608  * set flags of a node in the nodemap
3609  */
3610 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3611 {
3612         int ret;
3613         int32_t status;
3614         int node;
3615         int flags;
3616         TDB_DATA data;
3617         struct ctdb_node_flag_change c;
3618
3619         if (argc != 2) {
3620                 usage();
3621                 return -1;
3622         }
3623
3624         if (sscanf(argv[0], "%d", &node) != 1) {
3625                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3626                 usage();
3627                 return -1;
3628         }
3629         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3630                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3631                 usage();
3632                 return -1;
3633         }
3634
3635         c.pnn       = node;
3636         c.old_flags = 0;
3637         c.new_flags = flags;
3638
3639         data.dsize = sizeof(c);
3640         data.dptr = (unsigned char *)&c;
3641
3642         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3643                            data, NULL, NULL, &status, NULL, NULL);
3644         if (ret != 0 || status != 0) {
3645                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3646                 return -1;
3647         }
3648         return 0;
3649 }
3650
3651 /*
3652   dump memory usage
3653  */
3654 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3655 {
3656         TDB_DATA data;
3657         int ret;
3658         int32_t res;
3659         char *errmsg;
3660         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3661         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3662                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3663         if (ret != 0 || res != 0) {
3664                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3665                 talloc_free(tmp_ctx);
3666                 return -1;
3667         }
3668         write(1, data.dptr, data.dsize);
3669         talloc_free(tmp_ctx);
3670         return 0;
3671 }
3672
3673 /*
3674   handler for memory dumps
3675 */
3676 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3677                              TDB_DATA data, void *private_data)
3678 {
3679         write(1, data.dptr, data.dsize);
3680         exit(0);
3681 }
3682
3683 /*
3684   dump memory usage on the recovery daemon
3685  */
3686 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3687 {
3688         int ret;
3689         TDB_DATA data;
3690         struct rd_memdump_reply rd;
3691
3692         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3693         if (rd.pnn == -1) {
3694                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3695                 return -1;
3696         }
3697         rd.srvid = getpid();
3698
3699         /* register a message port for receiveing the reply so that we
3700            can receive the reply
3701         */
3702         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3703
3704
3705         data.dptr = (uint8_t *)&rd;
3706         data.dsize = sizeof(rd);
3707
3708         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3709         if (ret != 0) {
3710                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3711                 return -1;
3712         }
3713
3714         /* this loop will terminate when we have received the reply */
3715         while (1) {     
3716                 event_loop_once(ctdb->ev);
3717         }
3718
3719         return 0;
3720 }
3721
3722 /*
3723   list all nodes in the cluster
3724   if the daemon is running, we read the data from the daemon.
3725   if the daemon is not running we parse the nodes file directly
3726  */
3727 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3728 {
3729         int i, ret;
3730         struct ctdb_node_map *nodemap=NULL;
3731
3732         if (ctdb != NULL) {
3733                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3734                 if (ret != 0) {
3735                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3736                         return ret;
3737                 }
3738
3739                 for(i=0;i<nodemap->num;i++){
3740                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3741                                 continue;
3742                         }
3743                         if (options.machinereadable){
3744                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3745                         } else {
3746                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3747                         }
3748                 }
3749         } else {
3750                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3751                 struct pnn_node *pnn_nodes;
3752                 struct pnn_node *pnn_node;
3753         
3754                 pnn_nodes = read_nodes_file(mem_ctx);
3755                 if (pnn_nodes == NULL) {
3756                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3757                         talloc_free(mem_ctx);
3758                         return -1;
3759                 }
3760
3761                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3762                         ctdb_sock_addr addr;
3763
3764                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3765                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3766                                 talloc_free(mem_ctx);
3767                                 return -1;
3768                         }
3769
3770                         if (options.machinereadable){
3771                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3772                         } else {
3773                                 printf("%s\n", pnn_node->addr);
3774                         }
3775                 }
3776                 talloc_free(mem_ctx);
3777         }
3778
3779         return 0;
3780 }
3781
3782 /*
3783   reload the nodes file on the local node
3784  */
3785 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3786 {
3787         int i, ret;
3788         int mypnn;
3789         struct ctdb_node_map *nodemap=NULL;
3790
3791         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3792         if (mypnn == -1) {
3793                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3794                 return -1;
3795         }
3796
3797         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3798         if (ret != 0) {
3799                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3800                 return ret;
3801         }
3802
3803         /* reload the nodes file on all remote nodes */
3804         for (i=0;i<nodemap->num;i++) {
3805                 if (nodemap->nodes[i].pnn == mypnn) {
3806                         continue;
3807                 }
3808                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3809                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3810                         nodemap->nodes[i].pnn);
3811                 if (ret != 0) {
3812                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3813                 }
3814         }
3815
3816         /* reload the nodes file on the local node */
3817         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3818         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3819         if (ret != 0) {
3820                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3821         }
3822
3823         /* initiate a recovery */
3824         control_recover(ctdb, argc, argv);
3825
3826         return 0;
3827 }
3828
3829
3830 static const struct {
3831         const char *name;
3832         int (*fn)(struct ctdb_context *, int, const char **);
3833         bool auto_all;
3834         bool without_daemon; /* can be run without daemon running ? */
3835         const char *msg;
3836         const char *args;
3837 } ctdb_commands[] = {
3838 #ifdef CTDB_VERS
3839         { "version",         control_version,           true,   false,  "show version of ctdb" },
3840 #endif
3841         { "status",          control_status,            true,   false,  "show node status" },
3842         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
3843         { "ping",            control_ping,              true,   false,  "ping all nodes" },
3844         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
3845         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
3846         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
3847         { "statistics",      control_statistics,        false,  false, "show statistics" },
3848         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
3849         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
3850         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
3851         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
3852         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
3853         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
3854         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
3855         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
3856         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
3857         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
3858         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
3859         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
3860         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
3861         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
3862         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
3863         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
3864         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
3865         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
3866         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
3867         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
3868         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
3869         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
3870         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
3871         { "stop",            control_stop,              true,   false,  "stop a node" },
3872         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
3873         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
3874         { "unban",           control_unban,             true,   false,  "unban a node" },
3875         { "showban",         control_showban,           true,   false,  "show ban information"},
3876         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
3877         { "recover",         control_recover,           true,   false,  "force recovery" },
3878         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
3879         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
3880         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
3881         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
3882         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
3883         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
3884         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
3885         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
3886
3887         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
3888         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
3889         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
3890         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
3891         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
3892         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
3893         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
3894         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
3895         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
3896         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
3897         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
3898         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
3899         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
3900         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
3901         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
3902         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
3903         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
3904         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
3905         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
3906         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
3907         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
3908         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
3909         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
3910         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
3911         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
3912         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
3913         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
3914         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
3915         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
3916 };
3917
3918 /*
3919   show usage message
3920  */
3921 static void usage(void)
3922 {
3923         int i;
3924         printf(
3925 "Usage: ctdb [options] <control>\n" \
3926 "Options:\n" \
3927 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
3928 "   -Y                 generate machinereadable output\n"
3929 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
3930         printf("Controls:\n");
3931         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3932                 printf("  %-15s %-27s  %s\n", 
3933                        ctdb_commands[i].name, 
3934                        ctdb_commands[i].args?ctdb_commands[i].args:"",
3935                        ctdb_commands[i].msg);
3936         }
3937         exit(1);
3938 }
3939
3940
3941 static void ctdb_alarm(int sig)
3942 {
3943         printf("Maximum runtime exceeded - exiting\n");
3944         _exit(ERR_TIMEOUT);
3945 }
3946
3947 /*
3948   main program
3949 */
3950 int main(int argc, const char *argv[])
3951 {
3952         struct ctdb_context *ctdb;
3953         char *nodestring = NULL;
3954         struct poptOption popt_options[] = {
3955                 POPT_AUTOHELP
3956                 POPT_CTDB_CMDLINE
3957                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
3958                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
3959                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
3960                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
3961                 POPT_TABLEEND
3962         };
3963         int opt;
3964         const char **extra_argv;
3965         int extra_argc = 0;
3966         int ret=-1, i;
3967         poptContext pc;
3968         struct event_context *ev;
3969         const char *control;
3970
3971         setlinebuf(stdout);
3972         
3973         /* set some defaults */
3974         options.maxruntime = 0;
3975         options.timelimit = 3;
3976         options.pnn = CTDB_CURRENT_NODE;
3977
3978         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
3979
3980         while ((opt = poptGetNextOpt(pc)) != -1) {
3981                 switch (opt) {
3982                 default:
3983                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
3984                                 poptBadOption(pc, 0), poptStrerror(opt)));
3985                         exit(1);
3986                 }
3987         }
3988
3989         /* setup the remaining options for the main program to use */
3990         extra_argv = poptGetArgs(pc);
3991         if (extra_argv) {
3992                 extra_argv++;
3993                 while (extra_argv[extra_argc]) extra_argc++;
3994         }
3995
3996         if (extra_argc < 1) {
3997                 usage();
3998         }
3999
4000         if (options.maxruntime == 0) {
4001                 const char *ctdb_timeout;
4002                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4003                 if (ctdb_timeout != NULL) {
4004                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4005                 } else {
4006                         /* default timeout is 120 seconds */
4007                         options.maxruntime = 120;
4008                 }
4009         }
4010
4011         signal(SIGALRM, ctdb_alarm);
4012         alarm(options.maxruntime);
4013
4014         /* setup the node number to contact */
4015         if (nodestring != NULL) {
4016                 if (strcmp(nodestring, "all") == 0) {
4017                         options.pnn = CTDB_BROADCAST_ALL;
4018                 } else {
4019                         options.pnn = strtoul(nodestring, NULL, 0);
4020                 }
4021         }
4022
4023         control = extra_argv[0];
4024
4025         ev = event_context_init(NULL);
4026
4027         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4028                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4029                         int j;
4030
4031                         if (ctdb_commands[i].without_daemon == true) {
4032                                 close(2);
4033                         }
4034
4035                         /* initialise ctdb */
4036                         ctdb = ctdb_cmdline_client(ev);
4037
4038                         if (ctdb_commands[i].without_daemon == false) {
4039                                 if (ctdb == NULL) {
4040                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4041                                         exit(1);
4042                                 }
4043
4044                                 /* verify the node exists */
4045                                 verify_node(ctdb);
4046
4047                                 if (options.pnn == CTDB_CURRENT_NODE) {
4048                                         int pnn;
4049                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4050                                         if (pnn == -1) {
4051                                                 return -1;
4052                                         }
4053                                         options.pnn = pnn;
4054                                 }
4055                         }
4056
4057                         if (ctdb_commands[i].auto_all && 
4058                             options.pnn == CTDB_BROADCAST_ALL) {
4059                                 uint32_t *nodes;
4060                                 uint32_t num_nodes;
4061                                 ret = 0;
4062
4063                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4064                                 CTDB_NO_MEMORY(ctdb, nodes);
4065         
4066                                 for (j=0;j<num_nodes;j++) {
4067                                         options.pnn = nodes[j];
4068                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4069                                 }
4070                                 talloc_free(nodes);
4071                         } else {
4072                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4073                         }
4074                         break;
4075                 }
4076         }
4077
4078         if (i == ARRAY_SIZE(ctdb_commands)) {
4079                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4080                 exit(1);
4081         }
4082
4083         return ret;
4084 }