ctdb: scriptstatus can now query non-monitor events
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
517                 }
518                 return 0;
519         }
520
521         printf("Number of nodes:%d\n", nodemap->num);
522         for(i=0;i<nodemap->num;i++){
523                 static const struct {
524                         uint32_t flag;
525                         const char *name;
526                 } flag_names[] = {
527                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
528                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
529                         { NODE_FLAGS_BANNED,                "BANNED" },
530                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
531                         { NODE_FLAGS_DELETED,               "DELETED" },
532                         { NODE_FLAGS_STOPPED,               "STOPPED" },
533                 };
534                 char *flags_str = NULL;
535                 int j;
536
537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
538                         continue;
539                 }
540                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
541                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
542                                 if (flags_str == NULL) {
543                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
544                                 } else {
545                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
546                                                                            flag_names[j].name);
547                                 }
548                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
549                         }
550                 }
551                 if (flags_str == NULL) {
552                         flags_str = talloc_strdup(ctdb, "OK");
553                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
554                 }
555                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
556                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
557                        flags_str,
558                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
559                 talloc_free(flags_str);
560         }
561
562         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
563         if (ret != 0) {
564                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
565                 return ret;
566         }
567         if (vnnmap->generation == INVALID_GENERATION) {
568                 printf("Generation:INVALID\n");
569         } else {
570                 printf("Generation:%d\n",vnnmap->generation);
571         }
572         printf("Size:%d\n",vnnmap->size);
573         for(i=0;i<vnnmap->size;i++){
574                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
575         }
576
577         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
580                 return ret;
581         }
582         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
583
584         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
585         if (ret != 0) {
586                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
587                 return ret;
588         }
589         printf("Recovery master:%d\n",recmaster);
590
591         return 0;
592 }
593
594
595 struct natgw_node {
596         struct natgw_node *next;
597         const char *addr;
598 };
599
600 /*
601   display the list of nodes belonging to this natgw configuration
602  */
603 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
604 {
605         int i, ret;
606         const char *natgw_list;
607         int nlines;
608         char **lines;
609         struct natgw_node *natgw_nodes = NULL;
610         struct natgw_node *natgw_node;
611         struct ctdb_node_map *nodemap=NULL;
612
613
614         /* read the natgw nodes file into a linked list */
615         natgw_list = getenv("NATGW_NODES");
616         if (natgw_list == NULL) {
617                 natgw_list = "/etc/ctdb/natgw_nodes";
618         }
619         lines = file_lines_load(natgw_list, &nlines, ctdb);
620         if (lines == NULL) {
621                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
622                 return -1;
623         }
624         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
625                 nlines--;
626         }
627         for (i=0;i<nlines;i++) {
628                 char *node;
629
630                 node = lines[i];
631                 /* strip leading spaces */
632                 while((*node == ' ') || (*node == '\t')) {
633                         node++;
634                 }
635                 if (*node == '#') {
636                         continue;
637                 }
638                 if (strcmp(node, "") == 0) {
639                         continue;
640                 }
641                 natgw_node = talloc(ctdb, struct natgw_node);
642                 natgw_node->addr = talloc_strdup(natgw_node, node);
643                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
644                 natgw_node->next = natgw_nodes;
645                 natgw_nodes = natgw_node;
646         }
647
648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
649         if (ret != 0) {
650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
651                 return ret;
652         }
653
654         i=0;
655         while(i<nodemap->num) {
656                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
657                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
658                                 break;
659                         }
660                 }
661
662                 /* this node was not in the natgw so we just remove it from
663                  * the list
664                  */
665                 if ((natgw_node == NULL) 
666                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
667                         int j;
668
669                         for (j=i+1; j<nodemap->num; j++) {
670                                 nodemap->nodes[j-1] = nodemap->nodes[j];
671                         }
672                         nodemap->num--;
673                         continue;
674                 }
675
676                 i++;
677         }               
678
679         /* pick a node to be natgwmaster
680          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
681          */
682         for(i=0;i<nodemap->num;i++){
683                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
684                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
685                         break;
686                 }
687         }
688         /* we couldnt find any healthy node, try unhealthy ones */
689         if (i == nodemap->num) {
690                 for(i=0;i<nodemap->num;i++){
691                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
692                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
693                                 break;
694                         }
695                 }
696         }
697         /* unless all nodes are STOPPED, when we pick one anyway */
698         if (i == nodemap->num) {
699                 for(i=0;i<nodemap->num;i++){
700                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
701                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
702                                 break;
703                         }
704                 }
705                 /* or if we still can not find any */
706                 if (i == nodemap->num) {
707                         printf("-1 0.0.0.0\n");
708                 }
709         }
710
711         /* print the pruned list of nodes belonging to this natgw list */
712         for(i=0;i<nodemap->num;i++){
713                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
714                         continue;
715                 }
716                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
717                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
718                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
723         }
724
725         return 0;
726 }
727
728 /*
729   display the status of the scripts for monitoring (or other events)
730  */
731 static int control_one_scriptstatus(struct ctdb_context *ctdb,
732                                     enum ctdb_eventscript_call type)
733 {
734         struct ctdb_scripts_wire *script_status;
735         int ret, i;
736
737         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
738         if (ret != 0) {
739                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
740                 return ret;
741         }
742
743         if (script_status == NULL) {
744                 if (!options.machinereadable) {
745                         printf("%s cycle never run\n",
746                                ctdb_eventscript_call_names[type]);
747                 }
748                 return 0;
749         }
750
751         if (!options.machinereadable) {
752                 printf("%d scripts were executed last %s cycle\n",
753                        script_status->num_scripts,
754                        ctdb_eventscript_call_names[type]);
755         }
756         for (i=0; i<script_status->num_scripts; i++) {
757                 const char *status = NULL;
758
759                 switch (script_status->scripts[i].status) {
760                 case -ETIME:
761                         status = "TIMEDOUT";
762                         break;
763                 case -ENOEXEC:
764                         status = "DISABLED";
765                         break;
766                 case 0:
767                         status = "OK";
768                         break;
769                 default:
770                         if (script_status->scripts[i].status > 0)
771                                 status = "ERROR";
772                         break;
773                 }
774                 if (options.machinereadable) {
775                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
776                                ctdb_eventscript_call_names[type],
777                                script_status->scripts[i].name,
778                                script_status->scripts[i].status,
779                                status,
780                                (long)script_status->scripts[i].start.tv_sec,
781                                (long)script_status->scripts[i].start.tv_usec,
782                                (long)script_status->scripts[i].finished.tv_sec,
783                                (long)script_status->scripts[i].finished.tv_usec,
784                                script_status->scripts[i].output);
785                         continue;
786                 }
787                 if (status)
788                         printf("%-20s Status:%s    ",
789                                script_status->scripts[i].name, status);
790                 else
791                         /* Some other error, eg from stat. */
792                         printf("%-20s Status:CANNOT RUN (%s)",
793                                script_status->scripts[i].name,
794                                strerror(-script_status->scripts[i].status));
795
796                 if (script_status->scripts[i].status >= 0) {
797                         printf("Duration:%.3lf ",
798                         timeval_delta(&script_status->scripts[i].finished,
799                               &script_status->scripts[i].start));
800                 }
801                 if (script_status->scripts[i].status != -ENOEXEC) {
802                         printf("%s",
803                                ctime(&script_status->scripts[i].start.tv_sec));
804                 }
805                 if (script_status->scripts[i].status != 0) {
806                         printf("   OUTPUT:%s\n",
807                                 script_status->scripts[i].output);
808                 }
809         }
810         return 0;
811 }
812
813
814 static int control_scriptstatus(struct ctdb_context *ctdb,
815                                 int argc, const char **argv)
816 {
817         int ret;
818         enum ctdb_eventscript_call type, min, max;
819         const char *arg;
820
821         if (argc > 1) {
822                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
823                 return -1;
824         }
825
826         if (argc == 0)
827                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
828         else
829                 arg = argv[0];
830
831         for (type = 0; type < CTDB_EVENT_MAX; type++) {
832                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
833                         min = type;
834                         max = type+1;
835                         break;
836                 }
837         }
838         if (type == CTDB_EVENT_MAX) {
839                 if (strcmp(arg, "all") == 0) {
840                         min = 0;
841                         max = CTDB_EVENT_MAX;
842                 } else {
843                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
844                         return -1;
845                 }
846         }
847
848         if (options.machinereadable) {
849                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
850         }
851
852         for (type = min; type < max; type++) {
853                 ret = control_one_scriptstatus(ctdb, type);
854                 if (ret != 0) {
855                         return ret;
856                 }
857         }
858
859         return 0;
860 }
861
862 /*
863   enable an eventscript
864  */
865 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
866 {
867         int ret;
868
869         if (argc < 1) {
870                 usage();
871         }
872
873         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
874         if (ret != 0) {
875           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
876                 return ret;
877         }
878
879         return 0;
880 }
881
882 /*
883   disable an eventscript
884  */
885 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
886 {
887         int ret;
888
889         if (argc < 1) {
890                 usage();
891         }
892
893         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
894         if (ret != 0) {
895           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
896                 return ret;
897         }
898
899         return 0;
900 }
901
902 /*
903   display the pnn of the recovery master
904  */
905 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
906 {
907         int ret;
908         uint32_t recmaster;
909
910         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
911         if (ret != 0) {
912                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
913                 return ret;
914         }
915         printf("%d\n",recmaster);
916
917         return 0;
918 }
919
920 /*
921   get a list of all tickles for this pnn
922  */
923 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
924 {
925         struct ctdb_control_tcp_tickle_list *list;
926         ctdb_sock_addr addr;
927         int i, ret;
928
929         if (argc < 1) {
930                 usage();
931         }
932
933         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
934                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
935                 return -1;
936         }
937
938         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
939         if (ret == -1) {
940                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
941                 return -1;
942         }
943
944         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
945         printf("Num tickles:%u\n", list->tickles.num);
946         for (i=0;i<list->tickles.num;i++) {
947                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
948                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
949         }
950
951         talloc_free(list);
952         
953         return 0;
954 }
955
956
957
958 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
959 {
960         struct ctdb_all_public_ips *ips;
961         struct ctdb_public_ip ip;
962         int i, ret;
963         uint32_t *nodes;
964         uint32_t disable_time;
965         TDB_DATA data;
966         struct ctdb_node_map *nodemap=NULL;
967         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
968
969         disable_time = 30;
970         data.dptr  = (uint8_t*)&disable_time;
971         data.dsize = sizeof(disable_time);
972         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
973         if (ret != 0) {
974                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
975                 return -1;
976         }
977
978
979
980         /* read the public ip list from the node */
981         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
982         if (ret != 0) {
983                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
984                 talloc_free(tmp_ctx);
985                 return -1;
986         }
987
988         for (i=0;i<ips->num;i++) {
989                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
990                         break;
991                 }
992         }
993         if (i==ips->num) {
994                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
995                         pnn, ctdb_addr_to_str(addr)));
996                 talloc_free(tmp_ctx);
997                 return -1;
998         }
999
1000         ip.pnn  = pnn;
1001         ip.addr = *addr;
1002
1003         data.dptr  = (uint8_t *)&ip;
1004         data.dsize = sizeof(ip);
1005
1006         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1007         if (ret != 0) {
1008                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1009                 talloc_free(tmp_ctx);
1010                 return ret;
1011         }
1012
1013         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1014         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1015                                         nodes, 0,
1016                                         TIMELIMIT(),
1017                                         false, data,
1018                                         NULL, NULL,
1019                                         NULL);
1020         if (ret != 0) {
1021                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1022                 talloc_free(tmp_ctx);
1023                 return -1;
1024         }
1025
1026         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1027         if (ret != 0) {
1028                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1029                 talloc_free(tmp_ctx);
1030                 return -1;
1031         }
1032
1033         talloc_free(tmp_ctx);
1034         return 0;
1035 }
1036
1037 /*
1038   move/failover an ip address to a specific node
1039  */
1040 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1041 {
1042         uint32_t pnn;
1043         ctdb_sock_addr addr;
1044
1045         if (argc < 2) {
1046                 usage();
1047                 return -1;
1048         }
1049
1050         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1051                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1052                 return -1;
1053         }
1054
1055
1056         if (sscanf(argv[1], "%u", &pnn) != 1) {
1057                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1058                 return -1;
1059         }
1060
1061         if (move_ip(ctdb, &addr, pnn) != 0) {
1062                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1063                 return -1;
1064         }
1065
1066         return 0;
1067 }
1068
1069 void getips_store_callback(void *param, void *data)
1070 {
1071         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1072         struct ctdb_all_public_ips *ips = param;
1073         int i;
1074
1075         i = ips->num++;
1076         ips->ips[i].pnn  = node_ip->pnn;
1077         ips->ips[i].addr = node_ip->addr;
1078 }
1079
1080 void getips_count_callback(void *param, void *data)
1081 {
1082         uint32_t *count = param;
1083
1084         (*count)++;
1085 }
1086
1087 #define IP_KEYLEN       4
1088 static uint32_t *ip_key(ctdb_sock_addr *ip)
1089 {
1090         static uint32_t key[IP_KEYLEN];
1091
1092         bzero(key, sizeof(key));
1093
1094         switch (ip->sa.sa_family) {
1095         case AF_INET:
1096                 key[0]  = ip->ip.sin_addr.s_addr;
1097                 break;
1098         case AF_INET6:
1099                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1100                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1101                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1102                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1103                 break;
1104         default:
1105                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1106                 return key;
1107         }
1108
1109         return key;
1110 }
1111
1112 static void *add_ip_callback(void *parm, void *data)
1113 {
1114         return parm;
1115 }
1116
1117 static int
1118 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1119 {
1120         struct ctdb_all_public_ips *tmp_ips;
1121         struct ctdb_node_map *nodemap=NULL;
1122         trbt_tree_t *ip_tree;
1123         int i, j, len, ret;
1124         uint32_t count;
1125
1126         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1127         if (ret != 0) {
1128                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1129                 return ret;
1130         }
1131
1132         ip_tree = trbt_create(tmp_ctx, 0);
1133
1134         for(i=0;i<nodemap->num;i++){
1135                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1136                         continue;
1137                 }
1138                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1139                         continue;
1140                 }
1141
1142                 /* read the public ip list from this node */
1143                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1144                 if (ret != 0) {
1145                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1146                         return -1;
1147                 }
1148         
1149                 for (j=0; j<tmp_ips->num;j++) {
1150                         struct ctdb_public_ip *node_ip;
1151
1152                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1153                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1154                         node_ip->addr = tmp_ips->ips[j].addr;
1155
1156                         trbt_insertarray32_callback(ip_tree,
1157                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1158                                 add_ip_callback,
1159                                 node_ip);
1160                 }
1161                 talloc_free(tmp_ips);
1162         }
1163
1164         /* traverse */
1165         count = 0;
1166         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1167
1168         len = offsetof(struct ctdb_all_public_ips, ips) + 
1169                 count*sizeof(struct ctdb_public_ip);
1170         tmp_ips = talloc_zero_size(tmp_ctx, len);
1171         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1172
1173         *ips = tmp_ips;
1174
1175         return 0;
1176 }
1177
1178
1179 /* 
1180  * scans all other nodes and returns a pnn for another node that can host this 
1181  * ip address or -1
1182  */
1183 static int
1184 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1185 {
1186         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1187         struct ctdb_all_public_ips *ips;
1188         struct ctdb_node_map *nodemap=NULL;
1189         int i, j, ret;
1190
1191         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1192         if (ret != 0) {
1193                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1194                 talloc_free(tmp_ctx);
1195                 return ret;
1196         }
1197
1198         for(i=0;i<nodemap->num;i++){
1199                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1200                         continue;
1201                 }
1202                 if (nodemap->nodes[i].pnn == options.pnn) {
1203                         continue;
1204                 }
1205
1206                 /* read the public ip list from this node */
1207                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1208                 if (ret != 0) {
1209                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1210                         return -1;
1211                 }
1212
1213                 for (j=0;j<ips->num;j++) {
1214                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1215                                 talloc_free(tmp_ctx);
1216                                 return nodemap->nodes[i].pnn;
1217                         }
1218                 }
1219                 talloc_free(ips);
1220         }
1221
1222         talloc_free(tmp_ctx);
1223         return -1;
1224 }
1225
1226 /*
1227   add a public ip address to a node
1228  */
1229 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1230 {
1231         int i, ret;
1232         int len;
1233         uint32_t pnn;
1234         unsigned mask;
1235         ctdb_sock_addr addr;
1236         struct ctdb_control_ip_iface *pub;
1237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1238         struct ctdb_all_public_ips *ips;
1239
1240
1241         if (argc != 2) {
1242                 talloc_free(tmp_ctx);
1243                 usage();
1244         }
1245
1246         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1247                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1248                 talloc_free(tmp_ctx);
1249                 return -1;
1250         }
1251
1252         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1253         if (ret != 0) {
1254                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1255                 talloc_free(tmp_ctx);
1256                 return ret;
1257         }
1258
1259
1260         /* check if some other node is already serving this ip, if not,
1261          * we will claim it
1262          */
1263         for (i=0;i<ips->num;i++) {
1264                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1265                         break;
1266                 }
1267         }
1268
1269         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1270         pub = talloc_size(tmp_ctx, len); 
1271         CTDB_NO_MEMORY(ctdb, pub);
1272
1273         pub->addr  = addr;
1274         pub->mask  = mask;
1275         pub->len   = strlen(argv[1])+1;
1276         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1277
1278         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1279         if (ret != 0) {
1280                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1281                 talloc_free(tmp_ctx);
1282                 return ret;
1283         }
1284
1285         if (i == ips->num) {
1286                 /* no one has this ip so we claim it */
1287                 pnn  = options.pnn;
1288         } else {
1289                 pnn  = ips->ips[i].pnn;
1290         }
1291
1292         if (move_ip(ctdb, &addr, pnn) != 0) {
1293                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1294                 return -1;
1295         }
1296
1297         talloc_free(tmp_ctx);
1298         return 0;
1299 }
1300
1301 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1302
1303 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1304 {
1305         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1306         struct ctdb_node_map *nodemap=NULL;
1307         struct ctdb_all_public_ips *ips;
1308         int ret, i, j;
1309
1310         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1311         if (ret != 0) {
1312                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1313                 return ret;
1314         }
1315
1316         /* remove it from the nodes that are not hosting the ip currently */
1317         for(i=0;i<nodemap->num;i++){
1318                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1319                         continue;
1320                 }
1321                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1322                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1323                         continue;
1324                 }
1325
1326                 for (j=0;j<ips->num;j++) {
1327                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1328                                 break;
1329                         }
1330                 }
1331                 if (j==ips->num) {
1332                         continue;
1333                 }
1334
1335                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1336                         continue;
1337                 }
1338
1339                 options.pnn = nodemap->nodes[i].pnn;
1340                 control_delip(ctdb, argc, argv);
1341         }
1342
1343
1344         /* remove it from every node (also the one hosting it) */
1345         for(i=0;i<nodemap->num;i++){
1346                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1347                         continue;
1348                 }
1349                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1350                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1351                         continue;
1352                 }
1353
1354                 for (j=0;j<ips->num;j++) {
1355                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1356                                 break;
1357                         }
1358                 }
1359                 if (j==ips->num) {
1360                         continue;
1361                 }
1362
1363                 options.pnn = nodemap->nodes[i].pnn;
1364                 control_delip(ctdb, argc, argv);
1365         }
1366
1367         talloc_free(tmp_ctx);
1368         return 0;
1369 }
1370         
1371 /*
1372   delete a public ip address from a node
1373  */
1374 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1375 {
1376         int i, ret;
1377         ctdb_sock_addr addr;
1378         struct ctdb_control_ip_iface pub;
1379         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1380         struct ctdb_all_public_ips *ips;
1381
1382         if (argc != 1) {
1383                 talloc_free(tmp_ctx);
1384                 usage();
1385         }
1386
1387         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1388                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1389                 return -1;
1390         }
1391
1392         if (options.pnn == CTDB_BROADCAST_ALL) {
1393                 return control_delip_all(ctdb, argc, argv, &addr);
1394         }
1395
1396         pub.addr  = addr;
1397         pub.mask  = 0;
1398         pub.len   = 0;
1399
1400         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1401         if (ret != 0) {
1402                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1403                 talloc_free(tmp_ctx);
1404                 return ret;
1405         }
1406         
1407         for (i=0;i<ips->num;i++) {
1408                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1409                         break;
1410                 }
1411         }
1412
1413         if (i==ips->num) {
1414                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1415                         ctdb_addr_to_str(&addr)));
1416                 talloc_free(tmp_ctx);
1417                 return -1;
1418         }
1419
1420         if (ips->ips[i].pnn == options.pnn) {
1421                 ret = find_other_host_for_public_ip(ctdb, &addr);
1422                 if (ret != -1) {
1423                         if (move_ip(ctdb, &addr, ret) != 0) {
1424                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1425                                 return -1;
1426                         }
1427                 }
1428         }
1429
1430         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1431         if (ret != 0) {
1432                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1433                 talloc_free(tmp_ctx);
1434                 return ret;
1435         }
1436
1437         talloc_free(tmp_ctx);
1438         return 0;
1439 }
1440
1441 /*
1442   kill a tcp connection
1443  */
1444 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1445 {
1446         int ret;
1447         struct ctdb_control_killtcp killtcp;
1448
1449         if (argc < 2) {
1450                 usage();
1451         }
1452
1453         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1454                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1455                 return -1;
1456         }
1457
1458         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1459                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1460                 return -1;
1461         }
1462
1463         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1464         if (ret != 0) {
1465                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1466                 return ret;
1467         }
1468
1469         return 0;
1470 }
1471
1472
1473 /*
1474   send a gratious arp
1475  */
1476 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1477 {
1478         int ret;
1479         ctdb_sock_addr addr;
1480
1481         if (argc < 2) {
1482                 usage();
1483         }
1484
1485         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1486                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1487                 return -1;
1488         }
1489
1490         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1491         if (ret != 0) {
1492                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1493                 return ret;
1494         }
1495
1496         return 0;
1497 }
1498
1499 /*
1500   register a server id
1501  */
1502 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1503 {
1504         int ret;
1505         struct ctdb_server_id server_id;
1506
1507         if (argc < 3) {
1508                 usage();
1509         }
1510
1511         server_id.pnn       = strtoul(argv[0], NULL, 0);
1512         server_id.type      = strtoul(argv[1], NULL, 0);
1513         server_id.server_id = strtoul(argv[2], NULL, 0);
1514
1515         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1516         if (ret != 0) {
1517                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1518                 return ret;
1519         }
1520         return -1;
1521 }
1522
1523 /*
1524   unregister a server id
1525  */
1526 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1527 {
1528         int ret;
1529         struct ctdb_server_id server_id;
1530
1531         if (argc < 3) {
1532                 usage();
1533         }
1534
1535         server_id.pnn       = strtoul(argv[0], NULL, 0);
1536         server_id.type      = strtoul(argv[1], NULL, 0);
1537         server_id.server_id = strtoul(argv[2], NULL, 0);
1538
1539         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1540         if (ret != 0) {
1541                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1542                 return ret;
1543         }
1544         return -1;
1545 }
1546
1547 /*
1548   check if a server id exists
1549  */
1550 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1551 {
1552         uint32_t status;
1553         int ret;
1554         struct ctdb_server_id server_id;
1555
1556         if (argc < 3) {
1557                 usage();
1558         }
1559
1560         server_id.pnn       = strtoul(argv[0], NULL, 0);
1561         server_id.type      = strtoul(argv[1], NULL, 0);
1562         server_id.server_id = strtoul(argv[2], NULL, 0);
1563
1564         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1565         if (ret != 0) {
1566                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1567                 return ret;
1568         }
1569
1570         if (status) {
1571                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1572         } else {
1573                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1574         }
1575         return 0;
1576 }
1577
1578 /*
1579   get a list of all server ids that are registered on a node
1580  */
1581 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1582 {
1583         int i, ret;
1584         struct ctdb_server_id_list *server_ids;
1585
1586         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1587         if (ret != 0) {
1588                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1589                 return ret;
1590         }
1591
1592         for (i=0; i<server_ids->num; i++) {
1593                 printf("Server id %d:%d:%d\n", 
1594                         server_ids->server_ids[i].pnn, 
1595                         server_ids->server_ids[i].type, 
1596                         server_ids->server_ids[i].server_id); 
1597         }
1598
1599         return -1;
1600 }
1601
1602 /*
1603   send a tcp tickle ack
1604  */
1605 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1606 {
1607         int ret;
1608         ctdb_sock_addr  src, dst;
1609
1610         if (argc < 2) {
1611                 usage();
1612         }
1613
1614         if (!parse_ip_port(argv[0], &src)) {
1615                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1616                 return -1;
1617         }
1618
1619         if (!parse_ip_port(argv[1], &dst)) {
1620                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1621                 return -1;
1622         }
1623
1624         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1625         if (ret==0) {
1626                 return 0;
1627         }
1628         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1629
1630         return -1;
1631 }
1632
1633
1634 /*
1635   display public ip status
1636  */
1637 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1638 {
1639         int i, ret;
1640         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1641         struct ctdb_all_public_ips *ips;
1642
1643         if (options.pnn == CTDB_BROADCAST_ALL) {
1644                 /* read the list of public ips from all nodes */
1645                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1646         } else {
1647                 /* read the public ip list from this node */
1648                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1649         }
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1652                 talloc_free(tmp_ctx);
1653                 return ret;
1654         }
1655
1656         if (options.machinereadable){
1657                 printf(":Public IP:Node:\n");
1658         } else {
1659                 if (options.pnn == CTDB_BROADCAST_ALL) {
1660                         printf("Public IPs on ALL nodes\n");
1661                 } else {
1662                         printf("Public IPs on node %u\n", options.pnn);
1663                 }
1664         }
1665
1666         for (i=1;i<=ips->num;i++) {
1667                 if (options.machinereadable){
1668                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1669                 } else {
1670                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1671                 }
1672         }
1673
1674         talloc_free(tmp_ctx);
1675         return 0;
1676 }
1677
1678 /*
1679   display pid of a ctdb daemon
1680  */
1681 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1682 {
1683         uint32_t pid;
1684         int ret;
1685
1686         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1687         if (ret != 0) {
1688                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1689                 return ret;
1690         }
1691         printf("Pid:%d\n", pid);
1692
1693         return 0;
1694 }
1695
1696 /*
1697   handler for receiving the response to ipreallocate
1698 */
1699 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1700                              TDB_DATA data, void *private_data)
1701 {
1702         exit(0);
1703 }
1704
1705 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1706 {
1707         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1708
1709         event_add_timed(ctdb->ev, ctdb, 
1710                                 timeval_current_ofs(1, 0),
1711                                 ctdb_every_second, ctdb);
1712 }
1713
1714 /*
1715   ask the recovery daemon on the recovery master to perform a ip reallocation
1716  */
1717 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1718 {
1719         int i, ret;
1720         TDB_DATA data;
1721         struct takeover_run_reply rd;
1722         uint32_t recmaster;
1723         struct ctdb_node_map *nodemap=NULL;
1724         int retries=0;
1725         struct timeval tv = timeval_current();
1726
1727         /* we need some events to trigger so we can timeout and restart
1728            the loop
1729         */
1730         event_add_timed(ctdb->ev, ctdb, 
1731                                 timeval_current_ofs(1, 0),
1732                                 ctdb_every_second, ctdb);
1733
1734         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1735         if (rd.pnn == -1) {
1736                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1737                 return -1;
1738         }
1739         rd.srvid = getpid();
1740
1741         /* register a message port for receiveing the reply so that we
1742            can receive the reply
1743         */
1744         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1745
1746         data.dptr = (uint8_t *)&rd;
1747         data.dsize = sizeof(rd);
1748
1749 again:
1750         if (retries>5) {
1751                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1752                 exit(10);
1753         }
1754
1755         /* check that there are valid nodes available */
1756         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1757                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1758                 exit(10);
1759         }
1760         for (i=0; i<nodemap->num;i++) {
1761                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1762                         break;
1763                 }
1764         }
1765         if (i==nodemap->num) {
1766                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1767                 return 0;
1768         }
1769
1770
1771         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1774                 return ret;
1775         }
1776
1777         /* verify the node exists */
1778         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1779                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1780                 exit(10);
1781         }
1782
1783
1784         /* check tha there are nodes available that can act as a recmaster */
1785         for (i=0; i<nodemap->num; i++) {
1786                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1787                         continue;
1788                 }
1789                 break;
1790         }
1791         if (i == nodemap->num) {
1792                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1793                 return 0;
1794         }
1795
1796         /* verify the recovery master is not STOPPED, nor BANNED */
1797         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1798                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1799                 retries++;
1800                 sleep(1);
1801                 goto again;
1802         } 
1803
1804         
1805         /* verify the recovery master is not STOPPED, nor BANNED */
1806         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1807                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1808                 retries++;
1809                 sleep(1);
1810                 goto again;
1811         } 
1812
1813         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1816                 return -1;
1817         }
1818
1819         tv = timeval_current();
1820         /* this loop will terminate when we have received the reply */
1821         while (timeval_elapsed(&tv) < 3.0) {    
1822                 event_loop_once(ctdb->ev);
1823         }
1824
1825         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1826         retries++;
1827         sleep(1);
1828         goto again;
1829
1830         return 0;
1831 }
1832
1833
1834 /*
1835   disable a remote node
1836  */
1837 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1838 {
1839         int ret;
1840         struct ctdb_node_map *nodemap=NULL;
1841
1842         do {
1843                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1844                 if (ret != 0) {
1845                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1846                         return ret;
1847                 }
1848
1849                 sleep(1);
1850
1851                 /* read the nodemap and verify the change took effect */
1852                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1853                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1854                         exit(10);
1855                 }
1856
1857         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1858         ret = control_ipreallocate(ctdb, argc, argv);
1859         if (ret != 0) {
1860                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1861                 return ret;
1862         }
1863
1864         return 0;
1865 }
1866
1867 /*
1868   enable a disabled remote node
1869  */
1870 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1871 {
1872         int ret;
1873
1874         struct ctdb_node_map *nodemap=NULL;
1875
1876         do {
1877                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1878                 if (ret != 0) {
1879                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1880                         return ret;
1881                 }
1882
1883                 sleep(1);
1884
1885                 /* read the nodemap and verify the change took effect */
1886                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1887                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1888                         exit(10);
1889                 }
1890
1891         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1892         ret = control_ipreallocate(ctdb, argc, argv);
1893         if (ret != 0) {
1894                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1895                 return ret;
1896         }
1897
1898         return 0;
1899 }
1900
1901 /*
1902   stop a remote node
1903  */
1904 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1905 {
1906         int ret;
1907         struct ctdb_node_map *nodemap=NULL;
1908
1909         do {
1910                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1911                 if (ret != 0) {
1912                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1913                 }
1914         
1915                 sleep(1);
1916
1917                 /* read the nodemap and verify the change took effect */
1918                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1919                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1920                         exit(10);
1921                 }
1922
1923         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1924         ret = control_ipreallocate(ctdb, argc, argv);
1925         if (ret != 0) {
1926                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1927                 return ret;
1928         }
1929
1930         return 0;
1931 }
1932
1933 /*
1934   restart a stopped remote node
1935  */
1936 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1937 {
1938         int ret;
1939
1940         struct ctdb_node_map *nodemap=NULL;
1941
1942         do {
1943                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1944                 if (ret != 0) {
1945                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1946                         return ret;
1947                 }
1948         
1949                 sleep(1);
1950
1951                 /* read the nodemap and verify the change took effect */
1952                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1953                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1954                         exit(10);
1955                 }
1956
1957         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1958         ret = control_ipreallocate(ctdb, argc, argv);
1959         if (ret != 0) {
1960                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1961                 return ret;
1962         }
1963
1964         return 0;
1965 }
1966
1967 static uint32_t get_generation(struct ctdb_context *ctdb)
1968 {
1969         struct ctdb_vnn_map *vnnmap=NULL;
1970         int ret;
1971
1972         /* wait until the recmaster is not in recovery mode */
1973         while (1) {
1974                 uint32_t recmode, recmaster;
1975                 
1976                 if (vnnmap != NULL) {
1977                         talloc_free(vnnmap);
1978                         vnnmap = NULL;
1979                 }
1980
1981                 /* get the recmaster */
1982                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1983                 if (ret != 0) {
1984                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1985                         exit(10);
1986                 }
1987
1988                 /* get recovery mode */
1989                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1990                 if (ret != 0) {
1991                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1992                         exit(10);
1993                 }
1994
1995                 /* get the current generation number */
1996                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
1997                 if (ret != 0) {
1998                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
1999                         exit(10);
2000                 }
2001
2002                 if ((recmode == CTDB_RECOVERY_NORMAL)
2003                 &&  (vnnmap->generation != 1)){
2004                         return vnnmap->generation;
2005                 }
2006                 sleep(1);
2007         }
2008 }
2009
2010 /*
2011   ban a node from the cluster
2012  */
2013 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2014 {
2015         int ret;
2016         struct ctdb_node_map *nodemap=NULL;
2017         struct ctdb_ban_time bantime;
2018
2019         if (argc < 1) {
2020                 usage();
2021         }
2022         
2023         /* verify the node exists */
2024         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2025         if (ret != 0) {
2026                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2027                 return ret;
2028         }
2029
2030         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2031                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2032                 return -1;
2033         }
2034
2035         bantime.pnn  = options.pnn;
2036         bantime.time = strtoul(argv[0], NULL, 0);
2037
2038         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2041                 return -1;
2042         }       
2043
2044         ret = control_ipreallocate(ctdb, argc, argv);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2047                 return ret;
2048         }
2049
2050         return 0;
2051 }
2052
2053
2054 /*
2055   unban a node from the cluster
2056  */
2057 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2058 {
2059         int ret;
2060         struct ctdb_node_map *nodemap=NULL;
2061         struct ctdb_ban_time bantime;
2062
2063         /* verify the node exists */
2064         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2065         if (ret != 0) {
2066                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2067                 return ret;
2068         }
2069
2070         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2071                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2072                 return -1;
2073         }
2074
2075         bantime.pnn  = options.pnn;
2076         bantime.time = 0;
2077
2078         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2079         if (ret != 0) {
2080                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2081                 return -1;
2082         }       
2083
2084         ret = control_ipreallocate(ctdb, argc, argv);
2085         if (ret != 0) {
2086                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2087                 return ret;
2088         }
2089
2090         return 0;
2091 }
2092
2093
2094 /*
2095   show ban information for a node
2096  */
2097 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2098 {
2099         int ret;
2100         struct ctdb_node_map *nodemap=NULL;
2101         struct ctdb_ban_time *bantime;
2102
2103         /* verify the node exists */
2104         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2105         if (ret != 0) {
2106                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2107                 return ret;
2108         }
2109
2110         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2111         if (ret != 0) {
2112                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2113                 return -1;
2114         }       
2115
2116         if (bantime->time == 0) {
2117                 printf("Node %u is not banned\n", bantime->pnn);
2118         } else {
2119                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2120         }
2121
2122         return 0;
2123 }
2124
2125 /*
2126   shutdown a daemon
2127  */
2128 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2129 {
2130         int ret;
2131
2132         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2135                 return ret;
2136         }
2137
2138         return 0;
2139 }
2140
2141 /*
2142   trigger a recovery
2143  */
2144 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2145 {
2146         int ret;
2147         uint32_t generation, next_generation;
2148
2149         /* record the current generation number */
2150         generation = get_generation(ctdb);
2151
2152         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2153         if (ret != 0) {
2154                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2155                 return ret;
2156         }
2157
2158         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2161                 return ret;
2162         }
2163
2164         /* wait until we are in a new generation */
2165         while (1) {
2166                 next_generation = get_generation(ctdb);
2167                 if (next_generation != generation) {
2168                         return 0;
2169                 }
2170                 sleep(1);
2171         }
2172
2173         return 0;
2174 }
2175
2176
2177 /*
2178   display monitoring mode of a remote node
2179  */
2180 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2181 {
2182         uint32_t monmode;
2183         int ret;
2184
2185         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2186         if (ret != 0) {
2187                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2188                 return ret;
2189         }
2190         if (!options.machinereadable){
2191                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2192         } else {
2193                 printf(":mode:\n");
2194                 printf(":%d:\n",monmode);
2195         }
2196         return 0;
2197 }
2198
2199
2200 /*
2201   display capabilities of a remote node
2202  */
2203 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2204 {
2205         uint32_t capabilities;
2206         int ret;
2207
2208         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2209         if (ret != 0) {
2210                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2211                 return ret;
2212         }
2213         
2214         if (!options.machinereadable){
2215                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2216                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2217                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2218                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2219         } else {
2220                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2221                 printf(":%d:%d:%d:%d:\n",
2222                         !!(capabilities&CTDB_CAP_RECMASTER),
2223                         !!(capabilities&CTDB_CAP_LMASTER),
2224                         !!(capabilities&CTDB_CAP_LVS),
2225                         !!(capabilities&CTDB_CAP_NATGW));
2226         }
2227         return 0;
2228 }
2229
2230 /*
2231   display lvs configuration
2232  */
2233 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2234 {
2235         uint32_t *capabilities;
2236         struct ctdb_node_map *nodemap=NULL;
2237         int i, ret;
2238         int healthy_count = 0;
2239
2240         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2241         if (ret != 0) {
2242                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2243                 return ret;
2244         }
2245
2246         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2247         CTDB_NO_MEMORY(ctdb, capabilities);
2248         
2249         /* collect capabilities for all connected nodes */
2250         for (i=0; i<nodemap->num; i++) {
2251                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2252                         continue;
2253                 }
2254                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2255                         continue;
2256                 }
2257         
2258                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2259                 if (ret != 0) {
2260                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2261                         return ret;
2262                 }
2263
2264                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2265                         continue;
2266                 }
2267
2268                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2269                         healthy_count++;
2270                 }
2271         }
2272
2273         /* Print all LVS nodes */
2274         for (i=0; i<nodemap->num; i++) {
2275                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2276                         continue;
2277                 }
2278                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2279                         continue;
2280                 }
2281                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2282                         continue;
2283                 }
2284
2285                 if (healthy_count != 0) {
2286                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2287                                 continue;
2288                         }
2289                 }
2290
2291                 printf("%d:%s\n", i, 
2292                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2293         }
2294
2295         return 0;
2296 }
2297
2298 /*
2299   display who is the lvs master
2300  */
2301 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2302 {
2303         uint32_t *capabilities;
2304         struct ctdb_node_map *nodemap=NULL;
2305         int i, ret;
2306         int healthy_count = 0;
2307
2308         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2309         if (ret != 0) {
2310                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2311                 return ret;
2312         }
2313
2314         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2315         CTDB_NO_MEMORY(ctdb, capabilities);
2316         
2317         /* collect capabilities for all connected nodes */
2318         for (i=0; i<nodemap->num; i++) {
2319                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2320                         continue;
2321                 }
2322                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2323                         continue;
2324                 }
2325         
2326                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2327                 if (ret != 0) {
2328                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2329                         return ret;
2330                 }
2331
2332                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2333                         continue;
2334                 }
2335
2336                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2337                         healthy_count++;
2338                 }
2339         }
2340
2341         /* find and show the lvsmaster */
2342         for (i=0; i<nodemap->num; i++) {
2343                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2344                         continue;
2345                 }
2346                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2347                         continue;
2348                 }
2349                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2350                         continue;
2351                 }
2352
2353                 if (healthy_count != 0) {
2354                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2355                                 continue;
2356                         }
2357                 }
2358
2359                 if (options.machinereadable){
2360                         printf("%d\n", i);
2361                 } else {
2362                         printf("Node %d is LVS master\n", i);
2363                 }
2364                 return 0;
2365         }
2366
2367         printf("There is no LVS master\n");
2368         return -1;
2369 }
2370
2371 /*
2372   disable monitoring on a  node
2373  */
2374 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2375 {
2376         
2377         int ret;
2378
2379         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2380         if (ret != 0) {
2381                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2382                 return ret;
2383         }
2384         printf("Monitoring mode:%s\n","DISABLED");
2385
2386         return 0;
2387 }
2388
2389 /*
2390   enable monitoring on a  node
2391  */
2392 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2393 {
2394         
2395         int ret;
2396
2397         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2398         if (ret != 0) {
2399                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2400                 return ret;
2401         }
2402         printf("Monitoring mode:%s\n","ACTIVE");
2403
2404         return 0;
2405 }
2406
2407 /*
2408   display remote list of keys/data for a db
2409  */
2410 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2411 {
2412         const char *db_name;
2413         struct ctdb_db_context *ctdb_db;
2414         int ret;
2415
2416         if (argc < 1) {
2417                 usage();
2418         }
2419
2420         db_name = argv[0];
2421
2422
2423         if (db_exists(ctdb, db_name)) {
2424                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2425                 return -1;
2426         }
2427
2428         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2429
2430         if (ctdb_db == NULL) {
2431                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2432                 return -1;
2433         }
2434
2435         /* traverse and dump the cluster tdb */
2436         ret = ctdb_dump_db(ctdb_db, stdout);
2437         if (ret == -1) {
2438                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2439                 return -1;
2440         }
2441         talloc_free(ctdb_db);
2442
2443         printf("Dumped %d records\n", ret);
2444         return 0;
2445 }
2446
2447
2448 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2449                              TDB_DATA data, void *private_data)
2450 {
2451         DEBUG(DEBUG_ERR,("Log data received\n"));
2452         if (data.dsize > 0) {
2453                 printf("%s", data.dptr);
2454         }
2455
2456         exit(0);
2457 }
2458
2459 /*
2460   display a list of log messages from the in memory ringbuffer
2461  */
2462 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2463 {
2464         int ret;
2465         int32_t res;
2466         struct ctdb_get_log_addr log_addr;
2467         TDB_DATA data;
2468         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2469         char *errmsg;
2470         struct timeval tv;
2471
2472         if (argc != 1) {
2473                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2474                 talloc_free(tmp_ctx);
2475                 return -1;
2476         }
2477
2478         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2479         log_addr.srvid = getpid();
2480         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2481                 log_addr.level = get_debug_by_desc(argv[0]);
2482         } else {
2483                 log_addr.level = strtol(argv[0], NULL, 0);
2484         }
2485
2486
2487         data.dptr = (unsigned char *)&log_addr;
2488         data.dsize = sizeof(log_addr);
2489
2490         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2491
2492         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2493         sleep(1);
2494
2495         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2496
2497         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2498                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2499         if (ret != 0 || res != 0) {
2500                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2501                 talloc_free(tmp_ctx);
2502                 return -1;
2503         }
2504
2505
2506         tv = timeval_current();
2507         /* this loop will terminate when we have received the reply */
2508         while (timeval_elapsed(&tv) < 3.0) {    
2509                 event_loop_once(ctdb->ev);
2510         }
2511
2512         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2513
2514         talloc_free(tmp_ctx);
2515         return 0;
2516 }
2517
2518 /*
2519   clear the in memory log area
2520  */
2521 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2522 {
2523         int ret;
2524         int32_t res;
2525         char *errmsg;
2526         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2527
2528         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2529                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2530         if (ret != 0 || res != 0) {
2531                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2532                 talloc_free(tmp_ctx);
2533                 return -1;
2534         }
2535
2536         talloc_free(tmp_ctx);
2537         return 0;
2538 }
2539
2540
2541
2542 /*
2543   display a list of the databases on a remote ctdb
2544  */
2545 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2546 {
2547         int i, ret;
2548         struct ctdb_dbid_map *dbmap=NULL;
2549
2550         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2551         if (ret != 0) {
2552                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2553                 return ret;
2554         }
2555
2556         printf("Number of databases:%d\n", dbmap->num);
2557         for(i=0;i<dbmap->num;i++){
2558                 const char *path;
2559                 const char *name;
2560                 bool persistent;
2561
2562                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2563                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2564                 persistent = dbmap->dbs[i].persistent;
2565                 printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name, 
2566                        path, persistent?"PERSISTENT":"");
2567         }
2568
2569         return 0;
2570 }
2571
2572 /*
2573   check if the local node is recmaster or not
2574   it will return 1 if this node is the recmaster and 0 if it is not
2575   or if the local ctdb daemon could not be contacted
2576  */
2577 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2578 {
2579         uint32_t mypnn, recmaster;
2580         int ret;
2581
2582         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2583         if (mypnn == -1) {
2584                 printf("Failed to get pnn of node\n");
2585                 return 1;
2586         }
2587
2588         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2589         if (ret != 0) {
2590                 printf("Failed to get the recmaster\n");
2591                 return 1;
2592         }
2593
2594         if (recmaster != mypnn) {
2595                 printf("this node is not the recmaster\n");
2596                 return 1;
2597         }
2598
2599         printf("this node is the recmaster\n");
2600         return 0;
2601 }
2602
2603 /*
2604   ping a node
2605  */
2606 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2607 {
2608         int ret;
2609         struct timeval tv = timeval_current();
2610         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2611         if (ret == -1) {
2612                 printf("Unable to get ping response from node %u\n", options.pnn);
2613                 return -1;
2614         } else {
2615                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2616                        options.pnn, timeval_elapsed(&tv), ret);
2617         }
2618         return 0;
2619 }
2620
2621
2622 /*
2623   get a tunable
2624  */
2625 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2626 {
2627         const char *name;
2628         uint32_t value;
2629         int ret;
2630
2631         if (argc < 1) {
2632                 usage();
2633         }
2634
2635         name = argv[0];
2636         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2637         if (ret == -1) {
2638                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2639                 return -1;
2640         }
2641
2642         printf("%-19s = %u\n", name, value);
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2650 {
2651         const char *name;
2652         uint32_t value;
2653         int ret;
2654
2655         if (argc < 2) {
2656                 usage();
2657         }
2658
2659         name = argv[0];
2660         value = strtoul(argv[1], NULL, 0);
2661
2662         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2663         if (ret == -1) {
2664                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2665                 return -1;
2666         }
2667         return 0;
2668 }
2669
2670 /*
2671   list all tunables
2672  */
2673 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2674 {
2675         uint32_t count;
2676         const char **list;
2677         int ret, i;
2678
2679         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2680         if (ret == -1) {
2681                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2682                 return -1;
2683         }
2684
2685         for (i=0;i<count;i++) {
2686                 control_getvar(ctdb, 1, &list[i]);
2687         }
2688
2689         talloc_free(list);
2690         
2691         return 0;
2692 }
2693
2694 /*
2695   display debug level on a node
2696  */
2697 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2698 {
2699         int ret;
2700         int32_t level;
2701
2702         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2703         if (ret != 0) {
2704                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2705                 return ret;
2706         } else {
2707                 if (options.machinereadable){
2708                         printf(":Name:Level:\n");
2709                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2710                 } else {
2711                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2712                 }
2713         }
2714         return 0;
2715 }
2716
2717 /*
2718   display reclock file of a node
2719  */
2720 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2721 {
2722         int ret;
2723         const char *reclock;
2724
2725         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2726         if (ret != 0) {
2727                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2728                 return ret;
2729         } else {
2730                 if (options.machinereadable){
2731                         if (reclock != NULL) {
2732                                 printf("%s", reclock);
2733                         }
2734                 } else {
2735                         if (reclock == NULL) {
2736                                 printf("No reclock file used.\n");
2737                         } else {
2738                                 printf("Reclock file:%s\n", reclock);
2739                         }
2740                 }
2741         }
2742         return 0;
2743 }
2744
2745 /*
2746   set the reclock file of a node
2747  */
2748 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2749 {
2750         int ret;
2751         const char *reclock;
2752
2753         if (argc == 0) {
2754                 reclock = NULL;
2755         } else if (argc == 1) {
2756                 reclock = argv[0];
2757         } else {
2758                 usage();
2759         }
2760
2761         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2762         if (ret != 0) {
2763                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2764                 return ret;
2765         }
2766         return 0;
2767 }
2768
2769 /*
2770   set the natgw state on/off
2771  */
2772 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2773 {
2774         int ret;
2775         uint32_t natgwstate;
2776
2777         if (argc == 0) {
2778                 usage();
2779         }
2780
2781         if (!strcmp(argv[0], "on")) {
2782                 natgwstate = 1;
2783         } else if (!strcmp(argv[0], "off")) {
2784                 natgwstate = 0;
2785         } else {
2786                 usage();
2787         }
2788
2789         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2790         if (ret != 0) {
2791                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2792                 return ret;
2793         }
2794
2795         return 0;
2796 }
2797
2798 /*
2799   set the lmaster role on/off
2800  */
2801 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2802 {
2803         int ret;
2804         uint32_t lmasterrole;
2805
2806         if (argc == 0) {
2807                 usage();
2808         }
2809
2810         if (!strcmp(argv[0], "on")) {
2811                 lmasterrole = 1;
2812         } else if (!strcmp(argv[0], "off")) {
2813                 lmasterrole = 0;
2814         } else {
2815                 usage();
2816         }
2817
2818         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2819         if (ret != 0) {
2820                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2821                 return ret;
2822         }
2823
2824         return 0;
2825 }
2826
2827 /*
2828   set the recmaster role on/off
2829  */
2830 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2831 {
2832         int ret;
2833         uint32_t recmasterrole;
2834
2835         if (argc == 0) {
2836                 usage();
2837         }
2838
2839         if (!strcmp(argv[0], "on")) {
2840                 recmasterrole = 1;
2841         } else if (!strcmp(argv[0], "off")) {
2842                 recmasterrole = 0;
2843         } else {
2844                 usage();
2845         }
2846
2847         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2848         if (ret != 0) {
2849                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2850                 return ret;
2851         }
2852
2853         return 0;
2854 }
2855
2856 /*
2857   set debug level on a node or all nodes
2858  */
2859 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2860 {
2861         int i, ret;
2862         int32_t level;
2863
2864         if (argc == 0) {
2865                 printf("You must specify the debug level. Valid levels are:\n");
2866                 for (i=0; debug_levels[i].description != NULL; i++) {
2867                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2868                 }
2869
2870                 return 0;
2871         }
2872
2873         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2874                 level = get_debug_by_desc(argv[0]);
2875         } else {
2876                 level = strtol(argv[0], NULL, 0);
2877         }
2878
2879         for (i=0; debug_levels[i].description != NULL; i++) {
2880                 if (level == debug_levels[i].level) {
2881                         break;
2882                 }
2883         }
2884         if (debug_levels[i].description == NULL) {
2885                 printf("Invalid debug level, must be one of\n");
2886                 for (i=0; debug_levels[i].description != NULL; i++) {
2887                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2888                 }
2889                 return -1;
2890         }
2891
2892         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2893         if (ret != 0) {
2894                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2895         }
2896         return 0;
2897 }
2898
2899
2900 /*
2901   freeze a node
2902  */
2903 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2904 {
2905         int ret;
2906         uint32_t priority;
2907         
2908         if (argc == 1) {
2909                 priority = strtol(argv[0], NULL, 0);
2910         } else {
2911                 priority = 0;
2912         }
2913         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2914
2915         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2916         if (ret != 0) {
2917                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2918         }               
2919         return 0;
2920 }
2921
2922 /*
2923   thaw a node
2924  */
2925 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
2926 {
2927         int ret;
2928         uint32_t priority;
2929         
2930         if (argc == 1) {
2931                 priority = strtol(argv[0], NULL, 0);
2932         } else {
2933                 priority = 0;
2934         }
2935         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
2936
2937         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2938         if (ret != 0) {
2939                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
2940         }               
2941         return 0;
2942 }
2943
2944
2945 /*
2946   attach to a database
2947  */
2948 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
2949 {
2950         const char *db_name;
2951         struct ctdb_db_context *ctdb_db;
2952
2953         if (argc < 1) {
2954                 usage();
2955         }
2956         db_name = argv[0];
2957
2958         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2959         if (ctdb_db == NULL) {
2960                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2961                 return -1;
2962         }
2963
2964         return 0;
2965 }
2966
2967 /*
2968   set db priority
2969  */
2970 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2971 {
2972         struct ctdb_db_priority db_prio;
2973         int ret;
2974
2975         if (argc < 2) {
2976                 usage();
2977         }
2978
2979         db_prio.db_id    = strtoul(argv[0], NULL, 0);
2980         db_prio.priority = strtoul(argv[1], NULL, 0);
2981
2982         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
2983         if (ret != 0) {
2984                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
2985                 return -1;
2986         }
2987
2988         return 0;
2989 }
2990
2991 /*
2992   get db priority
2993  */
2994 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
2995 {
2996         uint32_t db_id, priority;
2997         int ret;
2998
2999         if (argc < 1) {
3000                 usage();
3001         }
3002
3003         db_id = strtoul(argv[0], NULL, 0);
3004
3005         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3006         if (ret != 0) {
3007                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3008                 return -1;
3009         }
3010
3011         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3012
3013         return 0;
3014 }
3015
3016 /*
3017   run an eventscript on a node
3018  */
3019 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3020 {
3021         TDB_DATA data;
3022         int ret;
3023         int32_t res;
3024         char *errmsg;
3025         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3026
3027         if (argc != 1) {
3028                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3029                 return -1;
3030         }
3031
3032         data.dptr = (unsigned char *)discard_const(argv[0]);
3033         data.dsize = strlen((char *)data.dptr) + 1;
3034
3035         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3036
3037         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3038                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3039         if (ret != 0 || res != 0) {
3040                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3041                 talloc_free(tmp_ctx);
3042                 return -1;
3043         }
3044         talloc_free(tmp_ctx);
3045         return 0;
3046 }
3047
3048 #define DB_VERSION 1
3049 #define MAX_DB_NAME 64
3050 struct db_file_header {
3051         unsigned long version;
3052         time_t timestamp;
3053         unsigned long persistent;
3054         unsigned long size;
3055         const char name[MAX_DB_NAME];
3056 };
3057
3058 struct backup_data {
3059         struct ctdb_marshall_buffer *records;
3060         uint32_t len;
3061         uint32_t total;
3062         bool traverse_error;
3063 };
3064
3065 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3066 {
3067         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3068         struct ctdb_rec_data *rec;
3069
3070         /* add the record */
3071         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3072         if (rec == NULL) {
3073                 bd->traverse_error = true;
3074                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3075                 return -1;
3076         }
3077         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3078         if (bd->records == NULL) {
3079                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3080                 bd->traverse_error = true;
3081                 return -1;
3082         }
3083         bd->records->count++;
3084         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3085         bd->len += rec->length;
3086         talloc_free(rec);
3087
3088         bd->total++;
3089         return 0;
3090 }
3091
3092 /*
3093  * backup a database to a file 
3094  */
3095 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3096 {
3097         int i, ret;
3098         struct ctdb_dbid_map *dbmap=NULL;
3099         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3100         struct db_file_header dbhdr;
3101         struct ctdb_db_context *ctdb_db;
3102         struct backup_data *bd;
3103         int fh = -1;
3104         int status = -1;
3105
3106         if (argc != 2) {
3107                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3108                 return -1;
3109         }
3110
3111         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3112         if (ret != 0) {
3113                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3114                 return ret;
3115         }
3116
3117         for(i=0;i<dbmap->num;i++){
3118                 const char *name;
3119
3120                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3121                 if(!strcmp(argv[0], name)){
3122                         talloc_free(discard_const(name));
3123                         break;
3124                 }
3125                 talloc_free(discard_const(name));
3126         }
3127         if (i == dbmap->num) {
3128                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3129                 talloc_free(tmp_ctx);
3130                 return -1;
3131         }
3132
3133
3134         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3135         if (ctdb_db == NULL) {
3136                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3137                 talloc_free(tmp_ctx);
3138                 return -1;
3139         }
3140
3141
3142         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3143         if (ret == -1) {
3144                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3145                 talloc_free(tmp_ctx);
3146                 return -1;
3147         }
3148
3149
3150         bd = talloc_zero(tmp_ctx, struct backup_data);
3151         if (bd == NULL) {
3152                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3153                 talloc_free(tmp_ctx);
3154                 return -1;
3155         }
3156
3157         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3158         if (bd->records == NULL) {
3159                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3160                 talloc_free(tmp_ctx);
3161                 return -1;
3162         }
3163
3164         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3165         bd->records->db_id = ctdb_db->db_id;
3166         /* traverse the database collecting all records */
3167         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3168             bd->traverse_error) {
3169                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3170                 talloc_free(tmp_ctx);
3171                 return -1;              
3172         }
3173
3174         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3175
3176
3177         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3178         if (fh == -1) {
3179                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3180                 talloc_free(tmp_ctx);
3181                 return -1;
3182         }
3183
3184         dbhdr.version = DB_VERSION;
3185         dbhdr.timestamp = time(NULL);
3186         dbhdr.persistent = dbmap->dbs[i].persistent;
3187         dbhdr.size = bd->len;
3188         if (strlen(argv[0]) >= MAX_DB_NAME) {
3189                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3190                 goto done;
3191         }
3192         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3193         ret = write(fh, &dbhdr, sizeof(dbhdr));
3194         if (ret == -1) {
3195                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3196                 goto done;
3197         }
3198         ret = write(fh, bd->records, bd->len);
3199         if (ret == -1) {
3200                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3201                 goto done;
3202         }
3203
3204         status = 0;
3205 done:
3206         if (fh != -1) {
3207                 ret = close(fh);
3208                 if (ret == -1) {
3209                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3210                 }
3211         }
3212         talloc_free(tmp_ctx);
3213         return status;
3214 }
3215
3216 /*
3217  * restore a database from a file 
3218  */
3219 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3220 {
3221         int ret;
3222         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3223         TDB_DATA outdata;
3224         TDB_DATA data;
3225         struct db_file_header dbhdr;
3226         struct ctdb_db_context *ctdb_db;
3227         struct ctdb_node_map *nodemap=NULL;
3228         struct ctdb_vnn_map *vnnmap=NULL;
3229         int i, fh;
3230         struct ctdb_control_wipe_database w;
3231         uint32_t *nodes;
3232         uint32_t generation;
3233         struct tm *tm;
3234         char tbuf[100];
3235
3236         if (argc != 1) {
3237                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3238                 return -1;
3239         }
3240
3241         fh = open(argv[0], O_RDONLY);
3242         if (fh == -1) {
3243                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3244                 talloc_free(tmp_ctx);
3245                 return -1;
3246         }
3247
3248         read(fh, &dbhdr, sizeof(dbhdr));
3249         if (dbhdr.version != DB_VERSION) {
3250                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3251                 talloc_free(tmp_ctx);
3252                 return -1;
3253         }
3254
3255         outdata.dsize = dbhdr.size;
3256         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3257         if (outdata.dptr == NULL) {
3258                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3259                 close(fh);
3260                 talloc_free(tmp_ctx);
3261                 return -1;
3262         }               
3263         read(fh, outdata.dptr, outdata.dsize);
3264         close(fh);
3265
3266         tm = localtime(&dbhdr.timestamp);
3267         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3268         printf("Restoring database '%s' from backup @ %s\n",
3269                 dbhdr.name, tbuf);
3270
3271
3272         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3273         if (ctdb_db == NULL) {
3274                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3275                 talloc_free(tmp_ctx);
3276                 return -1;
3277         }
3278
3279         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3280         if (ret != 0) {
3281                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3282                 talloc_free(tmp_ctx);
3283                 return ret;
3284         }
3285
3286
3287         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3288         if (ret != 0) {
3289                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3290                 talloc_free(tmp_ctx);
3291                 return ret;
3292         }
3293
3294         /* freeze all nodes */
3295         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3296         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3297                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3298                                         nodes, i,
3299                                         TIMELIMIT(),
3300                                         false, tdb_null,
3301                                         NULL, NULL,
3302                                         NULL) != 0) {
3303                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3304                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3305                         talloc_free(tmp_ctx);
3306                         return -1;
3307                 }
3308         }
3309
3310         generation = vnnmap->generation;
3311         data.dptr = (void *)&generation;
3312         data.dsize = sizeof(generation);
3313
3314         /* start a cluster wide transaction */
3315         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3316         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3317                                         nodes, 0,
3318                                         TIMELIMIT(), false, data,
3319                                         NULL, NULL,
3320                                         NULL) != 0) {
3321                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3322                 return -1;
3323         }
3324
3325
3326         w.db_id = ctdb_db->db_id;
3327         w.transaction_id = generation;
3328
3329         data.dptr = (void *)&w;
3330         data.dsize = sizeof(w);
3331
3332         /* wipe all the remote databases. */
3333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3335                                         nodes, 0,
3336                                         TIMELIMIT(), false, data,
3337                                         NULL, NULL,
3338                                         NULL) != 0) {
3339                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3340                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3341                 talloc_free(tmp_ctx);
3342                 return -1;
3343         }
3344         
3345         /* push the database */
3346         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3347         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3348                                         nodes, 0,
3349                                         TIMELIMIT(), false, outdata,
3350                                         NULL, NULL,
3351                                         NULL) != 0) {
3352                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3353                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3354                 talloc_free(tmp_ctx);
3355                 return -1;
3356         }
3357
3358         data.dptr = (void *)&generation;
3359         data.dsize = sizeof(generation);
3360
3361         /* commit all the changes */
3362         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3363                                         nodes, 0,
3364                                         TIMELIMIT(), false, data,
3365                                         NULL, NULL,
3366                                         NULL) != 0) {
3367                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3368                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3369                 talloc_free(tmp_ctx);
3370                 return -1;
3371         }
3372
3373
3374         /* thaw all nodes */
3375         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3376         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3377                                         nodes, 0,
3378                                         TIMELIMIT(),
3379                                         false, tdb_null,
3380                                         NULL, NULL,
3381                                         NULL) != 0) {
3382                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3383                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3384                 talloc_free(tmp_ctx);
3385                 return -1;
3386         }
3387
3388
3389         talloc_free(tmp_ctx);
3390         return 0;
3391 }
3392
3393 /*
3394  * wipe a database from a file
3395  */
3396 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3397                           const char **argv)
3398 {
3399         int ret;
3400         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3401         TDB_DATA data;
3402         struct ctdb_db_context *ctdb_db;
3403         struct ctdb_node_map *nodemap = NULL;
3404         struct ctdb_vnn_map *vnnmap = NULL;
3405         int i;
3406         struct ctdb_control_wipe_database w;
3407         uint32_t *nodes;
3408         uint32_t generation;
3409         struct ctdb_dbid_map *dbmap = NULL;
3410
3411         if (argc != 1) {
3412                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3413                 return -1;
3414         }
3415
3416         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3417                                  &dbmap);
3418         if (ret != 0) {
3419                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3420                                   options.pnn));
3421                 return ret;
3422         }
3423
3424         for(i=0;i<dbmap->num;i++){
3425                 const char *name;
3426
3427                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3428                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3429                 if(!strcmp(argv[0], name)){
3430                         talloc_free(discard_const(name));
3431                         break;
3432                 }
3433                 talloc_free(discard_const(name));
3434         }
3435         if (i == dbmap->num) {
3436                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3437                                   argv[0]));
3438                 talloc_free(tmp_ctx);
3439                 return -1;
3440         }
3441
3442         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3443         if (ctdb_db == NULL) {
3444                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3445                                   argv[0]));
3446                 talloc_free(tmp_ctx);
3447                 return -1;
3448         }
3449
3450         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3451                                    &nodemap);
3452         if (ret != 0) {
3453                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3454                                   options.pnn));
3455                 talloc_free(tmp_ctx);
3456                 return ret;
3457         }
3458
3459         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3460                                   &vnnmap);
3461         if (ret != 0) {
3462                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3463                                   options.pnn));
3464                 talloc_free(tmp_ctx);
3465                 return ret;
3466         }
3467
3468         /* freeze all nodes */
3469         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3470         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3471                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3472                                                 nodes, i,
3473                                                 TIMELIMIT(),
3474                                                 false, tdb_null,
3475                                                 NULL, NULL,
3476                                                 NULL);
3477                 if (ret != 0) {
3478                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3479                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3480                                              CTDB_RECOVERY_ACTIVE);
3481                         talloc_free(tmp_ctx);
3482                         return -1;
3483                 }
3484         }
3485
3486         generation = vnnmap->generation;
3487         data.dptr = (void *)&generation;
3488         data.dsize = sizeof(generation);
3489
3490         /* start a cluster wide transaction */
3491         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3492         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3493                                         nodes, 0,
3494                                         TIMELIMIT(), false, data,
3495                                         NULL, NULL,
3496                                         NULL);
3497         if (ret!= 0) {
3498                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3499                                   "transactions.\n"));
3500                 return -1;
3501         }
3502
3503         w.db_id = ctdb_db->db_id;
3504         w.transaction_id = generation;
3505
3506         data.dptr = (void *)&w;
3507         data.dsize = sizeof(w);
3508
3509         /* wipe all the remote databases. */
3510         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3511         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3512                                         nodes, 0,
3513                                         TIMELIMIT(), false, data,
3514                                         NULL, NULL,
3515                                         NULL) != 0) {
3516                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3517                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3518                 talloc_free(tmp_ctx);
3519                 return -1;
3520         }
3521
3522         data.dptr = (void *)&generation;
3523         data.dsize = sizeof(generation);
3524
3525         /* commit all the changes */
3526         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3527                                         nodes, 0,
3528                                         TIMELIMIT(), false, data,
3529                                         NULL, NULL,
3530                                         NULL) != 0) {
3531                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3532                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3533                 talloc_free(tmp_ctx);
3534                 return -1;
3535         }
3536
3537         /* thaw all nodes */
3538         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3539         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3540                                         nodes, 0,
3541                                         TIMELIMIT(),
3542                                         false, tdb_null,
3543                                         NULL, NULL,
3544                                         NULL) != 0) {
3545                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3546                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3547                 talloc_free(tmp_ctx);
3548                 return -1;
3549         }
3550
3551         talloc_free(tmp_ctx);
3552         return 0;
3553 }
3554
3555 /*
3556  * set flags of a node in the nodemap
3557  */
3558 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3559 {
3560         int ret;
3561         int32_t status;
3562         int node;
3563         int flags;
3564         TDB_DATA data;
3565         struct ctdb_node_flag_change c;
3566
3567         if (argc != 2) {
3568                 usage();
3569                 return -1;
3570         }
3571
3572         if (sscanf(argv[0], "%d", &node) != 1) {
3573                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3574                 usage();
3575                 return -1;
3576         }
3577         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3578                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3579                 usage();
3580                 return -1;
3581         }
3582
3583         c.pnn       = node;
3584         c.old_flags = 0;
3585         c.new_flags = flags;
3586
3587         data.dsize = sizeof(c);
3588         data.dptr = (unsigned char *)&c;
3589
3590         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3591                            data, NULL, NULL, &status, NULL, NULL);
3592         if (ret != 0 || status != 0) {
3593                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3594                 return -1;
3595         }
3596         return 0;
3597 }
3598
3599 /*
3600   dump memory usage
3601  */
3602 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3603 {
3604         TDB_DATA data;
3605         int ret;
3606         int32_t res;
3607         char *errmsg;
3608         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3609         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3610                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3611         if (ret != 0 || res != 0) {
3612                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3613                 talloc_free(tmp_ctx);
3614                 return -1;
3615         }
3616         write(1, data.dptr, data.dsize);
3617         talloc_free(tmp_ctx);
3618         return 0;
3619 }
3620
3621 /*
3622   handler for memory dumps
3623 */
3624 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3625                              TDB_DATA data, void *private_data)
3626 {
3627         write(1, data.dptr, data.dsize);
3628         exit(0);
3629 }
3630
3631 /*
3632   dump memory usage on the recovery daemon
3633  */
3634 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3635 {
3636         int ret;
3637         TDB_DATA data;
3638         struct rd_memdump_reply rd;
3639
3640         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3641         if (rd.pnn == -1) {
3642                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3643                 return -1;
3644         }
3645         rd.srvid = getpid();
3646
3647         /* register a message port for receiveing the reply so that we
3648            can receive the reply
3649         */
3650         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3651
3652
3653         data.dptr = (uint8_t *)&rd;
3654         data.dsize = sizeof(rd);
3655
3656         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3657         if (ret != 0) {
3658                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3659                 return -1;
3660         }
3661
3662         /* this loop will terminate when we have received the reply */
3663         while (1) {     
3664                 event_loop_once(ctdb->ev);
3665         }
3666
3667         return 0;
3668 }
3669
3670 /*
3671   list all nodes in the cluster
3672   if the daemon is running, we read the data from the daemon.
3673   if the daemon is not running we parse the nodes file directly
3674  */
3675 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3676 {
3677         int i, ret;
3678         struct ctdb_node_map *nodemap=NULL;
3679
3680         if (ctdb != NULL) {
3681                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3682                 if (ret != 0) {
3683                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3684                         return ret;
3685                 }
3686
3687                 for(i=0;i<nodemap->num;i++){
3688                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3689                                 continue;
3690                         }
3691                         if (options.machinereadable){
3692                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3693                         } else {
3694                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3695                         }
3696                 }
3697         } else {
3698                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3699                 struct pnn_node *pnn_nodes;
3700                 struct pnn_node *pnn_node;
3701         
3702                 pnn_nodes = read_nodes_file(mem_ctx);
3703                 if (pnn_nodes == NULL) {
3704                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3705                         talloc_free(mem_ctx);
3706                         return -1;
3707                 }
3708
3709                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3710                         ctdb_sock_addr addr;
3711
3712                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3713                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3714                                 talloc_free(mem_ctx);
3715                                 return -1;
3716                         }
3717
3718                         if (options.machinereadable){
3719                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3720                         } else {
3721                                 printf("%s\n", pnn_node->addr);
3722                         }
3723                 }
3724                 talloc_free(mem_ctx);
3725         }
3726
3727         return 0;
3728 }
3729
3730 /*
3731   reload the nodes file on the local node
3732  */
3733 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3734 {
3735         int i, ret;
3736         int mypnn;
3737         struct ctdb_node_map *nodemap=NULL;
3738
3739         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3740         if (mypnn == -1) {
3741                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3742                 return -1;
3743         }
3744
3745         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3746         if (ret != 0) {
3747                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3748                 return ret;
3749         }
3750
3751         /* reload the nodes file on all remote nodes */
3752         for (i=0;i<nodemap->num;i++) {
3753                 if (nodemap->nodes[i].pnn == mypnn) {
3754                         continue;
3755                 }
3756                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3757                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3758                         nodemap->nodes[i].pnn);
3759                 if (ret != 0) {
3760                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3761                 }
3762         }
3763
3764         /* reload the nodes file on the local node */
3765         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3766         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3767         if (ret != 0) {
3768                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3769         }
3770
3771         /* initiate a recovery */
3772         control_recover(ctdb, argc, argv);
3773
3774         return 0;
3775 }
3776
3777
3778 static const struct {
3779         const char *name;
3780         int (*fn)(struct ctdb_context *, int, const char **);
3781         bool auto_all;
3782         bool without_daemon; /* can be run without daemon running ? */
3783         const char *msg;
3784         const char *args;
3785 } ctdb_commands[] = {
3786 #ifdef CTDB_VERS
3787         { "version",         control_version,           true,   false,  "show version of ctdb" },
3788 #endif
3789         { "status",          control_status,            true,   false,  "show node status" },
3790         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
3791         { "ping",            control_ping,              true,   false,  "ping all nodes" },
3792         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
3793         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
3794         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
3795         { "statistics",      control_statistics,        false,  false, "show statistics" },
3796         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
3797         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
3798         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
3799         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
3800         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
3801         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
3802         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
3803         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
3804         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
3805         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
3806         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
3807         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
3808         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
3809         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
3810         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
3811         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
3812         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
3813         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
3814         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
3815         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
3816         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
3817         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
3818         { "stop",            control_stop,              true,   false,  "stop a node" },
3819         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
3820         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
3821         { "unban",           control_unban,             true,   false,  "unban a node" },
3822         { "showban",         control_showban,           true,   false,  "show ban information"},
3823         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
3824         { "recover",         control_recover,           true,   false,  "force recovery" },
3825         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
3826         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
3827         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
3828         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
3829         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
3830         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
3831         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
3832         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
3833
3834         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
3835         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
3836         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
3837         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
3838         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
3839         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
3840         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
3841         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
3842         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
3843         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
3844         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
3845         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
3846         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
3847         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
3848         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
3849         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
3850         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
3851         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
3852         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
3853         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
3854         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
3855         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
3856         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
3857         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
3858         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
3859         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
3860         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
3861         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
3862         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
3863 };
3864
3865 /*
3866   show usage message
3867  */
3868 static void usage(void)
3869 {
3870         int i;
3871         printf(
3872 "Usage: ctdb [options] <control>\n" \
3873 "Options:\n" \
3874 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
3875 "   -Y                 generate machinereadable output\n"
3876 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
3877         printf("Controls:\n");
3878         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3879                 printf("  %-15s %-27s  %s\n", 
3880                        ctdb_commands[i].name, 
3881                        ctdb_commands[i].args?ctdb_commands[i].args:"",
3882                        ctdb_commands[i].msg);
3883         }
3884         exit(1);
3885 }
3886
3887
3888 static void ctdb_alarm(int sig)
3889 {
3890         printf("Maximum runtime exceeded - exiting\n");
3891         _exit(ERR_TIMEOUT);
3892 }
3893
3894 /*
3895   main program
3896 */
3897 int main(int argc, const char *argv[])
3898 {
3899         struct ctdb_context *ctdb;
3900         char *nodestring = NULL;
3901         struct poptOption popt_options[] = {
3902                 POPT_AUTOHELP
3903                 POPT_CTDB_CMDLINE
3904                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
3905                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
3906                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
3907                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
3908                 POPT_TABLEEND
3909         };
3910         int opt;
3911         const char **extra_argv;
3912         int extra_argc = 0;
3913         int ret=-1, i;
3914         poptContext pc;
3915         struct event_context *ev;
3916         const char *control;
3917
3918         setlinebuf(stdout);
3919         
3920         /* set some defaults */
3921         options.maxruntime = 0;
3922         options.timelimit = 3;
3923         options.pnn = CTDB_CURRENT_NODE;
3924
3925         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
3926
3927         while ((opt = poptGetNextOpt(pc)) != -1) {
3928                 switch (opt) {
3929                 default:
3930                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
3931                                 poptBadOption(pc, 0), poptStrerror(opt)));
3932                         exit(1);
3933                 }
3934         }
3935
3936         /* setup the remaining options for the main program to use */
3937         extra_argv = poptGetArgs(pc);
3938         if (extra_argv) {
3939                 extra_argv++;
3940                 while (extra_argv[extra_argc]) extra_argc++;
3941         }
3942
3943         if (extra_argc < 1) {
3944                 usage();
3945         }
3946
3947         if (options.maxruntime == 0) {
3948                 const char *ctdb_timeout;
3949                 ctdb_timeout = getenv("CTDB_TIMEOUT");
3950                 if (ctdb_timeout != NULL) {
3951                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
3952                 } else {
3953                         /* default timeout is 120 seconds */
3954                         options.maxruntime = 120;
3955                 }
3956         }
3957
3958         signal(SIGALRM, ctdb_alarm);
3959         alarm(options.maxruntime);
3960
3961         /* setup the node number to contact */
3962         if (nodestring != NULL) {
3963                 if (strcmp(nodestring, "all") == 0) {
3964                         options.pnn = CTDB_BROADCAST_ALL;
3965                 } else {
3966                         options.pnn = strtoul(nodestring, NULL, 0);
3967                 }
3968         }
3969
3970         control = extra_argv[0];
3971
3972         ev = event_context_init(NULL);
3973
3974         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
3975                 if (strcmp(control, ctdb_commands[i].name) == 0) {
3976                         int j;
3977
3978                         if (ctdb_commands[i].without_daemon == true) {
3979                                 close(2);
3980                         }
3981
3982                         /* initialise ctdb */
3983                         ctdb = ctdb_cmdline_client(ev);
3984
3985                         if (ctdb_commands[i].without_daemon == false) {
3986                                 if (ctdb == NULL) {
3987                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
3988                                         exit(1);
3989                                 }
3990
3991                                 /* verify the node exists */
3992                                 verify_node(ctdb);
3993
3994                                 if (options.pnn == CTDB_CURRENT_NODE) {
3995                                         int pnn;
3996                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
3997                                         if (pnn == -1) {
3998                                                 return -1;
3999                                         }
4000                                         options.pnn = pnn;
4001                                 }
4002                         }
4003
4004                         if (ctdb_commands[i].auto_all && 
4005                             options.pnn == CTDB_BROADCAST_ALL) {
4006                                 uint32_t *nodes;
4007                                 uint32_t num_nodes;
4008                                 ret = 0;
4009
4010                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4011                                 CTDB_NO_MEMORY(ctdb, nodes);
4012         
4013                                 for (j=0;j<num_nodes;j++) {
4014                                         options.pnn = nodes[j];
4015                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4016                                 }
4017                                 talloc_free(nodes);
4018                         } else {
4019                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4020                         }
4021                         break;
4022                 }
4023         }
4024
4025         if (i == ARRAY_SIZE(ctdb_commands)) {
4026                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4027                 exit(1);
4028         }
4029
4030         return ret;
4031 }