Merge commit 'obnox/ctdb-wip-trans3' into trans3
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
517                 }
518                 return 0;
519         }
520
521         printf("Number of nodes:%d\n", nodemap->num);
522         for(i=0;i<nodemap->num;i++){
523                 static const struct {
524                         uint32_t flag;
525                         const char *name;
526                 } flag_names[] = {
527                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
528                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
529                         { NODE_FLAGS_BANNED,                "BANNED" },
530                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
531                         { NODE_FLAGS_DELETED,               "DELETED" },
532                         { NODE_FLAGS_STOPPED,               "STOPPED" },
533                 };
534                 char *flags_str = NULL;
535                 int j;
536
537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
538                         continue;
539                 }
540                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
541                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
542                                 if (flags_str == NULL) {
543                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
544                                 } else {
545                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
546                                                                            flag_names[j].name);
547                                 }
548                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
549                         }
550                 }
551                 if (flags_str == NULL) {
552                         flags_str = talloc_strdup(ctdb, "OK");
553                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
554                 }
555                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
556                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
557                        flags_str,
558                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
559                 talloc_free(flags_str);
560         }
561
562         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
563         if (ret != 0) {
564                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
565                 return ret;
566         }
567         if (vnnmap->generation == INVALID_GENERATION) {
568                 printf("Generation:INVALID\n");
569         } else {
570                 printf("Generation:%d\n",vnnmap->generation);
571         }
572         printf("Size:%d\n",vnnmap->size);
573         for(i=0;i<vnnmap->size;i++){
574                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
575         }
576
577         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
580                 return ret;
581         }
582         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
583
584         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
585         if (ret != 0) {
586                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
587                 return ret;
588         }
589         printf("Recovery master:%d\n",recmaster);
590
591         return 0;
592 }
593
594
595 struct natgw_node {
596         struct natgw_node *next;
597         const char *addr;
598 };
599
600 /*
601   display the list of nodes belonging to this natgw configuration
602  */
603 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
604 {
605         int i, ret;
606         const char *natgw_list;
607         int nlines;
608         char **lines;
609         struct natgw_node *natgw_nodes = NULL;
610         struct natgw_node *natgw_node;
611         struct ctdb_node_map *nodemap=NULL;
612
613
614         /* read the natgw nodes file into a linked list */
615         natgw_list = getenv("NATGW_NODES");
616         if (natgw_list == NULL) {
617                 natgw_list = "/etc/ctdb/natgw_nodes";
618         }
619         lines = file_lines_load(natgw_list, &nlines, ctdb);
620         if (lines == NULL) {
621                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
622                 return -1;
623         }
624         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
625                 nlines--;
626         }
627         for (i=0;i<nlines;i++) {
628                 char *node;
629
630                 node = lines[i];
631                 /* strip leading spaces */
632                 while((*node == ' ') || (*node == '\t')) {
633                         node++;
634                 }
635                 if (*node == '#') {
636                         continue;
637                 }
638                 if (strcmp(node, "") == 0) {
639                         continue;
640                 }
641                 natgw_node = talloc(ctdb, struct natgw_node);
642                 natgw_node->addr = talloc_strdup(natgw_node, node);
643                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
644                 natgw_node->next = natgw_nodes;
645                 natgw_nodes = natgw_node;
646         }
647
648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
651                 return ret;
652         }
653
654         i=0;
655         while(i<nodemap->num) {
656                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
657                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
658                                 break;
659                         }
660                 }
661
662                 /* this node was not in the natgw so we just remove it from
663                  * the list
664                  */
665                 if ((natgw_node == NULL) 
666                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
667                         int j;
668
669                         for (j=i+1; j<nodemap->num; j++) {
670                                 nodemap->nodes[j-1] = nodemap->nodes[j];
671                         }
672                         nodemap->num--;
673                         continue;
674                 }
675
676                 i++;
677         }               
678
679         /* pick a node to be natgwmaster
680          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
681          */
682         for(i=0;i<nodemap->num;i++){
683                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
684                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
685                         break;
686                 }
687         }
688         /* we couldnt find any healthy node, try unhealthy ones */
689         if (i == nodemap->num) {
690                 for(i=0;i<nodemap->num;i++){
691                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
692                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
693                                 break;
694                         }
695                 }
696         }
697         /* unless all nodes are STOPPED, when we pick one anyway */
698         if (i == nodemap->num) {
699                 for(i=0;i<nodemap->num;i++){
700                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
701                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
702                                 break;
703                         }
704                 }
705                 /* or if we still can not find any */
706                 if (i == nodemap->num) {
707                         printf("-1 0.0.0.0\n");
708                 }
709         }
710
711         /* print the pruned list of nodes belonging to this natgw list */
712         for(i=0;i<nodemap->num;i++){
713                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
714                         continue;
715                 }
716                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
717                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
718                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
723         }
724
725         return 0;
726 }
727
728 /*
729   display the status of the scripts for monitoring (or other events)
730  */
731 static int control_one_scriptstatus(struct ctdb_context *ctdb,
732                                     enum ctdb_eventscript_call type)
733 {
734         struct ctdb_scripts_wire *script_status;
735         int ret, i;
736
737         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
738         if (ret != 0) {
739                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
740                 return ret;
741         }
742
743         if (script_status == NULL) {
744                 if (!options.machinereadable) {
745                         printf("%s cycle never run\n",
746                                ctdb_eventscript_call_names[type]);
747                 }
748                 return 0;
749         }
750
751         if (!options.machinereadable) {
752                 printf("%d scripts were executed last %s cycle\n",
753                        script_status->num_scripts,
754                        ctdb_eventscript_call_names[type]);
755         }
756         for (i=0; i<script_status->num_scripts; i++) {
757                 const char *status = NULL;
758
759                 switch (script_status->scripts[i].status) {
760                 case -ETIME:
761                         status = "TIMEDOUT";
762                         break;
763                 case -ENOEXEC:
764                         status = "DISABLED";
765                         break;
766                 case 0:
767                         status = "OK";
768                         break;
769                 default:
770                         if (script_status->scripts[i].status > 0)
771                                 status = "ERROR";
772                         break;
773                 }
774                 if (options.machinereadable) {
775                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
776                                ctdb_eventscript_call_names[type],
777                                script_status->scripts[i].name,
778                                script_status->scripts[i].status,
779                                status,
780                                (long)script_status->scripts[i].start.tv_sec,
781                                (long)script_status->scripts[i].start.tv_usec,
782                                (long)script_status->scripts[i].finished.tv_sec,
783                                (long)script_status->scripts[i].finished.tv_usec,
784                                script_status->scripts[i].output);
785                         continue;
786                 }
787                 if (status)
788                         printf("%-20s Status:%s    ",
789                                script_status->scripts[i].name, status);
790                 else
791                         /* Some other error, eg from stat. */
792                         printf("%-20s Status:CANNOT RUN (%s)",
793                                script_status->scripts[i].name,
794                                strerror(-script_status->scripts[i].status));
795
796                 if (script_status->scripts[i].status >= 0) {
797                         printf("Duration:%.3lf ",
798                         timeval_delta(&script_status->scripts[i].finished,
799                               &script_status->scripts[i].start));
800                 }
801                 if (script_status->scripts[i].status != -ENOEXEC) {
802                         printf("%s",
803                                ctime(&script_status->scripts[i].start.tv_sec));
804                         if (script_status->scripts[i].status != 0) {
805                                 printf("   OUTPUT:%s\n",
806                                        script_status->scripts[i].output);
807                         }
808                 } else {
809                         printf("\n");
810                 }
811         }
812         return 0;
813 }
814
815
816 static int control_scriptstatus(struct ctdb_context *ctdb,
817                                 int argc, const char **argv)
818 {
819         int ret;
820         enum ctdb_eventscript_call type, min, max;
821         const char *arg;
822
823         if (argc > 1) {
824                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
825                 return -1;
826         }
827
828         if (argc == 0)
829                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
830         else
831                 arg = argv[0];
832
833         for (type = 0; type < CTDB_EVENT_MAX; type++) {
834                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
835                         min = type;
836                         max = type+1;
837                         break;
838                 }
839         }
840         if (type == CTDB_EVENT_MAX) {
841                 if (strcmp(arg, "all") == 0) {
842                         min = 0;
843                         max = CTDB_EVENT_MAX;
844                 } else {
845                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
846                         return -1;
847                 }
848         }
849
850         if (options.machinereadable) {
851                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
852         }
853
854         for (type = min; type < max; type++) {
855                 ret = control_one_scriptstatus(ctdb, type);
856                 if (ret != 0) {
857                         return ret;
858                 }
859         }
860
861         return 0;
862 }
863
864 /*
865   enable an eventscript
866  */
867 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
868 {
869         int ret;
870
871         if (argc < 1) {
872                 usage();
873         }
874
875         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
876         if (ret != 0) {
877           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
878                 return ret;
879         }
880
881         return 0;
882 }
883
884 /*
885   disable an eventscript
886  */
887 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
888 {
889         int ret;
890
891         if (argc < 1) {
892                 usage();
893         }
894
895         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
896         if (ret != 0) {
897           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
898                 return ret;
899         }
900
901         return 0;
902 }
903
904 /*
905   display the pnn of the recovery master
906  */
907 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
908 {
909         int ret;
910         uint32_t recmaster;
911
912         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
913         if (ret != 0) {
914                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
915                 return ret;
916         }
917         printf("%d\n",recmaster);
918
919         return 0;
920 }
921
922 /*
923   get a list of all tickles for this pnn
924  */
925 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
926 {
927         struct ctdb_control_tcp_tickle_list *list;
928         ctdb_sock_addr addr;
929         int i, ret;
930
931         if (argc < 1) {
932                 usage();
933         }
934
935         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
936                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
937                 return -1;
938         }
939
940         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
941         if (ret == -1) {
942                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
943                 return -1;
944         }
945
946         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
947         printf("Num tickles:%u\n", list->tickles.num);
948         for (i=0;i<list->tickles.num;i++) {
949                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
950                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
951         }
952
953         talloc_free(list);
954         
955         return 0;
956 }
957
958
959
960 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
961 {
962         struct ctdb_all_public_ips *ips;
963         struct ctdb_public_ip ip;
964         int i, ret;
965         uint32_t *nodes;
966         uint32_t disable_time;
967         TDB_DATA data;
968         struct ctdb_node_map *nodemap=NULL;
969         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
970
971         disable_time = 30;
972         data.dptr  = (uint8_t*)&disable_time;
973         data.dsize = sizeof(disable_time);
974         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
975         if (ret != 0) {
976                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
977                 return -1;
978         }
979
980
981
982         /* read the public ip list from the node */
983         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
984         if (ret != 0) {
985                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
986                 talloc_free(tmp_ctx);
987                 return -1;
988         }
989
990         for (i=0;i<ips->num;i++) {
991                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
992                         break;
993                 }
994         }
995         if (i==ips->num) {
996                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
997                         pnn, ctdb_addr_to_str(addr)));
998                 talloc_free(tmp_ctx);
999                 return -1;
1000         }
1001
1002         ip.pnn  = pnn;
1003         ip.addr = *addr;
1004
1005         data.dptr  = (uint8_t *)&ip;
1006         data.dsize = sizeof(ip);
1007
1008         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1009         if (ret != 0) {
1010                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1011                 talloc_free(tmp_ctx);
1012                 return ret;
1013         }
1014
1015         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1016         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1017                                         nodes, 0,
1018                                         TIMELIMIT(),
1019                                         false, data,
1020                                         NULL, NULL,
1021                                         NULL);
1022         if (ret != 0) {
1023                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1024                 talloc_free(tmp_ctx);
1025                 return -1;
1026         }
1027
1028         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1029         if (ret != 0) {
1030                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1031                 talloc_free(tmp_ctx);
1032                 return -1;
1033         }
1034
1035         talloc_free(tmp_ctx);
1036         return 0;
1037 }
1038
1039 /*
1040   move/failover an ip address to a specific node
1041  */
1042 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1043 {
1044         uint32_t pnn;
1045         ctdb_sock_addr addr;
1046
1047         if (argc < 2) {
1048                 usage();
1049                 return -1;
1050         }
1051
1052         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1053                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1054                 return -1;
1055         }
1056
1057
1058         if (sscanf(argv[1], "%u", &pnn) != 1) {
1059                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1060                 return -1;
1061         }
1062
1063         if (move_ip(ctdb, &addr, pnn) != 0) {
1064                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1065                 return -1;
1066         }
1067
1068         return 0;
1069 }
1070
1071 void getips_store_callback(void *param, void *data)
1072 {
1073         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1074         struct ctdb_all_public_ips *ips = param;
1075         int i;
1076
1077         i = ips->num++;
1078         ips->ips[i].pnn  = node_ip->pnn;
1079         ips->ips[i].addr = node_ip->addr;
1080 }
1081
1082 void getips_count_callback(void *param, void *data)
1083 {
1084         uint32_t *count = param;
1085
1086         (*count)++;
1087 }
1088
1089 #define IP_KEYLEN       4
1090 static uint32_t *ip_key(ctdb_sock_addr *ip)
1091 {
1092         static uint32_t key[IP_KEYLEN];
1093
1094         bzero(key, sizeof(key));
1095
1096         switch (ip->sa.sa_family) {
1097         case AF_INET:
1098                 key[0]  = ip->ip.sin_addr.s_addr;
1099                 break;
1100         case AF_INET6:
1101                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1102                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1103                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1104                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1105                 break;
1106         default:
1107                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1108                 return key;
1109         }
1110
1111         return key;
1112 }
1113
1114 static void *add_ip_callback(void *parm, void *data)
1115 {
1116         return parm;
1117 }
1118
1119 static int
1120 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1121 {
1122         struct ctdb_all_public_ips *tmp_ips;
1123         struct ctdb_node_map *nodemap=NULL;
1124         trbt_tree_t *ip_tree;
1125         int i, j, len, ret;
1126         uint32_t count;
1127
1128         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1129         if (ret != 0) {
1130                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1131                 return ret;
1132         }
1133
1134         ip_tree = trbt_create(tmp_ctx, 0);
1135
1136         for(i=0;i<nodemap->num;i++){
1137                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1138                         continue;
1139                 }
1140                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1141                         continue;
1142                 }
1143
1144                 /* read the public ip list from this node */
1145                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1146                 if (ret != 0) {
1147                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1148                         return -1;
1149                 }
1150         
1151                 for (j=0; j<tmp_ips->num;j++) {
1152                         struct ctdb_public_ip *node_ip;
1153
1154                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1155                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1156                         node_ip->addr = tmp_ips->ips[j].addr;
1157
1158                         trbt_insertarray32_callback(ip_tree,
1159                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1160                                 add_ip_callback,
1161                                 node_ip);
1162                 }
1163                 talloc_free(tmp_ips);
1164         }
1165
1166         /* traverse */
1167         count = 0;
1168         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1169
1170         len = offsetof(struct ctdb_all_public_ips, ips) + 
1171                 count*sizeof(struct ctdb_public_ip);
1172         tmp_ips = talloc_zero_size(tmp_ctx, len);
1173         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1174
1175         *ips = tmp_ips;
1176
1177         return 0;
1178 }
1179
1180
1181 /* 
1182  * scans all other nodes and returns a pnn for another node that can host this 
1183  * ip address or -1
1184  */
1185 static int
1186 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1187 {
1188         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1189         struct ctdb_all_public_ips *ips;
1190         struct ctdb_node_map *nodemap=NULL;
1191         int i, j, ret;
1192
1193         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1194         if (ret != 0) {
1195                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1196                 talloc_free(tmp_ctx);
1197                 return ret;
1198         }
1199
1200         for(i=0;i<nodemap->num;i++){
1201                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1202                         continue;
1203                 }
1204                 if (nodemap->nodes[i].pnn == options.pnn) {
1205                         continue;
1206                 }
1207
1208                 /* read the public ip list from this node */
1209                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1210                 if (ret != 0) {
1211                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1212                         return -1;
1213                 }
1214
1215                 for (j=0;j<ips->num;j++) {
1216                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1217                                 talloc_free(tmp_ctx);
1218                                 return nodemap->nodes[i].pnn;
1219                         }
1220                 }
1221                 talloc_free(ips);
1222         }
1223
1224         talloc_free(tmp_ctx);
1225         return -1;
1226 }
1227
1228 /*
1229   add a public ip address to a node
1230  */
1231 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1232 {
1233         int i, ret;
1234         int len;
1235         uint32_t pnn;
1236         unsigned mask;
1237         ctdb_sock_addr addr;
1238         struct ctdb_control_ip_iface *pub;
1239         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1240         struct ctdb_all_public_ips *ips;
1241
1242
1243         if (argc != 2) {
1244                 talloc_free(tmp_ctx);
1245                 usage();
1246         }
1247
1248         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1249                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1250                 talloc_free(tmp_ctx);
1251                 return -1;
1252         }
1253
1254         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1255         if (ret != 0) {
1256                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1257                 talloc_free(tmp_ctx);
1258                 return ret;
1259         }
1260
1261
1262         /* check if some other node is already serving this ip, if not,
1263          * we will claim it
1264          */
1265         for (i=0;i<ips->num;i++) {
1266                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1267                         break;
1268                 }
1269         }
1270
1271         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1272         pub = talloc_size(tmp_ctx, len); 
1273         CTDB_NO_MEMORY(ctdb, pub);
1274
1275         pub->addr  = addr;
1276         pub->mask  = mask;
1277         pub->len   = strlen(argv[1])+1;
1278         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1279
1280         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1281         if (ret != 0) {
1282                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1283                 talloc_free(tmp_ctx);
1284                 return ret;
1285         }
1286
1287         if (i == ips->num) {
1288                 /* no one has this ip so we claim it */
1289                 pnn  = options.pnn;
1290         } else {
1291                 pnn  = ips->ips[i].pnn;
1292         }
1293
1294         if (move_ip(ctdb, &addr, pnn) != 0) {
1295                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1296                 return -1;
1297         }
1298
1299         talloc_free(tmp_ctx);
1300         return 0;
1301 }
1302
1303 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1304
1305 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1306 {
1307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1308         struct ctdb_node_map *nodemap=NULL;
1309         struct ctdb_all_public_ips *ips;
1310         int ret, i, j;
1311
1312         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1313         if (ret != 0) {
1314                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1315                 return ret;
1316         }
1317
1318         /* remove it from the nodes that are not hosting the ip currently */
1319         for(i=0;i<nodemap->num;i++){
1320                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1321                         continue;
1322                 }
1323                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1324                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1325                         continue;
1326                 }
1327
1328                 for (j=0;j<ips->num;j++) {
1329                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1330                                 break;
1331                         }
1332                 }
1333                 if (j==ips->num) {
1334                         continue;
1335                 }
1336
1337                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1338                         continue;
1339                 }
1340
1341                 options.pnn = nodemap->nodes[i].pnn;
1342                 control_delip(ctdb, argc, argv);
1343         }
1344
1345
1346         /* remove it from every node (also the one hosting it) */
1347         for(i=0;i<nodemap->num;i++){
1348                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1349                         continue;
1350                 }
1351                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1352                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1353                         continue;
1354                 }
1355
1356                 for (j=0;j<ips->num;j++) {
1357                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1358                                 break;
1359                         }
1360                 }
1361                 if (j==ips->num) {
1362                         continue;
1363                 }
1364
1365                 options.pnn = nodemap->nodes[i].pnn;
1366                 control_delip(ctdb, argc, argv);
1367         }
1368
1369         talloc_free(tmp_ctx);
1370         return 0;
1371 }
1372         
1373 /*
1374   delete a public ip address from a node
1375  */
1376 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1377 {
1378         int i, ret;
1379         ctdb_sock_addr addr;
1380         struct ctdb_control_ip_iface pub;
1381         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1382         struct ctdb_all_public_ips *ips;
1383
1384         if (argc != 1) {
1385                 talloc_free(tmp_ctx);
1386                 usage();
1387         }
1388
1389         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1390                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1391                 return -1;
1392         }
1393
1394         if (options.pnn == CTDB_BROADCAST_ALL) {
1395                 return control_delip_all(ctdb, argc, argv, &addr);
1396         }
1397
1398         pub.addr  = addr;
1399         pub.mask  = 0;
1400         pub.len   = 0;
1401
1402         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1405                 talloc_free(tmp_ctx);
1406                 return ret;
1407         }
1408         
1409         for (i=0;i<ips->num;i++) {
1410                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1411                         break;
1412                 }
1413         }
1414
1415         if (i==ips->num) {
1416                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1417                         ctdb_addr_to_str(&addr)));
1418                 talloc_free(tmp_ctx);
1419                 return -1;
1420         }
1421
1422         if (ips->ips[i].pnn == options.pnn) {
1423                 ret = find_other_host_for_public_ip(ctdb, &addr);
1424                 if (ret != -1) {
1425                         if (move_ip(ctdb, &addr, ret) != 0) {
1426                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1427                                 return -1;
1428                         }
1429                 }
1430         }
1431
1432         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1433         if (ret != 0) {
1434                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1435                 talloc_free(tmp_ctx);
1436                 return ret;
1437         }
1438
1439         talloc_free(tmp_ctx);
1440         return 0;
1441 }
1442
1443 /*
1444   kill a tcp connection
1445  */
1446 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1447 {
1448         int ret;
1449         struct ctdb_control_killtcp killtcp;
1450
1451         if (argc < 2) {
1452                 usage();
1453         }
1454
1455         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1456                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1457                 return -1;
1458         }
1459
1460         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1461                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1462                 return -1;
1463         }
1464
1465         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1466         if (ret != 0) {
1467                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1468                 return ret;
1469         }
1470
1471         return 0;
1472 }
1473
1474
1475 /*
1476   send a gratious arp
1477  */
1478 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1479 {
1480         int ret;
1481         ctdb_sock_addr addr;
1482
1483         if (argc < 2) {
1484                 usage();
1485         }
1486
1487         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1488                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1489                 return -1;
1490         }
1491
1492         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1493         if (ret != 0) {
1494                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1495                 return ret;
1496         }
1497
1498         return 0;
1499 }
1500
1501 /*
1502   register a server id
1503  */
1504 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1505 {
1506         int ret;
1507         struct ctdb_server_id server_id;
1508
1509         if (argc < 3) {
1510                 usage();
1511         }
1512
1513         server_id.pnn       = strtoul(argv[0], NULL, 0);
1514         server_id.type      = strtoul(argv[1], NULL, 0);
1515         server_id.server_id = strtoul(argv[2], NULL, 0);
1516
1517         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1518         if (ret != 0) {
1519                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1520                 return ret;
1521         }
1522         return -1;
1523 }
1524
1525 /*
1526   unregister a server id
1527  */
1528 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1529 {
1530         int ret;
1531         struct ctdb_server_id server_id;
1532
1533         if (argc < 3) {
1534                 usage();
1535         }
1536
1537         server_id.pnn       = strtoul(argv[0], NULL, 0);
1538         server_id.type      = strtoul(argv[1], NULL, 0);
1539         server_id.server_id = strtoul(argv[2], NULL, 0);
1540
1541         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1542         if (ret != 0) {
1543                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1544                 return ret;
1545         }
1546         return -1;
1547 }
1548
1549 /*
1550   check if a server id exists
1551  */
1552 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1553 {
1554         uint32_t status;
1555         int ret;
1556         struct ctdb_server_id server_id;
1557
1558         if (argc < 3) {
1559                 usage();
1560         }
1561
1562         server_id.pnn       = strtoul(argv[0], NULL, 0);
1563         server_id.type      = strtoul(argv[1], NULL, 0);
1564         server_id.server_id = strtoul(argv[2], NULL, 0);
1565
1566         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1569                 return ret;
1570         }
1571
1572         if (status) {
1573                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1574         } else {
1575                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1576         }
1577         return 0;
1578 }
1579
1580 /*
1581   get a list of all server ids that are registered on a node
1582  */
1583 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1584 {
1585         int i, ret;
1586         struct ctdb_server_id_list *server_ids;
1587
1588         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1589         if (ret != 0) {
1590                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1591                 return ret;
1592         }
1593
1594         for (i=0; i<server_ids->num; i++) {
1595                 printf("Server id %d:%d:%d\n", 
1596                         server_ids->server_ids[i].pnn, 
1597                         server_ids->server_ids[i].type, 
1598                         server_ids->server_ids[i].server_id); 
1599         }
1600
1601         return -1;
1602 }
1603
1604 /*
1605   send a tcp tickle ack
1606  */
1607 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1608 {
1609         int ret;
1610         ctdb_sock_addr  src, dst;
1611
1612         if (argc < 2) {
1613                 usage();
1614         }
1615
1616         if (!parse_ip_port(argv[0], &src)) {
1617                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1618                 return -1;
1619         }
1620
1621         if (!parse_ip_port(argv[1], &dst)) {
1622                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1623                 return -1;
1624         }
1625
1626         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1627         if (ret==0) {
1628                 return 0;
1629         }
1630         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1631
1632         return -1;
1633 }
1634
1635
1636 /*
1637   display public ip status
1638  */
1639 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1640 {
1641         int i, ret;
1642         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1643         struct ctdb_all_public_ips *ips;
1644
1645         if (options.pnn == CTDB_BROADCAST_ALL) {
1646                 /* read the list of public ips from all nodes */
1647                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1648         } else {
1649                 /* read the public ip list from this node */
1650                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1651         }
1652         if (ret != 0) {
1653                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1654                 talloc_free(tmp_ctx);
1655                 return ret;
1656         }
1657
1658         if (options.machinereadable){
1659                 printf(":Public IP:Node:\n");
1660         } else {
1661                 if (options.pnn == CTDB_BROADCAST_ALL) {
1662                         printf("Public IPs on ALL nodes\n");
1663                 } else {
1664                         printf("Public IPs on node %u\n", options.pnn);
1665                 }
1666         }
1667
1668         for (i=1;i<=ips->num;i++) {
1669                 if (options.machinereadable){
1670                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1671                 } else {
1672                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1673                 }
1674         }
1675
1676         talloc_free(tmp_ctx);
1677         return 0;
1678 }
1679
1680 /*
1681   display pid of a ctdb daemon
1682  */
1683 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1684 {
1685         uint32_t pid;
1686         int ret;
1687
1688         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1689         if (ret != 0) {
1690                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1691                 return ret;
1692         }
1693         printf("Pid:%d\n", pid);
1694
1695         return 0;
1696 }
1697
1698 /*
1699   handler for receiving the response to ipreallocate
1700 */
1701 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1702                              TDB_DATA data, void *private_data)
1703 {
1704         exit(0);
1705 }
1706
1707 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1708 {
1709         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1710
1711         event_add_timed(ctdb->ev, ctdb, 
1712                                 timeval_current_ofs(1, 0),
1713                                 ctdb_every_second, ctdb);
1714 }
1715
1716 /*
1717   ask the recovery daemon on the recovery master to perform a ip reallocation
1718  */
1719 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1720 {
1721         int i, ret;
1722         TDB_DATA data;
1723         struct takeover_run_reply rd;
1724         uint32_t recmaster;
1725         struct ctdb_node_map *nodemap=NULL;
1726         int retries=0;
1727         struct timeval tv = timeval_current();
1728
1729         /* we need some events to trigger so we can timeout and restart
1730            the loop
1731         */
1732         event_add_timed(ctdb->ev, ctdb, 
1733                                 timeval_current_ofs(1, 0),
1734                                 ctdb_every_second, ctdb);
1735
1736         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1737         if (rd.pnn == -1) {
1738                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1739                 return -1;
1740         }
1741         rd.srvid = getpid();
1742
1743         /* register a message port for receiveing the reply so that we
1744            can receive the reply
1745         */
1746         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1747
1748         data.dptr = (uint8_t *)&rd;
1749         data.dsize = sizeof(rd);
1750
1751 again:
1752         if (retries>5) {
1753                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1754                 exit(10);
1755         }
1756
1757         /* check that there are valid nodes available */
1758         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1759                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1760                 exit(10);
1761         }
1762         for (i=0; i<nodemap->num;i++) {
1763                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1764                         break;
1765                 }
1766         }
1767         if (i==nodemap->num) {
1768                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1769                 return 0;
1770         }
1771
1772
1773         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1774         if (ret != 0) {
1775                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1776                 return ret;
1777         }
1778
1779         /* verify the node exists */
1780         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1781                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1782                 exit(10);
1783         }
1784
1785
1786         /* check tha there are nodes available that can act as a recmaster */
1787         for (i=0; i<nodemap->num; i++) {
1788                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1789                         continue;
1790                 }
1791                 break;
1792         }
1793         if (i == nodemap->num) {
1794                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1795                 return 0;
1796         }
1797
1798         /* verify the recovery master is not STOPPED, nor BANNED */
1799         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1800                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1801                 retries++;
1802                 sleep(1);
1803                 goto again;
1804         } 
1805
1806         
1807         /* verify the recovery master is not STOPPED, nor BANNED */
1808         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1809                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1810                 retries++;
1811                 sleep(1);
1812                 goto again;
1813         } 
1814
1815         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1816         if (ret != 0) {
1817                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1818                 return -1;
1819         }
1820
1821         tv = timeval_current();
1822         /* this loop will terminate when we have received the reply */
1823         while (timeval_elapsed(&tv) < 3.0) {    
1824                 event_loop_once(ctdb->ev);
1825         }
1826
1827         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1828         retries++;
1829         sleep(1);
1830         goto again;
1831
1832         return 0;
1833 }
1834
1835
1836 /*
1837   disable a remote node
1838  */
1839 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1840 {
1841         int ret;
1842         struct ctdb_node_map *nodemap=NULL;
1843
1844         do {
1845                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1846                 if (ret != 0) {
1847                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1848                         return ret;
1849                 }
1850
1851                 sleep(1);
1852
1853                 /* read the nodemap and verify the change took effect */
1854                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1855                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1856                         exit(10);
1857                 }
1858
1859         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1860         ret = control_ipreallocate(ctdb, argc, argv);
1861         if (ret != 0) {
1862                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1863                 return ret;
1864         }
1865
1866         return 0;
1867 }
1868
1869 /*
1870   enable a disabled remote node
1871  */
1872 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1873 {
1874         int ret;
1875
1876         struct ctdb_node_map *nodemap=NULL;
1877
1878         do {
1879                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1880                 if (ret != 0) {
1881                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1882                         return ret;
1883                 }
1884
1885                 sleep(1);
1886
1887                 /* read the nodemap and verify the change took effect */
1888                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1889                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1890                         exit(10);
1891                 }
1892
1893         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1894         ret = control_ipreallocate(ctdb, argc, argv);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1897                 return ret;
1898         }
1899
1900         return 0;
1901 }
1902
1903 /*
1904   stop a remote node
1905  */
1906 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1907 {
1908         int ret;
1909         struct ctdb_node_map *nodemap=NULL;
1910
1911         do {
1912                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1913                 if (ret != 0) {
1914                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1915                 }
1916         
1917                 sleep(1);
1918
1919                 /* read the nodemap and verify the change took effect */
1920                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1921                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1922                         exit(10);
1923                 }
1924
1925         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1926         ret = control_ipreallocate(ctdb, argc, argv);
1927         if (ret != 0) {
1928                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1929                 return ret;
1930         }
1931
1932         return 0;
1933 }
1934
1935 /*
1936   restart a stopped remote node
1937  */
1938 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1939 {
1940         int ret;
1941
1942         struct ctdb_node_map *nodemap=NULL;
1943
1944         do {
1945                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1946                 if (ret != 0) {
1947                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1948                         return ret;
1949                 }
1950         
1951                 sleep(1);
1952
1953                 /* read the nodemap and verify the change took effect */
1954                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1955                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1956                         exit(10);
1957                 }
1958
1959         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1960         ret = control_ipreallocate(ctdb, argc, argv);
1961         if (ret != 0) {
1962                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1963                 return ret;
1964         }
1965
1966         return 0;
1967 }
1968
1969 static uint32_t get_generation(struct ctdb_context *ctdb)
1970 {
1971         struct ctdb_vnn_map *vnnmap=NULL;
1972         int ret;
1973
1974         /* wait until the recmaster is not in recovery mode */
1975         while (1) {
1976                 uint32_t recmode, recmaster;
1977                 
1978                 if (vnnmap != NULL) {
1979                         talloc_free(vnnmap);
1980                         vnnmap = NULL;
1981                 }
1982
1983                 /* get the recmaster */
1984                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1985                 if (ret != 0) {
1986                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1987                         exit(10);
1988                 }
1989
1990                 /* get recovery mode */
1991                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1992                 if (ret != 0) {
1993                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1994                         exit(10);
1995                 }
1996
1997                 /* get the current generation number */
1998                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
1999                 if (ret != 0) {
2000                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2001                         exit(10);
2002                 }
2003
2004                 if ((recmode == CTDB_RECOVERY_NORMAL)
2005                 &&  (vnnmap->generation != 1)){
2006                         return vnnmap->generation;
2007                 }
2008                 sleep(1);
2009         }
2010 }
2011
2012 /*
2013   ban a node from the cluster
2014  */
2015 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2016 {
2017         int ret;
2018         struct ctdb_node_map *nodemap=NULL;
2019         struct ctdb_ban_time bantime;
2020
2021         if (argc < 1) {
2022                 usage();
2023         }
2024         
2025         /* verify the node exists */
2026         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2029                 return ret;
2030         }
2031
2032         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2033                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2034                 return -1;
2035         }
2036
2037         bantime.pnn  = options.pnn;
2038         bantime.time = strtoul(argv[0], NULL, 0);
2039
2040         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2041         if (ret != 0) {
2042                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2043                 return -1;
2044         }       
2045
2046         ret = control_ipreallocate(ctdb, argc, argv);
2047         if (ret != 0) {
2048                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2049                 return ret;
2050         }
2051
2052         return 0;
2053 }
2054
2055
2056 /*
2057   unban a node from the cluster
2058  */
2059 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2060 {
2061         int ret;
2062         struct ctdb_node_map *nodemap=NULL;
2063         struct ctdb_ban_time bantime;
2064
2065         /* verify the node exists */
2066         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2069                 return ret;
2070         }
2071
2072         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2073                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2074                 return -1;
2075         }
2076
2077         bantime.pnn  = options.pnn;
2078         bantime.time = 0;
2079
2080         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2081         if (ret != 0) {
2082                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2083                 return -1;
2084         }       
2085
2086         ret = control_ipreallocate(ctdb, argc, argv);
2087         if (ret != 0) {
2088                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2089                 return ret;
2090         }
2091
2092         return 0;
2093 }
2094
2095
2096 /*
2097   show ban information for a node
2098  */
2099 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2100 {
2101         int ret;
2102         struct ctdb_node_map *nodemap=NULL;
2103         struct ctdb_ban_time *bantime;
2104
2105         /* verify the node exists */
2106         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2109                 return ret;
2110         }
2111
2112         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2113         if (ret != 0) {
2114                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2115                 return -1;
2116         }       
2117
2118         if (bantime->time == 0) {
2119                 printf("Node %u is not banned\n", bantime->pnn);
2120         } else {
2121                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2122         }
2123
2124         return 0;
2125 }
2126
2127 /*
2128   shutdown a daemon
2129  */
2130 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2131 {
2132         int ret;
2133
2134         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2135         if (ret != 0) {
2136                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2137                 return ret;
2138         }
2139
2140         return 0;
2141 }
2142
2143 /*
2144   trigger a recovery
2145  */
2146 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2147 {
2148         int ret;
2149         uint32_t generation, next_generation;
2150
2151         /* record the current generation number */
2152         generation = get_generation(ctdb);
2153
2154         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2155         if (ret != 0) {
2156                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2157                 return ret;
2158         }
2159
2160         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2161         if (ret != 0) {
2162                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2163                 return ret;
2164         }
2165
2166         /* wait until we are in a new generation */
2167         while (1) {
2168                 next_generation = get_generation(ctdb);
2169                 if (next_generation != generation) {
2170                         return 0;
2171                 }
2172                 sleep(1);
2173         }
2174
2175         return 0;
2176 }
2177
2178
2179 /*
2180   display monitoring mode of a remote node
2181  */
2182 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2183 {
2184         uint32_t monmode;
2185         int ret;
2186
2187         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2188         if (ret != 0) {
2189                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2190                 return ret;
2191         }
2192         if (!options.machinereadable){
2193                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2194         } else {
2195                 printf(":mode:\n");
2196                 printf(":%d:\n",monmode);
2197         }
2198         return 0;
2199 }
2200
2201
2202 /*
2203   display capabilities of a remote node
2204  */
2205 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2206 {
2207         uint32_t capabilities;
2208         int ret;
2209
2210         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2211         if (ret != 0) {
2212                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2213                 return ret;
2214         }
2215         
2216         if (!options.machinereadable){
2217                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2218                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2219                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2220                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2221         } else {
2222                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2223                 printf(":%d:%d:%d:%d:\n",
2224                         !!(capabilities&CTDB_CAP_RECMASTER),
2225                         !!(capabilities&CTDB_CAP_LMASTER),
2226                         !!(capabilities&CTDB_CAP_LVS),
2227                         !!(capabilities&CTDB_CAP_NATGW));
2228         }
2229         return 0;
2230 }
2231
2232 /*
2233   display lvs configuration
2234  */
2235 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2236 {
2237         uint32_t *capabilities;
2238         struct ctdb_node_map *nodemap=NULL;
2239         int i, ret;
2240         int healthy_count = 0;
2241
2242         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2243         if (ret != 0) {
2244                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2245                 return ret;
2246         }
2247
2248         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2249         CTDB_NO_MEMORY(ctdb, capabilities);
2250         
2251         /* collect capabilities for all connected nodes */
2252         for (i=0; i<nodemap->num; i++) {
2253                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2254                         continue;
2255                 }
2256                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2257                         continue;
2258                 }
2259         
2260                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2261                 if (ret != 0) {
2262                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2263                         return ret;
2264                 }
2265
2266                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2267                         continue;
2268                 }
2269
2270                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2271                         healthy_count++;
2272                 }
2273         }
2274
2275         /* Print all LVS nodes */
2276         for (i=0; i<nodemap->num; i++) {
2277                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2278                         continue;
2279                 }
2280                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2281                         continue;
2282                 }
2283                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2284                         continue;
2285                 }
2286
2287                 if (healthy_count != 0) {
2288                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2289                                 continue;
2290                         }
2291                 }
2292
2293                 printf("%d:%s\n", i, 
2294                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2295         }
2296
2297         return 0;
2298 }
2299
2300 /*
2301   display who is the lvs master
2302  */
2303 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2304 {
2305         uint32_t *capabilities;
2306         struct ctdb_node_map *nodemap=NULL;
2307         int i, ret;
2308         int healthy_count = 0;
2309
2310         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2313                 return ret;
2314         }
2315
2316         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2317         CTDB_NO_MEMORY(ctdb, capabilities);
2318         
2319         /* collect capabilities for all connected nodes */
2320         for (i=0; i<nodemap->num; i++) {
2321                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2322                         continue;
2323                 }
2324                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2325                         continue;
2326                 }
2327         
2328                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2329                 if (ret != 0) {
2330                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2331                         return ret;
2332                 }
2333
2334                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2335                         continue;
2336                 }
2337
2338                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2339                         healthy_count++;
2340                 }
2341         }
2342
2343         /* find and show the lvsmaster */
2344         for (i=0; i<nodemap->num; i++) {
2345                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2346                         continue;
2347                 }
2348                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2349                         continue;
2350                 }
2351                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2352                         continue;
2353                 }
2354
2355                 if (healthy_count != 0) {
2356                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2357                                 continue;
2358                         }
2359                 }
2360
2361                 if (options.machinereadable){
2362                         printf("%d\n", i);
2363                 } else {
2364                         printf("Node %d is LVS master\n", i);
2365                 }
2366                 return 0;
2367         }
2368
2369         printf("There is no LVS master\n");
2370         return -1;
2371 }
2372
2373 /*
2374   disable monitoring on a  node
2375  */
2376 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2377 {
2378         
2379         int ret;
2380
2381         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2382         if (ret != 0) {
2383                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2384                 return ret;
2385         }
2386         printf("Monitoring mode:%s\n","DISABLED");
2387
2388         return 0;
2389 }
2390
2391 /*
2392   enable monitoring on a  node
2393  */
2394 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2395 {
2396         
2397         int ret;
2398
2399         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2400         if (ret != 0) {
2401                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2402                 return ret;
2403         }
2404         printf("Monitoring mode:%s\n","ACTIVE");
2405
2406         return 0;
2407 }
2408
2409 /*
2410   display remote list of keys/data for a db
2411  */
2412 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2413 {
2414         const char *db_name;
2415         struct ctdb_db_context *ctdb_db;
2416         int ret;
2417
2418         if (argc < 1) {
2419                 usage();
2420         }
2421
2422         db_name = argv[0];
2423
2424
2425         if (db_exists(ctdb, db_name)) {
2426                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2427                 return -1;
2428         }
2429
2430         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2431
2432         if (ctdb_db == NULL) {
2433                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2434                 return -1;
2435         }
2436
2437         /* traverse and dump the cluster tdb */
2438         ret = ctdb_dump_db(ctdb_db, stdout);
2439         if (ret == -1) {
2440                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2441                 return -1;
2442         }
2443         talloc_free(ctdb_db);
2444
2445         printf("Dumped %d records\n", ret);
2446         return 0;
2447 }
2448
2449
2450 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2451                              TDB_DATA data, void *private_data)
2452 {
2453         DEBUG(DEBUG_ERR,("Log data received\n"));
2454         if (data.dsize > 0) {
2455                 printf("%s", data.dptr);
2456         }
2457
2458         exit(0);
2459 }
2460
2461 /*
2462   display a list of log messages from the in memory ringbuffer
2463  */
2464 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2465 {
2466         int ret;
2467         int32_t res;
2468         struct ctdb_get_log_addr log_addr;
2469         TDB_DATA data;
2470         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2471         char *errmsg;
2472         struct timeval tv;
2473
2474         if (argc != 1) {
2475                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2476                 talloc_free(tmp_ctx);
2477                 return -1;
2478         }
2479
2480         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2481         log_addr.srvid = getpid();
2482         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2483                 log_addr.level = get_debug_by_desc(argv[0]);
2484         } else {
2485                 log_addr.level = strtol(argv[0], NULL, 0);
2486         }
2487
2488
2489         data.dptr = (unsigned char *)&log_addr;
2490         data.dsize = sizeof(log_addr);
2491
2492         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2493
2494         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2495         sleep(1);
2496
2497         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2498
2499         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2500                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2501         if (ret != 0 || res != 0) {
2502                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2503                 talloc_free(tmp_ctx);
2504                 return -1;
2505         }
2506
2507
2508         tv = timeval_current();
2509         /* this loop will terminate when we have received the reply */
2510         while (timeval_elapsed(&tv) < 3.0) {    
2511                 event_loop_once(ctdb->ev);
2512         }
2513
2514         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2515
2516         talloc_free(tmp_ctx);
2517         return 0;
2518 }
2519
2520 /*
2521   clear the in memory log area
2522  */
2523 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2524 {
2525         int ret;
2526         int32_t res;
2527         char *errmsg;
2528         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2529
2530         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2531                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2532         if (ret != 0 || res != 0) {
2533                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2534                 talloc_free(tmp_ctx);
2535                 return -1;
2536         }
2537
2538         talloc_free(tmp_ctx);
2539         return 0;
2540 }
2541
2542
2543
2544 /*
2545   display a list of the databases on a remote ctdb
2546  */
2547 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2548 {
2549         int i, ret;
2550         struct ctdb_dbid_map *dbmap=NULL;
2551
2552         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2555                 return ret;
2556         }
2557
2558         printf("Number of databases:%d\n", dbmap->num);
2559         for(i=0;i<dbmap->num;i++){
2560                 const char *path;
2561                 const char *name;
2562                 bool persistent;
2563
2564                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2565                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2566                 persistent = dbmap->dbs[i].persistent;
2567                 printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name, 
2568                        path, persistent?"PERSISTENT":"");
2569         }
2570
2571         return 0;
2572 }
2573
2574 /*
2575   check if the local node is recmaster or not
2576   it will return 1 if this node is the recmaster and 0 if it is not
2577   or if the local ctdb daemon could not be contacted
2578  */
2579 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2580 {
2581         uint32_t mypnn, recmaster;
2582         int ret;
2583
2584         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2585         if (mypnn == -1) {
2586                 printf("Failed to get pnn of node\n");
2587                 return 1;
2588         }
2589
2590         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2591         if (ret != 0) {
2592                 printf("Failed to get the recmaster\n");
2593                 return 1;
2594         }
2595
2596         if (recmaster != mypnn) {
2597                 printf("this node is not the recmaster\n");
2598                 return 1;
2599         }
2600
2601         printf("this node is the recmaster\n");
2602         return 0;
2603 }
2604
2605 /*
2606   ping a node
2607  */
2608 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2609 {
2610         int ret;
2611         struct timeval tv = timeval_current();
2612         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2613         if (ret == -1) {
2614                 printf("Unable to get ping response from node %u\n", options.pnn);
2615                 return -1;
2616         } else {
2617                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2618                        options.pnn, timeval_elapsed(&tv), ret);
2619         }
2620         return 0;
2621 }
2622
2623
2624 /*
2625   get a tunable
2626  */
2627 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2628 {
2629         const char *name;
2630         uint32_t value;
2631         int ret;
2632
2633         if (argc < 1) {
2634                 usage();
2635         }
2636
2637         name = argv[0];
2638         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2639         if (ret == -1) {
2640                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2641                 return -1;
2642         }
2643
2644         printf("%-19s = %u\n", name, value);
2645         return 0;
2646 }
2647
2648 /*
2649   set a tunable
2650  */
2651 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2652 {
2653         const char *name;
2654         uint32_t value;
2655         int ret;
2656
2657         if (argc < 2) {
2658                 usage();
2659         }
2660
2661         name = argv[0];
2662         value = strtoul(argv[1], NULL, 0);
2663
2664         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2665         if (ret == -1) {
2666                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2667                 return -1;
2668         }
2669         return 0;
2670 }
2671
2672 /*
2673   list all tunables
2674  */
2675 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2676 {
2677         uint32_t count;
2678         const char **list;
2679         int ret, i;
2680
2681         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2682         if (ret == -1) {
2683                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2684                 return -1;
2685         }
2686
2687         for (i=0;i<count;i++) {
2688                 control_getvar(ctdb, 1, &list[i]);
2689         }
2690
2691         talloc_free(list);
2692         
2693         return 0;
2694 }
2695
2696 /*
2697   display debug level on a node
2698  */
2699 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2700 {
2701         int ret;
2702         int32_t level;
2703
2704         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2705         if (ret != 0) {
2706                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2707                 return ret;
2708         } else {
2709                 if (options.machinereadable){
2710                         printf(":Name:Level:\n");
2711                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2712                 } else {
2713                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2714                 }
2715         }
2716         return 0;
2717 }
2718
2719 /*
2720   display reclock file of a node
2721  */
2722 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2723 {
2724         int ret;
2725         const char *reclock;
2726
2727         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2728         if (ret != 0) {
2729                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2730                 return ret;
2731         } else {
2732                 if (options.machinereadable){
2733                         if (reclock != NULL) {
2734                                 printf("%s", reclock);
2735                         }
2736                 } else {
2737                         if (reclock == NULL) {
2738                                 printf("No reclock file used.\n");
2739                         } else {
2740                                 printf("Reclock file:%s\n", reclock);
2741                         }
2742                 }
2743         }
2744         return 0;
2745 }
2746
2747 /*
2748   set the reclock file of a node
2749  */
2750 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2751 {
2752         int ret;
2753         const char *reclock;
2754
2755         if (argc == 0) {
2756                 reclock = NULL;
2757         } else if (argc == 1) {
2758                 reclock = argv[0];
2759         } else {
2760                 usage();
2761         }
2762
2763         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2764         if (ret != 0) {
2765                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2766                 return ret;
2767         }
2768         return 0;
2769 }
2770
2771 /*
2772   set the natgw state on/off
2773  */
2774 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2775 {
2776         int ret;
2777         uint32_t natgwstate;
2778
2779         if (argc == 0) {
2780                 usage();
2781         }
2782
2783         if (!strcmp(argv[0], "on")) {
2784                 natgwstate = 1;
2785         } else if (!strcmp(argv[0], "off")) {
2786                 natgwstate = 0;
2787         } else {
2788                 usage();
2789         }
2790
2791         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2792         if (ret != 0) {
2793                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2794                 return ret;
2795         }
2796
2797         return 0;
2798 }
2799
2800 /*
2801   set the lmaster role on/off
2802  */
2803 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2804 {
2805         int ret;
2806         uint32_t lmasterrole;
2807
2808         if (argc == 0) {
2809                 usage();
2810         }
2811
2812         if (!strcmp(argv[0], "on")) {
2813                 lmasterrole = 1;
2814         } else if (!strcmp(argv[0], "off")) {
2815                 lmasterrole = 0;
2816         } else {
2817                 usage();
2818         }
2819
2820         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2821         if (ret != 0) {
2822                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2823                 return ret;
2824         }
2825
2826         return 0;
2827 }
2828
2829 /*
2830   set the recmaster role on/off
2831  */
2832 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2833 {
2834         int ret;
2835         uint32_t recmasterrole;
2836
2837         if (argc == 0) {
2838                 usage();
2839         }
2840
2841         if (!strcmp(argv[0], "on")) {
2842                 recmasterrole = 1;
2843         } else if (!strcmp(argv[0], "off")) {
2844                 recmasterrole = 0;
2845         } else {
2846                 usage();
2847         }
2848
2849         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2850         if (ret != 0) {
2851                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2852                 return ret;
2853         }
2854
2855         return 0;
2856 }
2857
2858 /*
2859   set debug level on a node or all nodes
2860  */
2861 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2862 {
2863         int i, ret;
2864         int32_t level;
2865
2866         if (argc == 0) {
2867                 printf("You must specify the debug level. Valid levels are:\n");
2868                 for (i=0; debug_levels[i].description != NULL; i++) {
2869                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2870                 }
2871
2872                 return 0;
2873         }
2874
2875         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2876                 level = get_debug_by_desc(argv[0]);
2877         } else {
2878                 level = strtol(argv[0], NULL, 0);
2879         }
2880
2881         for (i=0; debug_levels[i].description != NULL; i++) {
2882                 if (level == debug_levels[i].level) {
2883                         break;
2884                 }
2885         }
2886         if (debug_levels[i].description == NULL) {
2887                 printf("Invalid debug level, must be one of\n");
2888                 for (i=0; debug_levels[i].description != NULL; i++) {
2889                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2890                 }
2891                 return -1;
2892         }
2893
2894         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2895         if (ret != 0) {
2896                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2897         }
2898         return 0;
2899 }
2900
2901
2902 /*
2903   freeze a node
2904  */
2905 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2906 {
2907         int ret;
2908         uint32_t priority;
2909         
2910         if (argc == 1) {
2911                 priority = strtol(argv[0], NULL, 0);
2912         } else {
2913                 priority = 0;
2914         }
2915         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2916
2917         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2918         if (ret != 0) {
2919                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2920         }               
2921         return 0;
2922 }
2923
2924 /*
2925   thaw a node
2926  */
2927 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
2928 {
2929         int ret;
2930         uint32_t priority;
2931         
2932         if (argc == 1) {
2933                 priority = strtol(argv[0], NULL, 0);
2934         } else {
2935                 priority = 0;
2936         }
2937         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
2938
2939         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2940         if (ret != 0) {
2941                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
2942         }               
2943         return 0;
2944 }
2945
2946
2947 /*
2948   attach to a database
2949  */
2950 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
2951 {
2952         const char *db_name;
2953         struct ctdb_db_context *ctdb_db;
2954
2955         if (argc < 1) {
2956                 usage();
2957         }
2958         db_name = argv[0];
2959
2960         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2961         if (ctdb_db == NULL) {
2962                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2963                 return -1;
2964         }
2965
2966         return 0;
2967 }
2968
2969 /*
2970   set db priority
2971  */
2972 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2973 {
2974         struct ctdb_db_priority db_prio;
2975         int ret;
2976
2977         if (argc < 2) {
2978                 usage();
2979         }
2980
2981         db_prio.db_id    = strtoul(argv[0], NULL, 0);
2982         db_prio.priority = strtoul(argv[1], NULL, 0);
2983
2984         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
2985         if (ret != 0) {
2986                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
2987                 return -1;
2988         }
2989
2990         return 0;
2991 }
2992
2993 /*
2994   get db priority
2995  */
2996 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2997 {
2998         uint32_t db_id, priority;
2999         int ret;
3000
3001         if (argc < 1) {
3002                 usage();
3003         }
3004
3005         db_id = strtoul(argv[0], NULL, 0);
3006
3007         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3008         if (ret != 0) {
3009                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3010                 return -1;
3011         }
3012
3013         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3014
3015         return 0;
3016 }
3017
3018 /*
3019   run an eventscript on a node
3020  */
3021 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3022 {
3023         TDB_DATA data;
3024         int ret;
3025         int32_t res;
3026         char *errmsg;
3027         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3028
3029         if (argc != 1) {
3030                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3031                 return -1;
3032         }
3033
3034         data.dptr = (unsigned char *)discard_const(argv[0]);
3035         data.dsize = strlen((char *)data.dptr) + 1;
3036
3037         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3038
3039         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3040                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3041         if (ret != 0 || res != 0) {
3042                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3043                 talloc_free(tmp_ctx);
3044                 return -1;
3045         }
3046         talloc_free(tmp_ctx);
3047         return 0;
3048 }
3049
3050 #define DB_VERSION 1
3051 #define MAX_DB_NAME 64
3052 struct db_file_header {
3053         unsigned long version;
3054         time_t timestamp;
3055         unsigned long persistent;
3056         unsigned long size;
3057         const char name[MAX_DB_NAME];
3058 };
3059
3060 struct backup_data {
3061         struct ctdb_marshall_buffer *records;
3062         uint32_t len;
3063         uint32_t total;
3064         bool traverse_error;
3065 };
3066
3067 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3068 {
3069         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3070         struct ctdb_rec_data *rec;
3071
3072         /* add the record */
3073         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3074         if (rec == NULL) {
3075                 bd->traverse_error = true;
3076                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3077                 return -1;
3078         }
3079         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3080         if (bd->records == NULL) {
3081                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3082                 bd->traverse_error = true;
3083                 return -1;
3084         }
3085         bd->records->count++;
3086         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3087         bd->len += rec->length;
3088         talloc_free(rec);
3089
3090         bd->total++;
3091         return 0;
3092 }
3093
3094 /*
3095  * backup a database to a file 
3096  */
3097 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3098 {
3099         int i, ret;
3100         struct ctdb_dbid_map *dbmap=NULL;
3101         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3102         struct db_file_header dbhdr;
3103         struct ctdb_db_context *ctdb_db;
3104         struct backup_data *bd;
3105         int fh = -1;
3106         int status = -1;
3107
3108         if (argc != 2) {
3109                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3110                 return -1;
3111         }
3112
3113         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3114         if (ret != 0) {
3115                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3116                 return ret;
3117         }
3118
3119         for(i=0;i<dbmap->num;i++){
3120                 const char *name;
3121
3122                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3123                 if(!strcmp(argv[0], name)){
3124                         talloc_free(discard_const(name));
3125                         break;
3126                 }
3127                 talloc_free(discard_const(name));
3128         }
3129         if (i == dbmap->num) {
3130                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3131                 talloc_free(tmp_ctx);
3132                 return -1;
3133         }
3134
3135
3136         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3137         if (ctdb_db == NULL) {
3138                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3139                 talloc_free(tmp_ctx);
3140                 return -1;
3141         }
3142
3143
3144         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3145         if (ret == -1) {
3146                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3147                 talloc_free(tmp_ctx);
3148                 return -1;
3149         }
3150
3151
3152         bd = talloc_zero(tmp_ctx, struct backup_data);
3153         if (bd == NULL) {
3154                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3155                 talloc_free(tmp_ctx);
3156                 return -1;
3157         }
3158
3159         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3160         if (bd->records == NULL) {
3161                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3162                 talloc_free(tmp_ctx);
3163                 return -1;
3164         }
3165
3166         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3167         bd->records->db_id = ctdb_db->db_id;
3168         /* traverse the database collecting all records */
3169         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3170             bd->traverse_error) {
3171                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3172                 talloc_free(tmp_ctx);
3173                 return -1;              
3174         }
3175
3176         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3177
3178
3179         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3180         if (fh == -1) {
3181                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3182                 talloc_free(tmp_ctx);
3183                 return -1;
3184         }
3185
3186         dbhdr.version = DB_VERSION;
3187         dbhdr.timestamp = time(NULL);
3188         dbhdr.persistent = dbmap->dbs[i].persistent;
3189         dbhdr.size = bd->len;
3190         if (strlen(argv[0]) >= MAX_DB_NAME) {
3191                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3192                 goto done;
3193         }
3194         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3195         ret = write(fh, &dbhdr, sizeof(dbhdr));
3196         if (ret == -1) {
3197                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3198                 goto done;
3199         }
3200         ret = write(fh, bd->records, bd->len);
3201         if (ret == -1) {
3202                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3203                 goto done;
3204         }
3205
3206         status = 0;
3207 done:
3208         if (fh != -1) {
3209                 ret = close(fh);
3210                 if (ret == -1) {
3211                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3212                 }
3213         }
3214         talloc_free(tmp_ctx);
3215         return status;
3216 }
3217
3218 /*
3219  * restore a database from a file 
3220  */
3221 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3222 {
3223         int ret;
3224         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3225         TDB_DATA outdata;
3226         TDB_DATA data;
3227         struct db_file_header dbhdr;
3228         struct ctdb_db_context *ctdb_db;
3229         struct ctdb_node_map *nodemap=NULL;
3230         struct ctdb_vnn_map *vnnmap=NULL;
3231         int i, fh;
3232         struct ctdb_control_wipe_database w;
3233         uint32_t *nodes;
3234         uint32_t generation;
3235         struct tm *tm;
3236         char tbuf[100];
3237
3238         if (argc != 1) {
3239                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3240                 return -1;
3241         }
3242
3243         fh = open(argv[0], O_RDONLY);
3244         if (fh == -1) {
3245                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3246                 talloc_free(tmp_ctx);
3247                 return -1;
3248         }
3249
3250         read(fh, &dbhdr, sizeof(dbhdr));
3251         if (dbhdr.version != DB_VERSION) {
3252                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3253                 talloc_free(tmp_ctx);
3254                 return -1;
3255         }
3256
3257         outdata.dsize = dbhdr.size;
3258         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3259         if (outdata.dptr == NULL) {
3260                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3261                 close(fh);
3262                 talloc_free(tmp_ctx);
3263                 return -1;
3264         }               
3265         read(fh, outdata.dptr, outdata.dsize);
3266         close(fh);
3267
3268         tm = localtime(&dbhdr.timestamp);
3269         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3270         printf("Restoring database '%s' from backup @ %s\n",
3271                 dbhdr.name, tbuf);
3272
3273
3274         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3275         if (ctdb_db == NULL) {
3276                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3277                 talloc_free(tmp_ctx);
3278                 return -1;
3279         }
3280
3281         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3282         if (ret != 0) {
3283                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3284                 talloc_free(tmp_ctx);
3285                 return ret;
3286         }
3287
3288
3289         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3290         if (ret != 0) {
3291                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3292                 talloc_free(tmp_ctx);
3293                 return ret;
3294         }
3295
3296         /* freeze all nodes */
3297         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3298         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3299                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3300                                         nodes, i,
3301                                         TIMELIMIT(),
3302                                         false, tdb_null,
3303                                         NULL, NULL,
3304                                         NULL) != 0) {
3305                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3306                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3307                         talloc_free(tmp_ctx);
3308                         return -1;
3309                 }
3310         }
3311
3312         generation = vnnmap->generation;
3313         data.dptr = (void *)&generation;
3314         data.dsize = sizeof(generation);
3315
3316         /* start a cluster wide transaction */
3317         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3318         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3319                                         nodes, 0,
3320                                         TIMELIMIT(), false, data,
3321                                         NULL, NULL,
3322                                         NULL) != 0) {
3323                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3324                 return -1;
3325         }
3326
3327
3328         w.db_id = ctdb_db->db_id;
3329         w.transaction_id = generation;
3330
3331         data.dptr = (void *)&w;
3332         data.dsize = sizeof(w);
3333
3334         /* wipe all the remote databases. */
3335         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3336         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3337                                         nodes, 0,
3338                                         TIMELIMIT(), false, data,
3339                                         NULL, NULL,
3340                                         NULL) != 0) {
3341                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3342                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3343                 talloc_free(tmp_ctx);
3344                 return -1;
3345         }
3346         
3347         /* push the database */
3348         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3349         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3350                                         nodes, 0,
3351                                         TIMELIMIT(), false, outdata,
3352                                         NULL, NULL,
3353                                         NULL) != 0) {
3354                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3355                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3356                 talloc_free(tmp_ctx);
3357                 return -1;
3358         }
3359
3360         data.dptr = (void *)&generation;
3361         data.dsize = sizeof(generation);
3362
3363         /* commit all the changes */
3364         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3365                                         nodes, 0,
3366                                         TIMELIMIT(), false, data,
3367                                         NULL, NULL,
3368                                         NULL) != 0) {
3369                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3370                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3371                 talloc_free(tmp_ctx);
3372                 return -1;
3373         }
3374
3375
3376         /* thaw all nodes */
3377         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3378         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3379                                         nodes, 0,
3380                                         TIMELIMIT(),
3381                                         false, tdb_null,
3382                                         NULL, NULL,
3383                                         NULL) != 0) {
3384                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3385                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3386                 talloc_free(tmp_ctx);
3387                 return -1;
3388         }
3389
3390
3391         talloc_free(tmp_ctx);
3392         return 0;
3393 }
3394
3395 /*
3396  * wipe a database from a file
3397  */
3398 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3399                           const char **argv)
3400 {
3401         int ret;
3402         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3403         TDB_DATA data;
3404         struct ctdb_db_context *ctdb_db;
3405         struct ctdb_node_map *nodemap = NULL;
3406         struct ctdb_vnn_map *vnnmap = NULL;
3407         int i;
3408         struct ctdb_control_wipe_database w;
3409         uint32_t *nodes;
3410         uint32_t generation;
3411         struct ctdb_dbid_map *dbmap = NULL;
3412
3413         if (argc != 1) {
3414                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3415                 return -1;
3416         }
3417
3418         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3419                                  &dbmap);
3420         if (ret != 0) {
3421                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3422                                   options.pnn));
3423                 return ret;
3424         }
3425
3426         for(i=0;i<dbmap->num;i++){
3427                 const char *name;
3428
3429                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3430                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3431                 if(!strcmp(argv[0], name)){
3432                         talloc_free(discard_const(name));
3433                         break;
3434                 }
3435                 talloc_free(discard_const(name));
3436         }
3437         if (i == dbmap->num) {
3438                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3439                                   argv[0]));
3440                 talloc_free(tmp_ctx);
3441                 return -1;
3442         }
3443
3444         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3445         if (ctdb_db == NULL) {
3446                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3447                                   argv[0]));
3448                 talloc_free(tmp_ctx);
3449                 return -1;
3450         }
3451
3452         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3453                                    &nodemap);
3454         if (ret != 0) {
3455                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3456                                   options.pnn));
3457                 talloc_free(tmp_ctx);
3458                 return ret;
3459         }
3460
3461         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3462                                   &vnnmap);
3463         if (ret != 0) {
3464                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3465                                   options.pnn));
3466                 talloc_free(tmp_ctx);
3467                 return ret;
3468         }
3469
3470         /* freeze all nodes */
3471         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3472         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3473                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3474                                                 nodes, i,
3475                                                 TIMELIMIT(),
3476                                                 false, tdb_null,
3477                                                 NULL, NULL,
3478                                                 NULL);
3479                 if (ret != 0) {
3480                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3481                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3482                                              CTDB_RECOVERY_ACTIVE);
3483                         talloc_free(tmp_ctx);
3484                         return -1;
3485                 }
3486         }
3487
3488         generation = vnnmap->generation;
3489         data.dptr = (void *)&generation;
3490         data.dsize = sizeof(generation);
3491
3492         /* start a cluster wide transaction */
3493         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3494         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3495                                         nodes, 0,
3496                                         TIMELIMIT(), false, data,
3497                                         NULL, NULL,
3498                                         NULL);
3499         if (ret!= 0) {
3500                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3501                                   "transactions.\n"));
3502                 return -1;
3503         }
3504
3505         w.db_id = ctdb_db->db_id;
3506         w.transaction_id = generation;
3507
3508         data.dptr = (void *)&w;
3509         data.dsize = sizeof(w);
3510
3511         /* wipe all the remote databases. */
3512         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3513         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3514                                         nodes, 0,
3515                                         TIMELIMIT(), false, data,
3516                                         NULL, NULL,
3517                                         NULL) != 0) {
3518                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3519                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524         data.dptr = (void *)&generation;
3525         data.dsize = sizeof(generation);
3526
3527         /* commit all the changes */
3528         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3529                                         nodes, 0,
3530                                         TIMELIMIT(), false, data,
3531                                         NULL, NULL,
3532                                         NULL) != 0) {
3533                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3534                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3535                 talloc_free(tmp_ctx);
3536                 return -1;
3537         }
3538
3539         /* thaw all nodes */
3540         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3541         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3542                                         nodes, 0,
3543                                         TIMELIMIT(),
3544                                         false, tdb_null,
3545                                         NULL, NULL,
3546                                         NULL) != 0) {
3547                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3548                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3549                 talloc_free(tmp_ctx);
3550                 return -1;
3551         }
3552
3553         talloc_free(tmp_ctx);
3554         return 0;
3555 }
3556
3557 /*
3558  * set flags of a node in the nodemap
3559  */
3560 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3561 {
3562         int ret;
3563         int32_t status;
3564         int node;
3565         int flags;
3566         TDB_DATA data;
3567         struct ctdb_node_flag_change c;
3568
3569         if (argc != 2) {
3570                 usage();
3571                 return -1;
3572         }
3573
3574         if (sscanf(argv[0], "%d", &node) != 1) {
3575                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3576                 usage();
3577                 return -1;
3578         }
3579         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3580                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3581                 usage();
3582                 return -1;
3583         }
3584
3585         c.pnn       = node;
3586         c.old_flags = 0;
3587         c.new_flags = flags;
3588
3589         data.dsize = sizeof(c);
3590         data.dptr = (unsigned char *)&c;
3591
3592         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3593                            data, NULL, NULL, &status, NULL, NULL);
3594         if (ret != 0 || status != 0) {
3595                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3596                 return -1;
3597         }
3598         return 0;
3599 }
3600
3601 /*
3602   dump memory usage
3603  */
3604 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3605 {
3606         TDB_DATA data;
3607         int ret;
3608         int32_t res;
3609         char *errmsg;
3610         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3611         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3612                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3613         if (ret != 0 || res != 0) {
3614                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3615                 talloc_free(tmp_ctx);
3616                 return -1;
3617         }
3618         write(1, data.dptr, data.dsize);
3619         talloc_free(tmp_ctx);
3620         return 0;
3621 }
3622
3623 /*
3624   handler for memory dumps
3625 */
3626 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3627                              TDB_DATA data, void *private_data)
3628 {
3629         write(1, data.dptr, data.dsize);
3630         exit(0);
3631 }
3632
3633 /*
3634   dump memory usage on the recovery daemon
3635  */
3636 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3637 {
3638         int ret;
3639         TDB_DATA data;
3640         struct rd_memdump_reply rd;
3641
3642         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3643         if (rd.pnn == -1) {
3644                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3645                 return -1;
3646         }
3647         rd.srvid = getpid();
3648
3649         /* register a message port for receiveing the reply so that we
3650            can receive the reply
3651         */
3652         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3653
3654
3655         data.dptr = (uint8_t *)&rd;
3656         data.dsize = sizeof(rd);
3657
3658         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3659         if (ret != 0) {
3660                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3661                 return -1;
3662         }
3663
3664         /* this loop will terminate when we have received the reply */
3665         while (1) {     
3666                 event_loop_once(ctdb->ev);
3667         }
3668
3669         return 0;
3670 }
3671
3672 /*
3673   list all nodes in the cluster
3674   if the daemon is running, we read the data from the daemon.
3675   if the daemon is not running we parse the nodes file directly
3676  */
3677 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3678 {
3679         int i, ret;
3680         struct ctdb_node_map *nodemap=NULL;
3681
3682         if (ctdb != NULL) {
3683                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3684                 if (ret != 0) {
3685                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3686                         return ret;
3687                 }
3688
3689                 for(i=0;i<nodemap->num;i++){
3690                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3691                                 continue;
3692                         }
3693                         if (options.machinereadable){
3694                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3695                         } else {
3696                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3697                         }
3698                 }
3699         } else {
3700                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3701                 struct pnn_node *pnn_nodes;
3702                 struct pnn_node *pnn_node;
3703         
3704                 pnn_nodes = read_nodes_file(mem_ctx);
3705                 if (pnn_nodes == NULL) {
3706                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3707                         talloc_free(mem_ctx);
3708                         return -1;
3709                 }
3710
3711                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3712                         ctdb_sock_addr addr;
3713
3714                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3715                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3716                                 talloc_free(mem_ctx);
3717                                 return -1;
3718                         }
3719
3720                         if (options.machinereadable){
3721                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3722                         } else {
3723                                 printf("%s\n", pnn_node->addr);
3724                         }
3725                 }
3726                 talloc_free(mem_ctx);
3727         }
3728
3729         return 0;
3730 }
3731
3732 /*
3733   reload the nodes file on the local node
3734  */
3735 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3736 {
3737         int i, ret;
3738         int mypnn;
3739         struct ctdb_node_map *nodemap=NULL;
3740
3741         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3742         if (mypnn == -1) {
3743                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3744                 return -1;
3745         }
3746
3747         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3748         if (ret != 0) {
3749                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3750                 return ret;
3751         }
3752
3753         /* reload the nodes file on all remote nodes */
3754         for (i=0;i<nodemap->num;i++) {
3755                 if (nodemap->nodes[i].pnn == mypnn) {
3756                         continue;
3757                 }
3758                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3759                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3760                         nodemap->nodes[i].pnn);
3761                 if (ret != 0) {
3762                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3763                 }
3764         }
3765
3766         /* reload the nodes file on the local node */
3767         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3768         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3769         if (ret != 0) {
3770                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3771         }
3772
3773         /* initiate a recovery */
3774         control_recover(ctdb, argc, argv);
3775
3776         return 0;
3777 }
3778
3779
3780 static const struct {
3781         const char *name;
3782         int (*fn)(struct ctdb_context *, int, const char **);
3783         bool auto_all;
3784         bool without_daemon; /* can be run without daemon running ? */
3785         const char *msg;
3786         const char *args;
3787 } ctdb_commands[] = {
3788 #ifdef CTDB_VERS
3789         { "version",         control_version,           true,   false,  "show version of ctdb" },
3790 #endif
3791         { "status",          control_status,            true,   false,  "show node status" },
3792         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
3793         { "ping",            control_ping,              true,   false,  "ping all nodes" },
3794         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
3795         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
3796         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
3797         { "statistics",      control_statistics,        false,  false, "show statistics" },
3798         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
3799         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
3800         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
3801         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
3802         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
3803         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
3804         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
3805         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
3806         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
3807         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
3808         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
3809         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
3810         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
3811         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
3812         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
3813         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
3814         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
3815         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
3816         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
3817         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
3818         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
3819         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
3820         { "stop",            control_stop,              true,   false,  "stop a node" },
3821         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
3822         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
3823         { "unban",           control_unban,             true,   false,  "unban a node" },
3824         { "showban",         control_showban,           true,   false,  "show ban information"},
3825         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
3826         { "recover",         control_recover,           true,   false,  "force recovery" },
3827         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
3828         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
3829         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
3830         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
3831         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
3832         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
3833         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
3834         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
3835
3836         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
3837         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
3838         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
3839         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
3840         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
3841         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
3842         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
3843         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
3844         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
3845         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
3846         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
3847         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
3848         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
3849         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
3850         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
3851         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
3852         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
3853         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
3854         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
3855         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
3856         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
3857         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
3858         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
3859         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
3860         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
3861         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
3862         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
3863         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
3864         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
3865 };
3866
3867 /*
3868   show usage message
3869  */
3870 static void usage(void)
3871 {
3872         int i;
3873         printf(
3874 "Usage: ctdb [options] <control>\n" \
3875 "Options:\n" \
3876 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
3877 "   -Y                 generate machinereadable output\n"
3878 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
3879         printf("Controls:\n");
3880         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3881                 printf("  %-15s %-27s  %s\n", 
3882                        ctdb_commands[i].name, 
3883                        ctdb_commands[i].args?ctdb_commands[i].args:"",
3884                        ctdb_commands[i].msg);
3885         }
3886         exit(1);
3887 }
3888
3889
3890 static void ctdb_alarm(int sig)
3891 {
3892         printf("Maximum runtime exceeded - exiting\n");
3893         _exit(ERR_TIMEOUT);
3894 }
3895
3896 /*
3897   main program
3898 */
3899 int main(int argc, const char *argv[])
3900 {
3901         struct ctdb_context *ctdb;
3902         char *nodestring = NULL;
3903         struct poptOption popt_options[] = {
3904                 POPT_AUTOHELP
3905                 POPT_CTDB_CMDLINE
3906                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
3907                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
3908                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
3909                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
3910                 POPT_TABLEEND
3911         };
3912         int opt;
3913         const char **extra_argv;
3914         int extra_argc = 0;
3915         int ret=-1, i;
3916         poptContext pc;
3917         struct event_context *ev;
3918         const char *control;
3919
3920         setlinebuf(stdout);
3921         
3922         /* set some defaults */
3923         options.maxruntime = 0;
3924         options.timelimit = 3;
3925         options.pnn = CTDB_CURRENT_NODE;
3926
3927         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
3928
3929         while ((opt = poptGetNextOpt(pc)) != -1) {
3930                 switch (opt) {
3931                 default:
3932                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
3933                                 poptBadOption(pc, 0), poptStrerror(opt)));
3934                         exit(1);
3935                 }
3936         }
3937
3938         /* setup the remaining options for the main program to use */
3939         extra_argv = poptGetArgs(pc);
3940         if (extra_argv) {
3941                 extra_argv++;
3942                 while (extra_argv[extra_argc]) extra_argc++;
3943         }
3944
3945         if (extra_argc < 1) {
3946                 usage();
3947         }
3948
3949         if (options.maxruntime == 0) {
3950                 const char *ctdb_timeout;
3951                 ctdb_timeout = getenv("CTDB_TIMEOUT");
3952                 if (ctdb_timeout != NULL) {
3953                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
3954                 } else {
3955                         /* default timeout is 120 seconds */
3956                         options.maxruntime = 120;
3957                 }
3958         }
3959
3960         signal(SIGALRM, ctdb_alarm);
3961         alarm(options.maxruntime);
3962
3963         /* setup the node number to contact */
3964         if (nodestring != NULL) {
3965                 if (strcmp(nodestring, "all") == 0) {
3966                         options.pnn = CTDB_BROADCAST_ALL;
3967                 } else {
3968                         options.pnn = strtoul(nodestring, NULL, 0);
3969                 }
3970         }
3971
3972         control = extra_argv[0];
3973
3974         ev = event_context_init(NULL);
3975
3976         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3977                 if (strcmp(control, ctdb_commands[i].name) == 0) {
3978                         int j;
3979
3980                         if (ctdb_commands[i].without_daemon == true) {
3981                                 close(2);
3982                         }
3983
3984                         /* initialise ctdb */
3985                         ctdb = ctdb_cmdline_client(ev);
3986
3987                         if (ctdb_commands[i].without_daemon == false) {
3988                                 if (ctdb == NULL) {
3989                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
3990                                         exit(1);
3991                                 }
3992
3993                                 /* verify the node exists */
3994                                 verify_node(ctdb);
3995
3996                                 if (options.pnn == CTDB_CURRENT_NODE) {
3997                                         int pnn;
3998                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
3999                                         if (pnn == -1) {
4000                                                 return -1;
4001                                         }
4002                                         options.pnn = pnn;
4003                                 }
4004                         }
4005
4006                         if (ctdb_commands[i].auto_all && 
4007                             options.pnn == CTDB_BROADCAST_ALL) {
4008                                 uint32_t *nodes;
4009                                 uint32_t num_nodes;
4010                                 ret = 0;
4011
4012                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4013                                 CTDB_NO_MEMORY(ctdb, nodes);
4014         
4015                                 for (j=0;j<num_nodes;j++) {
4016                                         options.pnn = nodes[j];
4017                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4018                                 }
4019                                 talloc_free(nodes);
4020                         } else {
4021                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4022                         }
4023                         break;
4024                 }
4025         }
4026
4027         if (i == ARRAY_SIZE(ctdb_commands)) {
4028                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4029                 exit(1);
4030         }
4031
4032         return ret;
4033 }