tools/ctdb: display interfaces in "ctdb ip" and "ctdb ip -Y" outputs
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
517                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE));
518                 }
519                 return 0;
520         }
521
522         printf("Number of nodes:%d\n", nodemap->num);
523         for(i=0;i<nodemap->num;i++){
524                 static const struct {
525                         uint32_t flag;
526                         const char *name;
527                 } flag_names[] = {
528                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
529                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
530                         { NODE_FLAGS_BANNED,                "BANNED" },
531                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
532                         { NODE_FLAGS_DELETED,               "DELETED" },
533                         { NODE_FLAGS_STOPPED,               "STOPPED" },
534                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
535                 };
536                 char *flags_str = NULL;
537                 int j;
538
539                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
540                         continue;
541                 }
542                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
543                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
544                                 if (flags_str == NULL) {
545                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
546                                 } else {
547                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
548                                                                            flag_names[j].name);
549                                 }
550                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
551                         }
552                 }
553                 if (flags_str == NULL) {
554                         flags_str = talloc_strdup(ctdb, "OK");
555                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
556                 }
557                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
558                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
559                        flags_str,
560                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
561                 talloc_free(flags_str);
562         }
563
564         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
565         if (ret != 0) {
566                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
567                 return ret;
568         }
569         if (vnnmap->generation == INVALID_GENERATION) {
570                 printf("Generation:INVALID\n");
571         } else {
572                 printf("Generation:%d\n",vnnmap->generation);
573         }
574         printf("Size:%d\n",vnnmap->size);
575         for(i=0;i<vnnmap->size;i++){
576                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
577         }
578
579         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
580         if (ret != 0) {
581                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
582                 return ret;
583         }
584         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
585
586         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
587         if (ret != 0) {
588                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
589                 return ret;
590         }
591         printf("Recovery master:%d\n",recmaster);
592
593         return 0;
594 }
595
596
597 struct natgw_node {
598         struct natgw_node *next;
599         const char *addr;
600 };
601
602 /*
603   display the list of nodes belonging to this natgw configuration
604  */
605 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
606 {
607         int i, ret;
608         const char *natgw_list;
609         int nlines;
610         char **lines;
611         struct natgw_node *natgw_nodes = NULL;
612         struct natgw_node *natgw_node;
613         struct ctdb_node_map *nodemap=NULL;
614
615
616         /* read the natgw nodes file into a linked list */
617         natgw_list = getenv("NATGW_NODES");
618         if (natgw_list == NULL) {
619                 natgw_list = "/etc/ctdb/natgw_nodes";
620         }
621         lines = file_lines_load(natgw_list, &nlines, ctdb);
622         if (lines == NULL) {
623                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
624                 return -1;
625         }
626         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
627                 nlines--;
628         }
629         for (i=0;i<nlines;i++) {
630                 char *node;
631
632                 node = lines[i];
633                 /* strip leading spaces */
634                 while((*node == ' ') || (*node == '\t')) {
635                         node++;
636                 }
637                 if (*node == '#') {
638                         continue;
639                 }
640                 if (strcmp(node, "") == 0) {
641                         continue;
642                 }
643                 natgw_node = talloc(ctdb, struct natgw_node);
644                 natgw_node->addr = talloc_strdup(natgw_node, node);
645                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
646                 natgw_node->next = natgw_nodes;
647                 natgw_nodes = natgw_node;
648         }
649
650         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
651         if (ret != 0) {
652                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
653                 return ret;
654         }
655
656         i=0;
657         while(i<nodemap->num) {
658                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
659                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
660                                 break;
661                         }
662                 }
663
664                 /* this node was not in the natgw so we just remove it from
665                  * the list
666                  */
667                 if ((natgw_node == NULL) 
668                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
669                         int j;
670
671                         for (j=i+1; j<nodemap->num; j++) {
672                                 nodemap->nodes[j-1] = nodemap->nodes[j];
673                         }
674                         nodemap->num--;
675                         continue;
676                 }
677
678                 i++;
679         }               
680
681         /* pick a node to be natgwmaster
682          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
683          */
684         for(i=0;i<nodemap->num;i++){
685                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
686                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
687                         break;
688                 }
689         }
690         /* we couldnt find any healthy node, try unhealthy ones */
691         if (i == nodemap->num) {
692                 for(i=0;i<nodemap->num;i++){
693                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
694                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
695                                 break;
696                         }
697                 }
698         }
699         /* unless all nodes are STOPPED, when we pick one anyway */
700         if (i == nodemap->num) {
701                 for(i=0;i<nodemap->num;i++){
702                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
703                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
704                                 break;
705                         }
706                 }
707                 /* or if we still can not find any */
708                 if (i == nodemap->num) {
709                         printf("-1 0.0.0.0\n");
710                 }
711         }
712
713         /* print the pruned list of nodes belonging to this natgw list */
714         for(i=0;i<nodemap->num;i++){
715                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
716                         continue;
717                 }
718                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
719                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
723                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
724                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
725         }
726
727         return 0;
728 }
729
730 /*
731   display the status of the scripts for monitoring (or other events)
732  */
733 static int control_one_scriptstatus(struct ctdb_context *ctdb,
734                                     enum ctdb_eventscript_call type)
735 {
736         struct ctdb_scripts_wire *script_status;
737         int ret, i;
738
739         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
740         if (ret != 0) {
741                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
742                 return ret;
743         }
744
745         if (script_status == NULL) {
746                 if (!options.machinereadable) {
747                         printf("%s cycle never run\n",
748                                ctdb_eventscript_call_names[type]);
749                 }
750                 return 0;
751         }
752
753         if (!options.machinereadable) {
754                 printf("%d scripts were executed last %s cycle\n",
755                        script_status->num_scripts,
756                        ctdb_eventscript_call_names[type]);
757         }
758         for (i=0; i<script_status->num_scripts; i++) {
759                 const char *status = NULL;
760
761                 switch (script_status->scripts[i].status) {
762                 case -ETIME:
763                         status = "TIMEDOUT";
764                         break;
765                 case -ENOEXEC:
766                         status = "DISABLED";
767                         break;
768                 case 0:
769                         status = "OK";
770                         break;
771                 default:
772                         if (script_status->scripts[i].status > 0)
773                                 status = "ERROR";
774                         break;
775                 }
776                 if (options.machinereadable) {
777                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
778                                ctdb_eventscript_call_names[type],
779                                script_status->scripts[i].name,
780                                script_status->scripts[i].status,
781                                status,
782                                (long)script_status->scripts[i].start.tv_sec,
783                                (long)script_status->scripts[i].start.tv_usec,
784                                (long)script_status->scripts[i].finished.tv_sec,
785                                (long)script_status->scripts[i].finished.tv_usec,
786                                script_status->scripts[i].output);
787                         continue;
788                 }
789                 if (status)
790                         printf("%-20s Status:%s    ",
791                                script_status->scripts[i].name, status);
792                 else
793                         /* Some other error, eg from stat. */
794                         printf("%-20s Status:CANNOT RUN (%s)",
795                                script_status->scripts[i].name,
796                                strerror(-script_status->scripts[i].status));
797
798                 if (script_status->scripts[i].status >= 0) {
799                         printf("Duration:%.3lf ",
800                         timeval_delta(&script_status->scripts[i].finished,
801                               &script_status->scripts[i].start));
802                 }
803                 if (script_status->scripts[i].status != -ENOEXEC) {
804                         printf("%s",
805                                ctime(&script_status->scripts[i].start.tv_sec));
806                         if (script_status->scripts[i].status != 0) {
807                                 printf("   OUTPUT:%s\n",
808                                        script_status->scripts[i].output);
809                         }
810                 } else {
811                         printf("\n");
812                 }
813         }
814         return 0;
815 }
816
817
818 static int control_scriptstatus(struct ctdb_context *ctdb,
819                                 int argc, const char **argv)
820 {
821         int ret;
822         enum ctdb_eventscript_call type, min, max;
823         const char *arg;
824
825         if (argc > 1) {
826                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
827                 return -1;
828         }
829
830         if (argc == 0)
831                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
832         else
833                 arg = argv[0];
834
835         for (type = 0; type < CTDB_EVENT_MAX; type++) {
836                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
837                         min = type;
838                         max = type+1;
839                         break;
840                 }
841         }
842         if (type == CTDB_EVENT_MAX) {
843                 if (strcmp(arg, "all") == 0) {
844                         min = 0;
845                         max = CTDB_EVENT_MAX;
846                 } else {
847                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
848                         return -1;
849                 }
850         }
851
852         if (options.machinereadable) {
853                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
854         }
855
856         for (type = min; type < max; type++) {
857                 ret = control_one_scriptstatus(ctdb, type);
858                 if (ret != 0) {
859                         return ret;
860                 }
861         }
862
863         return 0;
864 }
865
866 /*
867   enable an eventscript
868  */
869 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
870 {
871         int ret;
872
873         if (argc < 1) {
874                 usage();
875         }
876
877         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
878         if (ret != 0) {
879           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
880                 return ret;
881         }
882
883         return 0;
884 }
885
886 /*
887   disable an eventscript
888  */
889 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
890 {
891         int ret;
892
893         if (argc < 1) {
894                 usage();
895         }
896
897         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
898         if (ret != 0) {
899           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
900                 return ret;
901         }
902
903         return 0;
904 }
905
906 /*
907   display the pnn of the recovery master
908  */
909 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
910 {
911         int ret;
912         uint32_t recmaster;
913
914         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
915         if (ret != 0) {
916                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
917                 return ret;
918         }
919         printf("%d\n",recmaster);
920
921         return 0;
922 }
923
924 /*
925   get a list of all tickles for this pnn
926  */
927 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
928 {
929         struct ctdb_control_tcp_tickle_list *list;
930         ctdb_sock_addr addr;
931         int i, ret;
932
933         if (argc < 1) {
934                 usage();
935         }
936
937         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
938                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
939                 return -1;
940         }
941
942         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
943         if (ret == -1) {
944                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
945                 return -1;
946         }
947
948         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
949         printf("Num tickles:%u\n", list->tickles.num);
950         for (i=0;i<list->tickles.num;i++) {
951                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
952                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
953         }
954
955         talloc_free(list);
956         
957         return 0;
958 }
959
960
961
962 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
963 {
964         struct ctdb_all_public_ips *ips;
965         struct ctdb_public_ip ip;
966         int i, ret;
967         uint32_t *nodes;
968         uint32_t disable_time;
969         TDB_DATA data;
970         struct ctdb_node_map *nodemap=NULL;
971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
972
973         disable_time = 30;
974         data.dptr  = (uint8_t*)&disable_time;
975         data.dsize = sizeof(disable_time);
976         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
977         if (ret != 0) {
978                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
979                 return -1;
980         }
981
982
983
984         /* read the public ip list from the node */
985         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
986         if (ret != 0) {
987                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
988                 talloc_free(tmp_ctx);
989                 return -1;
990         }
991
992         for (i=0;i<ips->num;i++) {
993                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
994                         break;
995                 }
996         }
997         if (i==ips->num) {
998                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
999                         pnn, ctdb_addr_to_str(addr)));
1000                 talloc_free(tmp_ctx);
1001                 return -1;
1002         }
1003
1004         ip.pnn  = pnn;
1005         ip.addr = *addr;
1006
1007         data.dptr  = (uint8_t *)&ip;
1008         data.dsize = sizeof(ip);
1009
1010         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1013                 talloc_free(tmp_ctx);
1014                 return ret;
1015         }
1016
1017         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1018         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1019                                         nodes, 0,
1020                                         TIMELIMIT(),
1021                                         false, data,
1022                                         NULL, NULL,
1023                                         NULL);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036
1037         talloc_free(tmp_ctx);
1038         return 0;
1039 }
1040
1041 /*
1042   move/failover an ip address to a specific node
1043  */
1044 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1045 {
1046         uint32_t pnn;
1047         ctdb_sock_addr addr;
1048
1049         if (argc < 2) {
1050                 usage();
1051                 return -1;
1052         }
1053
1054         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1055                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1056                 return -1;
1057         }
1058
1059
1060         if (sscanf(argv[1], "%u", &pnn) != 1) {
1061                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1062                 return -1;
1063         }
1064
1065         if (move_ip(ctdb, &addr, pnn) != 0) {
1066                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1067                 return -1;
1068         }
1069
1070         return 0;
1071 }
1072
1073 void getips_store_callback(void *param, void *data)
1074 {
1075         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1076         struct ctdb_all_public_ips *ips = param;
1077         int i;
1078
1079         i = ips->num++;
1080         ips->ips[i].pnn  = node_ip->pnn;
1081         ips->ips[i].addr = node_ip->addr;
1082 }
1083
1084 void getips_count_callback(void *param, void *data)
1085 {
1086         uint32_t *count = param;
1087
1088         (*count)++;
1089 }
1090
1091 #define IP_KEYLEN       4
1092 static uint32_t *ip_key(ctdb_sock_addr *ip)
1093 {
1094         static uint32_t key[IP_KEYLEN];
1095
1096         bzero(key, sizeof(key));
1097
1098         switch (ip->sa.sa_family) {
1099         case AF_INET:
1100                 key[0]  = ip->ip.sin_addr.s_addr;
1101                 break;
1102         case AF_INET6:
1103                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1104                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1105                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1106                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1107                 break;
1108         default:
1109                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1110                 return key;
1111         }
1112
1113         return key;
1114 }
1115
1116 static void *add_ip_callback(void *parm, void *data)
1117 {
1118         return parm;
1119 }
1120
1121 static int
1122 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1123 {
1124         struct ctdb_all_public_ips *tmp_ips;
1125         struct ctdb_node_map *nodemap=NULL;
1126         trbt_tree_t *ip_tree;
1127         int i, j, len, ret;
1128         uint32_t count;
1129
1130         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1131         if (ret != 0) {
1132                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1133                 return ret;
1134         }
1135
1136         ip_tree = trbt_create(tmp_ctx, 0);
1137
1138         for(i=0;i<nodemap->num;i++){
1139                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1140                         continue;
1141                 }
1142                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1143                         continue;
1144                 }
1145
1146                 /* read the public ip list from this node */
1147                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1148                 if (ret != 0) {
1149                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1150                         return -1;
1151                 }
1152         
1153                 for (j=0; j<tmp_ips->num;j++) {
1154                         struct ctdb_public_ip *node_ip;
1155
1156                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1157                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1158                         node_ip->addr = tmp_ips->ips[j].addr;
1159
1160                         trbt_insertarray32_callback(ip_tree,
1161                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1162                                 add_ip_callback,
1163                                 node_ip);
1164                 }
1165                 talloc_free(tmp_ips);
1166         }
1167
1168         /* traverse */
1169         count = 0;
1170         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1171
1172         len = offsetof(struct ctdb_all_public_ips, ips) + 
1173                 count*sizeof(struct ctdb_public_ip);
1174         tmp_ips = talloc_zero_size(tmp_ctx, len);
1175         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1176
1177         *ips = tmp_ips;
1178
1179         return 0;
1180 }
1181
1182
1183 /* 
1184  * scans all other nodes and returns a pnn for another node that can host this 
1185  * ip address or -1
1186  */
1187 static int
1188 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1189 {
1190         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1191         struct ctdb_all_public_ips *ips;
1192         struct ctdb_node_map *nodemap=NULL;
1193         int i, j, ret;
1194
1195         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1196         if (ret != 0) {
1197                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1198                 talloc_free(tmp_ctx);
1199                 return ret;
1200         }
1201
1202         for(i=0;i<nodemap->num;i++){
1203                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1204                         continue;
1205                 }
1206                 if (nodemap->nodes[i].pnn == options.pnn) {
1207                         continue;
1208                 }
1209
1210                 /* read the public ip list from this node */
1211                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1212                 if (ret != 0) {
1213                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1214                         return -1;
1215                 }
1216
1217                 for (j=0;j<ips->num;j++) {
1218                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1219                                 talloc_free(tmp_ctx);
1220                                 return nodemap->nodes[i].pnn;
1221                         }
1222                 }
1223                 talloc_free(ips);
1224         }
1225
1226         talloc_free(tmp_ctx);
1227         return -1;
1228 }
1229
1230 /*
1231   add a public ip address to a node
1232  */
1233 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1234 {
1235         int i, ret;
1236         int len;
1237         uint32_t pnn;
1238         unsigned mask;
1239         ctdb_sock_addr addr;
1240         struct ctdb_control_ip_iface *pub;
1241         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1242         struct ctdb_all_public_ips *ips;
1243
1244
1245         if (argc != 2) {
1246                 talloc_free(tmp_ctx);
1247                 usage();
1248         }
1249
1250         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1251                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1252                 talloc_free(tmp_ctx);
1253                 return -1;
1254         }
1255
1256         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1257         if (ret != 0) {
1258                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1259                 talloc_free(tmp_ctx);
1260                 return ret;
1261         }
1262
1263
1264         /* check if some other node is already serving this ip, if not,
1265          * we will claim it
1266          */
1267         for (i=0;i<ips->num;i++) {
1268                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1269                         break;
1270                 }
1271         }
1272
1273         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1274         pub = talloc_size(tmp_ctx, len); 
1275         CTDB_NO_MEMORY(ctdb, pub);
1276
1277         pub->addr  = addr;
1278         pub->mask  = mask;
1279         pub->len   = strlen(argv[1])+1;
1280         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1281
1282         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1285                 talloc_free(tmp_ctx);
1286                 return ret;
1287         }
1288
1289         if (i == ips->num) {
1290                 /* no one has this ip so we claim it */
1291                 pnn  = options.pnn;
1292         } else {
1293                 pnn  = ips->ips[i].pnn;
1294         }
1295
1296         if (move_ip(ctdb, &addr, pnn) != 0) {
1297                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1298                 return -1;
1299         }
1300
1301         talloc_free(tmp_ctx);
1302         return 0;
1303 }
1304
1305 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1306
1307 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1308 {
1309         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1310         struct ctdb_node_map *nodemap=NULL;
1311         struct ctdb_all_public_ips *ips;
1312         int ret, i, j;
1313
1314         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1317                 return ret;
1318         }
1319
1320         /* remove it from the nodes that are not hosting the ip currently */
1321         for(i=0;i<nodemap->num;i++){
1322                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1323                         continue;
1324                 }
1325                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1326                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1327                         continue;
1328                 }
1329
1330                 for (j=0;j<ips->num;j++) {
1331                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1332                                 break;
1333                         }
1334                 }
1335                 if (j==ips->num) {
1336                         continue;
1337                 }
1338
1339                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1340                         continue;
1341                 }
1342
1343                 options.pnn = nodemap->nodes[i].pnn;
1344                 control_delip(ctdb, argc, argv);
1345         }
1346
1347
1348         /* remove it from every node (also the one hosting it) */
1349         for(i=0;i<nodemap->num;i++){
1350                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1351                         continue;
1352                 }
1353                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1354                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1355                         continue;
1356                 }
1357
1358                 for (j=0;j<ips->num;j++) {
1359                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1360                                 break;
1361                         }
1362                 }
1363                 if (j==ips->num) {
1364                         continue;
1365                 }
1366
1367                 options.pnn = nodemap->nodes[i].pnn;
1368                 control_delip(ctdb, argc, argv);
1369         }
1370
1371         talloc_free(tmp_ctx);
1372         return 0;
1373 }
1374         
1375 /*
1376   delete a public ip address from a node
1377  */
1378 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1379 {
1380         int i, ret;
1381         ctdb_sock_addr addr;
1382         struct ctdb_control_ip_iface pub;
1383         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1384         struct ctdb_all_public_ips *ips;
1385
1386         if (argc != 1) {
1387                 talloc_free(tmp_ctx);
1388                 usage();
1389         }
1390
1391         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1392                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1393                 return -1;
1394         }
1395
1396         if (options.pnn == CTDB_BROADCAST_ALL) {
1397                 return control_delip_all(ctdb, argc, argv, &addr);
1398         }
1399
1400         pub.addr  = addr;
1401         pub.mask  = 0;
1402         pub.len   = 0;
1403
1404         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1407                 talloc_free(tmp_ctx);
1408                 return ret;
1409         }
1410         
1411         for (i=0;i<ips->num;i++) {
1412                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1413                         break;
1414                 }
1415         }
1416
1417         if (i==ips->num) {
1418                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1419                         ctdb_addr_to_str(&addr)));
1420                 talloc_free(tmp_ctx);
1421                 return -1;
1422         }
1423
1424         if (ips->ips[i].pnn == options.pnn) {
1425                 ret = find_other_host_for_public_ip(ctdb, &addr);
1426                 if (ret != -1) {
1427                         if (move_ip(ctdb, &addr, ret) != 0) {
1428                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1429                                 return -1;
1430                         }
1431                 }
1432         }
1433
1434         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1437                 talloc_free(tmp_ctx);
1438                 return ret;
1439         }
1440
1441         talloc_free(tmp_ctx);
1442         return 0;
1443 }
1444
1445 /*
1446   kill a tcp connection
1447  */
1448 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1449 {
1450         int ret;
1451         struct ctdb_control_killtcp killtcp;
1452
1453         if (argc < 2) {
1454                 usage();
1455         }
1456
1457         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1458                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1459                 return -1;
1460         }
1461
1462         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1463                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1464                 return -1;
1465         }
1466
1467         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1468         if (ret != 0) {
1469                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1470                 return ret;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   send a gratious arp
1479  */
1480 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1481 {
1482         int ret;
1483         ctdb_sock_addr addr;
1484
1485         if (argc < 2) {
1486                 usage();
1487         }
1488
1489         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1490                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1491                 return -1;
1492         }
1493
1494         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1495         if (ret != 0) {
1496                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1497                 return ret;
1498         }
1499
1500         return 0;
1501 }
1502
1503 /*
1504   register a server id
1505  */
1506 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1507 {
1508         int ret;
1509         struct ctdb_server_id server_id;
1510
1511         if (argc < 3) {
1512                 usage();
1513         }
1514
1515         server_id.pnn       = strtoul(argv[0], NULL, 0);
1516         server_id.type      = strtoul(argv[1], NULL, 0);
1517         server_id.server_id = strtoul(argv[2], NULL, 0);
1518
1519         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1520         if (ret != 0) {
1521                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1522                 return ret;
1523         }
1524         return -1;
1525 }
1526
1527 /*
1528   unregister a server id
1529  */
1530 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1531 {
1532         int ret;
1533         struct ctdb_server_id server_id;
1534
1535         if (argc < 3) {
1536                 usage();
1537         }
1538
1539         server_id.pnn       = strtoul(argv[0], NULL, 0);
1540         server_id.type      = strtoul(argv[1], NULL, 0);
1541         server_id.server_id = strtoul(argv[2], NULL, 0);
1542
1543         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1544         if (ret != 0) {
1545                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1546                 return ret;
1547         }
1548         return -1;
1549 }
1550
1551 /*
1552   check if a server id exists
1553  */
1554 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1555 {
1556         uint32_t status;
1557         int ret;
1558         struct ctdb_server_id server_id;
1559
1560         if (argc < 3) {
1561                 usage();
1562         }
1563
1564         server_id.pnn       = strtoul(argv[0], NULL, 0);
1565         server_id.type      = strtoul(argv[1], NULL, 0);
1566         server_id.server_id = strtoul(argv[2], NULL, 0);
1567
1568         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1569         if (ret != 0) {
1570                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1571                 return ret;
1572         }
1573
1574         if (status) {
1575                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1576         } else {
1577                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1578         }
1579         return 0;
1580 }
1581
1582 /*
1583   get a list of all server ids that are registered on a node
1584  */
1585 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1586 {
1587         int i, ret;
1588         struct ctdb_server_id_list *server_ids;
1589
1590         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1593                 return ret;
1594         }
1595
1596         for (i=0; i<server_ids->num; i++) {
1597                 printf("Server id %d:%d:%d\n", 
1598                         server_ids->server_ids[i].pnn, 
1599                         server_ids->server_ids[i].type, 
1600                         server_ids->server_ids[i].server_id); 
1601         }
1602
1603         return -1;
1604 }
1605
1606 /*
1607   send a tcp tickle ack
1608  */
1609 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1610 {
1611         int ret;
1612         ctdb_sock_addr  src, dst;
1613
1614         if (argc < 2) {
1615                 usage();
1616         }
1617
1618         if (!parse_ip_port(argv[0], &src)) {
1619                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1620                 return -1;
1621         }
1622
1623         if (!parse_ip_port(argv[1], &dst)) {
1624                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1625                 return -1;
1626         }
1627
1628         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1629         if (ret==0) {
1630                 return 0;
1631         }
1632         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1633
1634         return -1;
1635 }
1636
1637
1638 /*
1639   display public ip status
1640  */
1641 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1642 {
1643         int i, ret;
1644         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1645         struct ctdb_all_public_ips *ips;
1646
1647         if (options.pnn == CTDB_BROADCAST_ALL) {
1648                 /* read the list of public ips from all nodes */
1649                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1650         } else {
1651                 /* read the public ip list from this node */
1652                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1653         }
1654         if (ret != 0) {
1655                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1656                 talloc_free(tmp_ctx);
1657                 return ret;
1658         }
1659
1660         if (options.machinereadable){
1661                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1662         } else {
1663                 if (options.pnn == CTDB_BROADCAST_ALL) {
1664                         printf("Public IPs on ALL nodes\n");
1665                 } else {
1666                         printf("Public IPs on node %u\n", options.pnn);
1667                 }
1668         }
1669
1670         for (i=1;i<=ips->num;i++) {
1671                 struct ctdb_control_public_ip_info *info = NULL;
1672                 int32_t pnn;
1673                 char *aciface = NULL;
1674                 char *avifaces = NULL;
1675                 char *cifaces = NULL;
1676
1677                 if (options.pnn == CTDB_BROADCAST_ALL) {
1678                         pnn = ips->ips[ips->num-i].pnn;
1679                 } else {
1680                         pnn = options.pnn;
1681                 }
1682
1683                 if (pnn != -1) {
1684                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1685                                                    &ips->ips[ips->num-i].addr, &info);
1686                 } else {
1687                         ret = -1;
1688                 }
1689
1690                 if (ret == 0) {
1691                         int j;
1692                         for (j=0; j < info->num; j++) {
1693                                 if (cifaces == NULL) {
1694                                         cifaces = talloc_strdup(info,
1695                                                                 info->ifaces[j].name);
1696                                 } else {
1697                                         cifaces = talloc_asprintf_append(cifaces,
1698                                                                          ",%s",
1699                                                                          info->ifaces[j].name);
1700                                 }
1701
1702                                 if (info->active_idx == j) {
1703                                         aciface = info->ifaces[j].name;
1704                                 }
1705
1706                                 if (info->ifaces[j].link_state == 0) {
1707                                         continue;
1708                                 }
1709
1710                                 if (avifaces == NULL) {
1711                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1712                                 } else {
1713                                         avifaces = talloc_asprintf_append(avifaces,
1714                                                                           ",%s",
1715                                                                           info->ifaces[j].name);
1716                                 }
1717                         }
1718                 }
1719
1720                 if (options.machinereadable){
1721                         printf(":%s:%d:%s:%s:%s:\n",
1722                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1723                                ips->ips[ips->num-i].pnn,
1724                                aciface?aciface:"",
1725                                avifaces?avifaces:"",
1726                                cifaces?cifaces:"");
1727                 } else {
1728                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1729                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1730                                ips->ips[ips->num-i].pnn,
1731                                aciface?aciface:"",
1732                                avifaces?avifaces:"",
1733                                cifaces?cifaces:"");
1734                 }
1735                 talloc_free(info);
1736         }
1737
1738         talloc_free(tmp_ctx);
1739         return 0;
1740 }
1741
1742 /*
1743   public ip info
1744  */
1745 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1746 {
1747         int i, ret;
1748         ctdb_sock_addr addr;
1749         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1750         struct ctdb_control_public_ip_info *info;
1751
1752         if (argc != 1) {
1753                 talloc_free(tmp_ctx);
1754                 usage();
1755         }
1756
1757         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1758                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1759                 return -1;
1760         }
1761
1762         /* read the public ip info from this node */
1763         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1764                                            tmp_ctx, &addr, &info);
1765         if (ret != 0) {
1766                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1767                                   argv[0], options.pnn));
1768                 talloc_free(tmp_ctx);
1769                 return ret;
1770         }
1771
1772         printf("Public IP[%s] info on node %u\n",
1773                ctdb_addr_to_str(&info->ip.addr),
1774                options.pnn);
1775
1776         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1777                ctdb_addr_to_str(&info->ip.addr),
1778                info->ip.pnn, info->num);
1779
1780         for (i=0; i<info->num; i++) {
1781                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1782
1783                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1784                        i+1, info->ifaces[i].name,
1785                        info->ifaces[i].link_state?"up":"down",
1786                        (unsigned int)info->ifaces[i].references,
1787                        (i==info->active_idx)?" (active)":"");
1788         }
1789
1790         talloc_free(tmp_ctx);
1791         return 0;
1792 }
1793
1794 /*
1795   display interfaces status
1796  */
1797 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1798 {
1799         int i, ret;
1800         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1801         struct ctdb_control_get_ifaces *ifaces;
1802
1803         /* read the public ip list from this node */
1804         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1805                                    tmp_ctx, &ifaces);
1806         if (ret != 0) {
1807                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1808                                   options.pnn));
1809                 talloc_free(tmp_ctx);
1810                 return ret;
1811         }
1812
1813         if (options.machinereadable){
1814                 printf(":Name:LinkStatus:References:\n");
1815         } else {
1816                 printf("Interfaces on node %u\n", options.pnn);
1817         }
1818
1819         for (i=0; i<ifaces->num; i++) {
1820                 if (options.machinereadable){
1821                         printf(":%s:%s:%u\n",
1822                                ifaces->ifaces[i].name,
1823                                ifaces->ifaces[i].link_state?"1":"0",
1824                                (unsigned int)ifaces->ifaces[i].references);
1825                 } else {
1826                         printf("name:%s link:%s references:%u\n",
1827                                ifaces->ifaces[i].name,
1828                                ifaces->ifaces[i].link_state?"up":"down",
1829                                (unsigned int)ifaces->ifaces[i].references);
1830                 }
1831         }
1832
1833         talloc_free(tmp_ctx);
1834         return 0;
1835 }
1836
1837
1838 /*
1839   set link status of an interface
1840  */
1841 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1842 {
1843         int ret;
1844         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1845         struct ctdb_control_iface_info info;
1846
1847         ZERO_STRUCT(info);
1848
1849         if (argc != 2) {
1850                 usage();
1851         }
1852
1853         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1854                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1855                                   argv[0]));
1856                 talloc_free(tmp_ctx);
1857                 return -1;
1858         }
1859         strcpy(info.name, argv[0]);
1860
1861         if (strcmp(argv[1], "up") == 0) {
1862                 info.link_state = 1;
1863         } else if (strcmp(argv[1], "down") == 0) {
1864                 info.link_state = 0;
1865         } else {
1866                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1867                                   argv[1]));
1868                 talloc_free(tmp_ctx);
1869                 return -1;
1870         }
1871
1872         /* read the public ip list from this node */
1873         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1874                                    tmp_ctx, &info);
1875         if (ret != 0) {
1876                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1877                                   argv[0], options.pnn));
1878                 talloc_free(tmp_ctx);
1879                 return ret;
1880         }
1881
1882         talloc_free(tmp_ctx);
1883         return 0;
1884 }
1885
1886 /*
1887   display pid of a ctdb daemon
1888  */
1889 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1890 {
1891         uint32_t pid;
1892         int ret;
1893
1894         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1897                 return ret;
1898         }
1899         printf("Pid:%d\n", pid);
1900
1901         return 0;
1902 }
1903
1904 /*
1905   handler for receiving the response to ipreallocate
1906 */
1907 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1908                              TDB_DATA data, void *private_data)
1909 {
1910         exit(0);
1911 }
1912
1913 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1914 {
1915         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1916
1917         event_add_timed(ctdb->ev, ctdb, 
1918                                 timeval_current_ofs(1, 0),
1919                                 ctdb_every_second, ctdb);
1920 }
1921
1922 /*
1923   ask the recovery daemon on the recovery master to perform a ip reallocation
1924  */
1925 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1926 {
1927         int i, ret;
1928         TDB_DATA data;
1929         struct takeover_run_reply rd;
1930         uint32_t recmaster;
1931         struct ctdb_node_map *nodemap=NULL;
1932         int retries=0;
1933         struct timeval tv = timeval_current();
1934
1935         /* we need some events to trigger so we can timeout and restart
1936            the loop
1937         */
1938         event_add_timed(ctdb->ev, ctdb, 
1939                                 timeval_current_ofs(1, 0),
1940                                 ctdb_every_second, ctdb);
1941
1942         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1943         if (rd.pnn == -1) {
1944                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1945                 return -1;
1946         }
1947         rd.srvid = getpid();
1948
1949         /* register a message port for receiveing the reply so that we
1950            can receive the reply
1951         */
1952         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1953
1954         data.dptr = (uint8_t *)&rd;
1955         data.dsize = sizeof(rd);
1956
1957 again:
1958         if (retries>5) {
1959                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1960                 exit(10);
1961         }
1962
1963         /* check that there are valid nodes available */
1964         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1965                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1966                 exit(10);
1967         }
1968         for (i=0; i<nodemap->num;i++) {
1969                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1970                         break;
1971                 }
1972         }
1973         if (i==nodemap->num) {
1974                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1975                 return 0;
1976         }
1977
1978
1979         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1980         if (ret != 0) {
1981                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1982                 return ret;
1983         }
1984
1985         /* verify the node exists */
1986         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1987                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1988                 exit(10);
1989         }
1990
1991
1992         /* check tha there are nodes available that can act as a recmaster */
1993         for (i=0; i<nodemap->num; i++) {
1994                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1995                         continue;
1996                 }
1997                 break;
1998         }
1999         if (i == nodemap->num) {
2000                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2001                 return 0;
2002         }
2003
2004         /* verify the recovery master is not STOPPED, nor BANNED */
2005         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2006                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2007                 retries++;
2008                 sleep(1);
2009                 goto again;
2010         } 
2011
2012         
2013         /* verify the recovery master is not STOPPED, nor BANNED */
2014         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2015                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2016                 retries++;
2017                 sleep(1);
2018                 goto again;
2019         } 
2020
2021         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2022         if (ret != 0) {
2023                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2024                 return -1;
2025         }
2026
2027         tv = timeval_current();
2028         /* this loop will terminate when we have received the reply */
2029         while (timeval_elapsed(&tv) < 3.0) {    
2030                 event_loop_once(ctdb->ev);
2031         }
2032
2033         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2034         retries++;
2035         sleep(1);
2036         goto again;
2037
2038         return 0;
2039 }
2040
2041
2042 /*
2043   disable a remote node
2044  */
2045 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2046 {
2047         int ret;
2048         struct ctdb_node_map *nodemap=NULL;
2049
2050         do {
2051                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2052                 if (ret != 0) {
2053                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2054                         return ret;
2055                 }
2056
2057                 sleep(1);
2058
2059                 /* read the nodemap and verify the change took effect */
2060                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2061                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2062                         exit(10);
2063                 }
2064
2065         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2066         ret = control_ipreallocate(ctdb, argc, argv);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2069                 return ret;
2070         }
2071
2072         return 0;
2073 }
2074
2075 /*
2076   enable a disabled remote node
2077  */
2078 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2079 {
2080         int ret;
2081
2082         struct ctdb_node_map *nodemap=NULL;
2083
2084         do {
2085                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2086                 if (ret != 0) {
2087                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2088                         return ret;
2089                 }
2090
2091                 sleep(1);
2092
2093                 /* read the nodemap and verify the change took effect */
2094                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2095                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2096                         exit(10);
2097                 }
2098
2099         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2100         ret = control_ipreallocate(ctdb, argc, argv);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2103                 return ret;
2104         }
2105
2106         return 0;
2107 }
2108
2109 /*
2110   stop a remote node
2111  */
2112 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2113 {
2114         int ret;
2115         struct ctdb_node_map *nodemap=NULL;
2116
2117         do {
2118                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2119                 if (ret != 0) {
2120                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2121                 }
2122         
2123                 sleep(1);
2124
2125                 /* read the nodemap and verify the change took effect */
2126                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2127                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2128                         exit(10);
2129                 }
2130
2131         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2132         ret = control_ipreallocate(ctdb, argc, argv);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2135                 return ret;
2136         }
2137
2138         return 0;
2139 }
2140
2141 /*
2142   restart a stopped remote node
2143  */
2144 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2145 {
2146         int ret;
2147
2148         struct ctdb_node_map *nodemap=NULL;
2149
2150         do {
2151                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2152                 if (ret != 0) {
2153                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2154                         return ret;
2155                 }
2156         
2157                 sleep(1);
2158
2159                 /* read the nodemap and verify the change took effect */
2160                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2161                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2162                         exit(10);
2163                 }
2164
2165         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2166         ret = control_ipreallocate(ctdb, argc, argv);
2167         if (ret != 0) {
2168                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2169                 return ret;
2170         }
2171
2172         return 0;
2173 }
2174
2175 static uint32_t get_generation(struct ctdb_context *ctdb)
2176 {
2177         struct ctdb_vnn_map *vnnmap=NULL;
2178         int ret;
2179
2180         /* wait until the recmaster is not in recovery mode */
2181         while (1) {
2182                 uint32_t recmode, recmaster;
2183                 
2184                 if (vnnmap != NULL) {
2185                         talloc_free(vnnmap);
2186                         vnnmap = NULL;
2187                 }
2188
2189                 /* get the recmaster */
2190                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2191                 if (ret != 0) {
2192                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2193                         exit(10);
2194                 }
2195
2196                 /* get recovery mode */
2197                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2198                 if (ret != 0) {
2199                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2200                         exit(10);
2201                 }
2202
2203                 /* get the current generation number */
2204                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2205                 if (ret != 0) {
2206                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2207                         exit(10);
2208                 }
2209
2210                 if ((recmode == CTDB_RECOVERY_NORMAL)
2211                 &&  (vnnmap->generation != 1)){
2212                         return vnnmap->generation;
2213                 }
2214                 sleep(1);
2215         }
2216 }
2217
2218 /*
2219   ban a node from the cluster
2220  */
2221 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2222 {
2223         int ret;
2224         struct ctdb_node_map *nodemap=NULL;
2225         struct ctdb_ban_time bantime;
2226
2227         if (argc < 1) {
2228                 usage();
2229         }
2230         
2231         /* verify the node exists */
2232         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2233         if (ret != 0) {
2234                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2235                 return ret;
2236         }
2237
2238         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2239                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2240                 return -1;
2241         }
2242
2243         bantime.pnn  = options.pnn;
2244         bantime.time = strtoul(argv[0], NULL, 0);
2245
2246         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2247         if (ret != 0) {
2248                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2249                 return -1;
2250         }       
2251
2252         ret = control_ipreallocate(ctdb, argc, argv);
2253         if (ret != 0) {
2254                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2255                 return ret;
2256         }
2257
2258         return 0;
2259 }
2260
2261
2262 /*
2263   unban a node from the cluster
2264  */
2265 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2266 {
2267         int ret;
2268         struct ctdb_node_map *nodemap=NULL;
2269         struct ctdb_ban_time bantime;
2270
2271         /* verify the node exists */
2272         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2273         if (ret != 0) {
2274                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2275                 return ret;
2276         }
2277
2278         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2279                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2280                 return -1;
2281         }
2282
2283         bantime.pnn  = options.pnn;
2284         bantime.time = 0;
2285
2286         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2289                 return -1;
2290         }       
2291
2292         ret = control_ipreallocate(ctdb, argc, argv);
2293         if (ret != 0) {
2294                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2295                 return ret;
2296         }
2297
2298         return 0;
2299 }
2300
2301
2302 /*
2303   show ban information for a node
2304  */
2305 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2306 {
2307         int ret;
2308         struct ctdb_node_map *nodemap=NULL;
2309         struct ctdb_ban_time *bantime;
2310
2311         /* verify the node exists */
2312         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2313         if (ret != 0) {
2314                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2315                 return ret;
2316         }
2317
2318         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2319         if (ret != 0) {
2320                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2321                 return -1;
2322         }       
2323
2324         if (bantime->time == 0) {
2325                 printf("Node %u is not banned\n", bantime->pnn);
2326         } else {
2327                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2328         }
2329
2330         return 0;
2331 }
2332
2333 /*
2334   shutdown a daemon
2335  */
2336 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2337 {
2338         int ret;
2339
2340         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2341         if (ret != 0) {
2342                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2343                 return ret;
2344         }
2345
2346         return 0;
2347 }
2348
2349 /*
2350   trigger a recovery
2351  */
2352 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2353 {
2354         int ret;
2355         uint32_t generation, next_generation;
2356
2357         /* record the current generation number */
2358         generation = get_generation(ctdb);
2359
2360         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2361         if (ret != 0) {
2362                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2363                 return ret;
2364         }
2365
2366         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2367         if (ret != 0) {
2368                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2369                 return ret;
2370         }
2371
2372         /* wait until we are in a new generation */
2373         while (1) {
2374                 next_generation = get_generation(ctdb);
2375                 if (next_generation != generation) {
2376                         return 0;
2377                 }
2378                 sleep(1);
2379         }
2380
2381         return 0;
2382 }
2383
2384
2385 /*
2386   display monitoring mode of a remote node
2387  */
2388 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2389 {
2390         uint32_t monmode;
2391         int ret;
2392
2393         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2394         if (ret != 0) {
2395                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2396                 return ret;
2397         }
2398         if (!options.machinereadable){
2399                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2400         } else {
2401                 printf(":mode:\n");
2402                 printf(":%d:\n",monmode);
2403         }
2404         return 0;
2405 }
2406
2407
2408 /*
2409   display capabilities of a remote node
2410  */
2411 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2412 {
2413         uint32_t capabilities;
2414         int ret;
2415
2416         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2417         if (ret != 0) {
2418                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2419                 return ret;
2420         }
2421         
2422         if (!options.machinereadable){
2423                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2424                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2425                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2426                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2427         } else {
2428                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2429                 printf(":%d:%d:%d:%d:\n",
2430                         !!(capabilities&CTDB_CAP_RECMASTER),
2431                         !!(capabilities&CTDB_CAP_LMASTER),
2432                         !!(capabilities&CTDB_CAP_LVS),
2433                         !!(capabilities&CTDB_CAP_NATGW));
2434         }
2435         return 0;
2436 }
2437
2438 /*
2439   display lvs configuration
2440  */
2441 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2442 {
2443         uint32_t *capabilities;
2444         struct ctdb_node_map *nodemap=NULL;
2445         int i, ret;
2446         int healthy_count = 0;
2447
2448         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2449         if (ret != 0) {
2450                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2451                 return ret;
2452         }
2453
2454         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2455         CTDB_NO_MEMORY(ctdb, capabilities);
2456         
2457         /* collect capabilities for all connected nodes */
2458         for (i=0; i<nodemap->num; i++) {
2459                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2460                         continue;
2461                 }
2462                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2463                         continue;
2464                 }
2465         
2466                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2467                 if (ret != 0) {
2468                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2469                         return ret;
2470                 }
2471
2472                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2473                         continue;
2474                 }
2475
2476                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2477                         healthy_count++;
2478                 }
2479         }
2480
2481         /* Print all LVS nodes */
2482         for (i=0; i<nodemap->num; i++) {
2483                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2484                         continue;
2485                 }
2486                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2487                         continue;
2488                 }
2489                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2490                         continue;
2491                 }
2492
2493                 if (healthy_count != 0) {
2494                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2495                                 continue;
2496                         }
2497                 }
2498
2499                 printf("%d:%s\n", i, 
2500                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2501         }
2502
2503         return 0;
2504 }
2505
2506 /*
2507   display who is the lvs master
2508  */
2509 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2510 {
2511         uint32_t *capabilities;
2512         struct ctdb_node_map *nodemap=NULL;
2513         int i, ret;
2514         int healthy_count = 0;
2515
2516         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2517         if (ret != 0) {
2518                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2519                 return ret;
2520         }
2521
2522         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2523         CTDB_NO_MEMORY(ctdb, capabilities);
2524         
2525         /* collect capabilities for all connected nodes */
2526         for (i=0; i<nodemap->num; i++) {
2527                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2528                         continue;
2529                 }
2530                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2531                         continue;
2532                 }
2533         
2534                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2535                 if (ret != 0) {
2536                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2537                         return ret;
2538                 }
2539
2540                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2541                         continue;
2542                 }
2543
2544                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2545                         healthy_count++;
2546                 }
2547         }
2548
2549         /* find and show the lvsmaster */
2550         for (i=0; i<nodemap->num; i++) {
2551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2552                         continue;
2553                 }
2554                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2555                         continue;
2556                 }
2557                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2558                         continue;
2559                 }
2560
2561                 if (healthy_count != 0) {
2562                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2563                                 continue;
2564                         }
2565                 }
2566
2567                 if (options.machinereadable){
2568                         printf("%d\n", i);
2569                 } else {
2570                         printf("Node %d is LVS master\n", i);
2571                 }
2572                 return 0;
2573         }
2574
2575         printf("There is no LVS master\n");
2576         return -1;
2577 }
2578
2579 /*
2580   disable monitoring on a  node
2581  */
2582 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2583 {
2584         
2585         int ret;
2586
2587         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2588         if (ret != 0) {
2589                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2590                 return ret;
2591         }
2592         printf("Monitoring mode:%s\n","DISABLED");
2593
2594         return 0;
2595 }
2596
2597 /*
2598   enable monitoring on a  node
2599  */
2600 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2601 {
2602         
2603         int ret;
2604
2605         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2606         if (ret != 0) {
2607                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2608                 return ret;
2609         }
2610         printf("Monitoring mode:%s\n","ACTIVE");
2611
2612         return 0;
2613 }
2614
2615 /*
2616   display remote list of keys/data for a db
2617  */
2618 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2619 {
2620         const char *db_name;
2621         struct ctdb_db_context *ctdb_db;
2622         int ret;
2623
2624         if (argc < 1) {
2625                 usage();
2626         }
2627
2628         db_name = argv[0];
2629
2630
2631         if (db_exists(ctdb, db_name)) {
2632                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2633                 return -1;
2634         }
2635
2636         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2637
2638         if (ctdb_db == NULL) {
2639                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2640                 return -1;
2641         }
2642
2643         /* traverse and dump the cluster tdb */
2644         ret = ctdb_dump_db(ctdb_db, stdout);
2645         if (ret == -1) {
2646                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2647                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2648                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2649                                   db_name));
2650                 return -1;
2651         }
2652         talloc_free(ctdb_db);
2653
2654         printf("Dumped %d records\n", ret);
2655         return 0;
2656 }
2657
2658
2659 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2660                              TDB_DATA data, void *private_data)
2661 {
2662         DEBUG(DEBUG_ERR,("Log data received\n"));
2663         if (data.dsize > 0) {
2664                 printf("%s", data.dptr);
2665         }
2666
2667         exit(0);
2668 }
2669
2670 /*
2671   display a list of log messages from the in memory ringbuffer
2672  */
2673 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2674 {
2675         int ret;
2676         int32_t res;
2677         struct ctdb_get_log_addr log_addr;
2678         TDB_DATA data;
2679         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2680         char *errmsg;
2681         struct timeval tv;
2682
2683         if (argc != 1) {
2684                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2685                 talloc_free(tmp_ctx);
2686                 return -1;
2687         }
2688
2689         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2690         log_addr.srvid = getpid();
2691         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2692                 log_addr.level = get_debug_by_desc(argv[0]);
2693         } else {
2694                 log_addr.level = strtol(argv[0], NULL, 0);
2695         }
2696
2697
2698         data.dptr = (unsigned char *)&log_addr;
2699         data.dsize = sizeof(log_addr);
2700
2701         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2702
2703         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2704         sleep(1);
2705
2706         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2707
2708         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2709                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2710         if (ret != 0 || res != 0) {
2711                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2712                 talloc_free(tmp_ctx);
2713                 return -1;
2714         }
2715
2716
2717         tv = timeval_current();
2718         /* this loop will terminate when we have received the reply */
2719         while (timeval_elapsed(&tv) < 3.0) {    
2720                 event_loop_once(ctdb->ev);
2721         }
2722
2723         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2724
2725         talloc_free(tmp_ctx);
2726         return 0;
2727 }
2728
2729 /*
2730   clear the in memory log area
2731  */
2732 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2733 {
2734         int ret;
2735         int32_t res;
2736         char *errmsg;
2737         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2738
2739         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2740                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2741         if (ret != 0 || res != 0) {
2742                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2743                 talloc_free(tmp_ctx);
2744                 return -1;
2745         }
2746
2747         talloc_free(tmp_ctx);
2748         return 0;
2749 }
2750
2751
2752
2753 /*
2754   display a list of the databases on a remote ctdb
2755  */
2756 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2757 {
2758         int i, ret;
2759         struct ctdb_dbid_map *dbmap=NULL;
2760
2761         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2762         if (ret != 0) {
2763                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2764                 return ret;
2765         }
2766
2767         if(options.machinereadable){
2768                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2769                 for(i=0;i<dbmap->num;i++){
2770                         const char *path;
2771                         const char *name;
2772                         const char *health;
2773                         bool persistent;
2774
2775                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2776                                             dbmap->dbs[i].dbid, ctdb, &path);
2777                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2778                                             dbmap->dbs[i].dbid, ctdb, &name);
2779                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2780                                               dbmap->dbs[i].dbid, ctdb, &health);
2781                         persistent = dbmap->dbs[i].persistent;
2782                         printf(":0x%08X:%s:%s:%d:%d:\n",
2783                                dbmap->dbs[i].dbid, name, path,
2784                                !!(persistent), !!(health));
2785                 }
2786                 return 0;
2787         }
2788
2789         printf("Number of databases:%d\n", dbmap->num);
2790         for(i=0;i<dbmap->num;i++){
2791                 const char *path;
2792                 const char *name;
2793                 const char *health;
2794                 bool persistent;
2795
2796                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2797                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2798                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2799                 persistent = dbmap->dbs[i].persistent;
2800                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2801                        dbmap->dbs[i].dbid, name, path,
2802                        persistent?" PERSISTENT":"",
2803                        health?" UNHEALTHY":"");
2804         }
2805
2806         return 0;
2807 }
2808
2809 /*
2810   display the status of a database on a remote ctdb
2811  */
2812 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2813 {
2814         int i, ret;
2815         struct ctdb_dbid_map *dbmap=NULL;
2816         const char *db_name;
2817
2818         if (argc < 1) {
2819                 usage();
2820         }
2821
2822         db_name = argv[0];
2823
2824         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2825         if (ret != 0) {
2826                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2827                 return ret;
2828         }
2829
2830         for(i=0;i<dbmap->num;i++){
2831                 const char *path;
2832                 const char *name;
2833                 const char *health;
2834                 bool persistent;
2835
2836                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2837                 if (strcmp(name, db_name) != 0) {
2838                         continue;
2839                 }
2840
2841                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2842                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2843                 persistent = dbmap->dbs[i].persistent;
2844                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2845                        dbmap->dbs[i].dbid, name, path,
2846                        persistent?"yes":"no",
2847                        health?health:"OK");
2848                 return 0;
2849         }
2850
2851         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2852         return 0;
2853 }
2854
2855 /*
2856   check if the local node is recmaster or not
2857   it will return 1 if this node is the recmaster and 0 if it is not
2858   or if the local ctdb daemon could not be contacted
2859  */
2860 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2861 {
2862         uint32_t mypnn, recmaster;
2863         int ret;
2864
2865         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2866         if (mypnn == -1) {
2867                 printf("Failed to get pnn of node\n");
2868                 return 1;
2869         }
2870
2871         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2872         if (ret != 0) {
2873                 printf("Failed to get the recmaster\n");
2874                 return 1;
2875         }
2876
2877         if (recmaster != mypnn) {
2878                 printf("this node is not the recmaster\n");
2879                 return 1;
2880         }
2881
2882         printf("this node is the recmaster\n");
2883         return 0;
2884 }
2885
2886 /*
2887   ping a node
2888  */
2889 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2890 {
2891         int ret;
2892         struct timeval tv = timeval_current();
2893         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2894         if (ret == -1) {
2895                 printf("Unable to get ping response from node %u\n", options.pnn);
2896                 return -1;
2897         } else {
2898                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2899                        options.pnn, timeval_elapsed(&tv), ret);
2900         }
2901         return 0;
2902 }
2903
2904
2905 /*
2906   get a tunable
2907  */
2908 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2909 {
2910         const char *name;
2911         uint32_t value;
2912         int ret;
2913
2914         if (argc < 1) {
2915                 usage();
2916         }
2917
2918         name = argv[0];
2919         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2920         if (ret == -1) {
2921                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2922                 return -1;
2923         }
2924
2925         printf("%-19s = %u\n", name, value);
2926         return 0;
2927 }
2928
2929 /*
2930   set a tunable
2931  */
2932 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2933 {
2934         const char *name;
2935         uint32_t value;
2936         int ret;
2937
2938         if (argc < 2) {
2939                 usage();
2940         }
2941
2942         name = argv[0];
2943         value = strtoul(argv[1], NULL, 0);
2944
2945         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2946         if (ret == -1) {
2947                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2948                 return -1;
2949         }
2950         return 0;
2951 }
2952
2953 /*
2954   list all tunables
2955  */
2956 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2957 {
2958         uint32_t count;
2959         const char **list;
2960         int ret, i;
2961
2962         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2963         if (ret == -1) {
2964                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2965                 return -1;
2966         }
2967
2968         for (i=0;i<count;i++) {
2969                 control_getvar(ctdb, 1, &list[i]);
2970         }
2971
2972         talloc_free(list);
2973         
2974         return 0;
2975 }
2976
2977 /*
2978   display debug level on a node
2979  */
2980 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2981 {
2982         int ret;
2983         int32_t level;
2984
2985         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2986         if (ret != 0) {
2987                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2988                 return ret;
2989         } else {
2990                 if (options.machinereadable){
2991                         printf(":Name:Level:\n");
2992                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2993                 } else {
2994                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2995                 }
2996         }
2997         return 0;
2998 }
2999
3000 /*
3001   display reclock file of a node
3002  */
3003 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3004 {
3005         int ret;
3006         const char *reclock;
3007
3008         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3009         if (ret != 0) {
3010                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3011                 return ret;
3012         } else {
3013                 if (options.machinereadable){
3014                         if (reclock != NULL) {
3015                                 printf("%s", reclock);
3016                         }
3017                 } else {
3018                         if (reclock == NULL) {
3019                                 printf("No reclock file used.\n");
3020                         } else {
3021                                 printf("Reclock file:%s\n", reclock);
3022                         }
3023                 }
3024         }
3025         return 0;
3026 }
3027
3028 /*
3029   set the reclock file of a node
3030  */
3031 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3032 {
3033         int ret;
3034         const char *reclock;
3035
3036         if (argc == 0) {
3037                 reclock = NULL;
3038         } else if (argc == 1) {
3039                 reclock = argv[0];
3040         } else {
3041                 usage();
3042         }
3043
3044         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3045         if (ret != 0) {
3046                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3047                 return ret;
3048         }
3049         return 0;
3050 }
3051
3052 /*
3053   set the natgw state on/off
3054  */
3055 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3056 {
3057         int ret;
3058         uint32_t natgwstate;
3059
3060         if (argc == 0) {
3061                 usage();
3062         }
3063
3064         if (!strcmp(argv[0], "on")) {
3065                 natgwstate = 1;
3066         } else if (!strcmp(argv[0], "off")) {
3067                 natgwstate = 0;
3068         } else {
3069                 usage();
3070         }
3071
3072         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3073         if (ret != 0) {
3074                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3075                 return ret;
3076         }
3077
3078         return 0;
3079 }
3080
3081 /*
3082   set the lmaster role on/off
3083  */
3084 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3085 {
3086         int ret;
3087         uint32_t lmasterrole;
3088
3089         if (argc == 0) {
3090                 usage();
3091         }
3092
3093         if (!strcmp(argv[0], "on")) {
3094                 lmasterrole = 1;
3095         } else if (!strcmp(argv[0], "off")) {
3096                 lmasterrole = 0;
3097         } else {
3098                 usage();
3099         }
3100
3101         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3102         if (ret != 0) {
3103                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3104                 return ret;
3105         }
3106
3107         return 0;
3108 }
3109
3110 /*
3111   set the recmaster role on/off
3112  */
3113 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3114 {
3115         int ret;
3116         uint32_t recmasterrole;
3117
3118         if (argc == 0) {
3119                 usage();
3120         }
3121
3122         if (!strcmp(argv[0], "on")) {
3123                 recmasterrole = 1;
3124         } else if (!strcmp(argv[0], "off")) {
3125                 recmasterrole = 0;
3126         } else {
3127                 usage();
3128         }
3129
3130         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3131         if (ret != 0) {
3132                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3133                 return ret;
3134         }
3135
3136         return 0;
3137 }
3138
3139 /*
3140   set debug level on a node or all nodes
3141  */
3142 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3143 {
3144         int i, ret;
3145         int32_t level;
3146
3147         if (argc == 0) {
3148                 printf("You must specify the debug level. Valid levels are:\n");
3149                 for (i=0; debug_levels[i].description != NULL; i++) {
3150                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3151                 }
3152
3153                 return 0;
3154         }
3155
3156         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3157                 level = get_debug_by_desc(argv[0]);
3158         } else {
3159                 level = strtol(argv[0], NULL, 0);
3160         }
3161
3162         for (i=0; debug_levels[i].description != NULL; i++) {
3163                 if (level == debug_levels[i].level) {
3164                         break;
3165                 }
3166         }
3167         if (debug_levels[i].description == NULL) {
3168                 printf("Invalid debug level, must be one of\n");
3169                 for (i=0; debug_levels[i].description != NULL; i++) {
3170                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3171                 }
3172                 return -1;
3173         }
3174
3175         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3176         if (ret != 0) {
3177                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3178         }
3179         return 0;
3180 }
3181
3182
3183 /*
3184   freeze a node
3185  */
3186 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3187 {
3188         int ret;
3189         uint32_t priority;
3190         
3191         if (argc == 1) {
3192                 priority = strtol(argv[0], NULL, 0);
3193         } else {
3194                 priority = 0;
3195         }
3196         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3197
3198         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3199         if (ret != 0) {
3200                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3201         }               
3202         return 0;
3203 }
3204
3205 /*
3206   thaw a node
3207  */
3208 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3209 {
3210         int ret;
3211         uint32_t priority;
3212         
3213         if (argc == 1) {
3214                 priority = strtol(argv[0], NULL, 0);
3215         } else {
3216                 priority = 0;
3217         }
3218         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3219
3220         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3221         if (ret != 0) {
3222                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3223         }               
3224         return 0;
3225 }
3226
3227
3228 /*
3229   attach to a database
3230  */
3231 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3232 {
3233         const char *db_name;
3234         struct ctdb_db_context *ctdb_db;
3235
3236         if (argc < 1) {
3237                 usage();
3238         }
3239         db_name = argv[0];
3240
3241         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3242         if (ctdb_db == NULL) {
3243                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3244                 return -1;
3245         }
3246
3247         return 0;
3248 }
3249
3250 /*
3251   set db priority
3252  */
3253 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3254 {
3255         struct ctdb_db_priority db_prio;
3256         int ret;
3257
3258         if (argc < 2) {
3259                 usage();
3260         }
3261
3262         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3263         db_prio.priority = strtoul(argv[1], NULL, 0);
3264
3265         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3266         if (ret != 0) {
3267                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3268                 return -1;
3269         }
3270
3271         return 0;
3272 }
3273
3274 /*
3275   get db priority
3276  */
3277 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3278 {
3279         uint32_t db_id, priority;
3280         int ret;
3281
3282         if (argc < 1) {
3283                 usage();
3284         }
3285
3286         db_id = strtoul(argv[0], NULL, 0);
3287
3288         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3289         if (ret != 0) {
3290                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3291                 return -1;
3292         }
3293
3294         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3295
3296         return 0;
3297 }
3298
3299 /*
3300   run an eventscript on a node
3301  */
3302 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3303 {
3304         TDB_DATA data;
3305         int ret;
3306         int32_t res;
3307         char *errmsg;
3308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3309
3310         if (argc != 1) {
3311                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3312                 return -1;
3313         }
3314
3315         data.dptr = (unsigned char *)discard_const(argv[0]);
3316         data.dsize = strlen((char *)data.dptr) + 1;
3317
3318         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3319
3320         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3321                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3322         if (ret != 0 || res != 0) {
3323                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3324                 talloc_free(tmp_ctx);
3325                 return -1;
3326         }
3327         talloc_free(tmp_ctx);
3328         return 0;
3329 }
3330
3331 #define DB_VERSION 1
3332 #define MAX_DB_NAME 64
3333 struct db_file_header {
3334         unsigned long version;
3335         time_t timestamp;
3336         unsigned long persistent;
3337         unsigned long size;
3338         const char name[MAX_DB_NAME];
3339 };
3340
3341 struct backup_data {
3342         struct ctdb_marshall_buffer *records;
3343         uint32_t len;
3344         uint32_t total;
3345         bool traverse_error;
3346 };
3347
3348 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3349 {
3350         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3351         struct ctdb_rec_data *rec;
3352
3353         /* add the record */
3354         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3355         if (rec == NULL) {
3356                 bd->traverse_error = true;
3357                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3358                 return -1;
3359         }
3360         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3361         if (bd->records == NULL) {
3362                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3363                 bd->traverse_error = true;
3364                 return -1;
3365         }
3366         bd->records->count++;
3367         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3368         bd->len += rec->length;
3369         talloc_free(rec);
3370
3371         bd->total++;
3372         return 0;
3373 }
3374
3375 /*
3376  * backup a database to a file 
3377  */
3378 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3379 {
3380         int i, ret;
3381         struct ctdb_dbid_map *dbmap=NULL;
3382         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3383         struct db_file_header dbhdr;
3384         struct ctdb_db_context *ctdb_db;
3385         struct backup_data *bd;
3386         int fh = -1;
3387         int status = -1;
3388         const char *reason = NULL;
3389
3390         if (argc != 2) {
3391                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3392                 return -1;
3393         }
3394
3395         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3396         if (ret != 0) {
3397                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3398                 return ret;
3399         }
3400
3401         for(i=0;i<dbmap->num;i++){
3402                 const char *name;
3403
3404                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3405                 if(!strcmp(argv[0], name)){
3406                         talloc_free(discard_const(name));
3407                         break;
3408                 }
3409                 talloc_free(discard_const(name));
3410         }
3411         if (i == dbmap->num) {
3412                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3413                 talloc_free(tmp_ctx);
3414                 return -1;
3415         }
3416
3417         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3418                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3419         if (ret != 0) {
3420                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3421                                  argv[0]));
3422                 talloc_free(tmp_ctx);
3423                 return -1;
3424         }
3425         if (reason) {
3426                 uint32_t allow_unhealthy = 0;
3427
3428                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3429                                       "AllowUnhealthyDBRead",
3430                                       &allow_unhealthy);
3431
3432                 if (allow_unhealthy != 1) {
3433                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3434                                          argv[0], reason));
3435
3436                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3437                                          allow_unhealthy));
3438                         talloc_free(tmp_ctx);
3439                         return -1;
3440                 }
3441
3442                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3443                                      argv[0], argv[0]));
3444                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3445                                      "tunnable AllowUnhealthyDBRead = %u\n",
3446                                      allow_unhealthy));
3447         }
3448
3449         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3450         if (ctdb_db == NULL) {
3451                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3452                 talloc_free(tmp_ctx);
3453                 return -1;
3454         }
3455
3456
3457         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3458         if (ret == -1) {
3459                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3460                 talloc_free(tmp_ctx);
3461                 return -1;
3462         }
3463
3464
3465         bd = talloc_zero(tmp_ctx, struct backup_data);
3466         if (bd == NULL) {
3467                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3468                 talloc_free(tmp_ctx);
3469                 return -1;
3470         }
3471
3472         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3473         if (bd->records == NULL) {
3474                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3475                 talloc_free(tmp_ctx);
3476                 return -1;
3477         }
3478
3479         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3480         bd->records->db_id = ctdb_db->db_id;
3481         /* traverse the database collecting all records */
3482         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3483             bd->traverse_error) {
3484                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3485                 talloc_free(tmp_ctx);
3486                 return -1;              
3487         }
3488
3489         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3490
3491
3492         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3493         if (fh == -1) {
3494                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3495                 talloc_free(tmp_ctx);
3496                 return -1;
3497         }
3498
3499         dbhdr.version = DB_VERSION;
3500         dbhdr.timestamp = time(NULL);
3501         dbhdr.persistent = dbmap->dbs[i].persistent;
3502         dbhdr.size = bd->len;
3503         if (strlen(argv[0]) >= MAX_DB_NAME) {
3504                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3505                 goto done;
3506         }
3507         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3508         ret = write(fh, &dbhdr, sizeof(dbhdr));
3509         if (ret == -1) {
3510                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3511                 goto done;
3512         }
3513         ret = write(fh, bd->records, bd->len);
3514         if (ret == -1) {
3515                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3516                 goto done;
3517         }
3518
3519         status = 0;
3520 done:
3521         if (fh != -1) {
3522                 ret = close(fh);
3523                 if (ret == -1) {
3524                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3525                 }
3526         }
3527         talloc_free(tmp_ctx);
3528         return status;
3529 }
3530
3531 /*
3532  * restore a database from a file 
3533  */
3534 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3535 {
3536         int ret;
3537         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3538         TDB_DATA outdata;
3539         TDB_DATA data;
3540         struct db_file_header dbhdr;
3541         struct ctdb_db_context *ctdb_db;
3542         struct ctdb_node_map *nodemap=NULL;
3543         struct ctdb_vnn_map *vnnmap=NULL;
3544         int i, fh;
3545         struct ctdb_control_wipe_database w;
3546         uint32_t *nodes;
3547         uint32_t generation;
3548         struct tm *tm;
3549         char tbuf[100];
3550
3551         if (argc != 1) {
3552                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3553                 return -1;
3554         }
3555
3556         fh = open(argv[0], O_RDONLY);
3557         if (fh == -1) {
3558                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3559                 talloc_free(tmp_ctx);
3560                 return -1;
3561         }
3562
3563         read(fh, &dbhdr, sizeof(dbhdr));
3564         if (dbhdr.version != DB_VERSION) {
3565                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3566                 talloc_free(tmp_ctx);
3567                 return -1;
3568         }
3569
3570         outdata.dsize = dbhdr.size;
3571         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3572         if (outdata.dptr == NULL) {
3573                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3574                 close(fh);
3575                 talloc_free(tmp_ctx);
3576                 return -1;
3577         }               
3578         read(fh, outdata.dptr, outdata.dsize);
3579         close(fh);
3580
3581         tm = localtime(&dbhdr.timestamp);
3582         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3583         printf("Restoring database '%s' from backup @ %s\n",
3584                 dbhdr.name, tbuf);
3585
3586
3587         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3588         if (ctdb_db == NULL) {
3589                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3590                 talloc_free(tmp_ctx);
3591                 return -1;
3592         }
3593
3594         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3595         if (ret != 0) {
3596                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3597                 talloc_free(tmp_ctx);
3598                 return ret;
3599         }
3600
3601
3602         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3603         if (ret != 0) {
3604                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3605                 talloc_free(tmp_ctx);
3606                 return ret;
3607         }
3608
3609         /* freeze all nodes */
3610         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3611         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3612                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3613                                         nodes, i,
3614                                         TIMELIMIT(),
3615                                         false, tdb_null,
3616                                         NULL, NULL,
3617                                         NULL) != 0) {
3618                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3619                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3620                         talloc_free(tmp_ctx);
3621                         return -1;
3622                 }
3623         }
3624
3625         generation = vnnmap->generation;
3626         data.dptr = (void *)&generation;
3627         data.dsize = sizeof(generation);
3628
3629         /* start a cluster wide transaction */
3630         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3631         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3632                                         nodes, 0,
3633                                         TIMELIMIT(), false, data,
3634                                         NULL, NULL,
3635                                         NULL) != 0) {
3636                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3637                 return -1;
3638         }
3639
3640
3641         w.db_id = ctdb_db->db_id;
3642         w.transaction_id = generation;
3643
3644         data.dptr = (void *)&w;
3645         data.dsize = sizeof(w);
3646
3647         /* wipe all the remote databases. */
3648         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3649         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3650                                         nodes, 0,
3651                                         TIMELIMIT(), false, data,
3652                                         NULL, NULL,
3653                                         NULL) != 0) {
3654                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3655                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3656                 talloc_free(tmp_ctx);
3657                 return -1;
3658         }
3659         
3660         /* push the database */
3661         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3662         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3663                                         nodes, 0,
3664                                         TIMELIMIT(), false, outdata,
3665                                         NULL, NULL,
3666                                         NULL) != 0) {
3667                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3668                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3669                 talloc_free(tmp_ctx);
3670                 return -1;
3671         }
3672
3673         data.dptr = (void *)&ctdb_db->db_id;
3674         data.dsize = sizeof(ctdb_db->db_id);
3675
3676         /* mark the database as healthy */
3677         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3678         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3679                                         nodes, 0,
3680                                         TIMELIMIT(), false, data,
3681                                         NULL, NULL,
3682                                         NULL) != 0) {
3683                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3684                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3685                 talloc_free(tmp_ctx);
3686                 return -1;
3687         }
3688
3689         data.dptr = (void *)&generation;
3690         data.dsize = sizeof(generation);
3691
3692         /* commit all the changes */
3693         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3694                                         nodes, 0,
3695                                         TIMELIMIT(), false, data,
3696                                         NULL, NULL,
3697                                         NULL) != 0) {
3698                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3699                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3700                 talloc_free(tmp_ctx);
3701                 return -1;
3702         }
3703
3704
3705         /* thaw all nodes */
3706         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3707         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3708                                         nodes, 0,
3709                                         TIMELIMIT(),
3710                                         false, tdb_null,
3711                                         NULL, NULL,
3712                                         NULL) != 0) {
3713                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3714                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3715                 talloc_free(tmp_ctx);
3716                 return -1;
3717         }
3718
3719
3720         talloc_free(tmp_ctx);
3721         return 0;
3722 }
3723
3724 /*
3725  * dump a database backup from a file
3726  */
3727 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3728 {
3729         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3730         TDB_DATA outdata;
3731         struct db_file_header dbhdr;
3732         int i, fh;
3733         struct tm *tm;
3734         char tbuf[100];
3735         struct ctdb_rec_data *rec = NULL;
3736         struct ctdb_marshall_buffer *m;
3737
3738         if (argc != 1) {
3739                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3740                 return -1;
3741         }
3742
3743         fh = open(argv[0], O_RDONLY);
3744         if (fh == -1) {
3745                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3746                 talloc_free(tmp_ctx);
3747                 return -1;
3748         }
3749
3750         read(fh, &dbhdr, sizeof(dbhdr));
3751         if (dbhdr.version != DB_VERSION) {
3752                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3753                 talloc_free(tmp_ctx);
3754                 return -1;
3755         }
3756
3757         outdata.dsize = dbhdr.size;
3758         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3759         if (outdata.dptr == NULL) {
3760                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3761                 close(fh);
3762                 talloc_free(tmp_ctx);
3763                 return -1;
3764         }
3765         read(fh, outdata.dptr, outdata.dsize);
3766         close(fh);
3767         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3768
3769         tm = localtime(&dbhdr.timestamp);
3770         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3771         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3772                 dbhdr.name, m->db_id, tbuf);
3773
3774         for (i=0; i < m->count; i++) {
3775                 uint32_t reqid = 0;
3776                 TDB_DATA key, data;
3777
3778                 /* we do not want the header splitted, so we pass NULL*/
3779                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3780                                               NULL, &key, &data);
3781
3782                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3783         }
3784
3785         printf("Dumped %d records\n", i);
3786         talloc_free(tmp_ctx);
3787         return 0;
3788 }
3789
3790 /*
3791  * wipe a database from a file
3792  */
3793 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3794                           const char **argv)
3795 {
3796         int ret;
3797         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3798         TDB_DATA data;
3799         struct ctdb_db_context *ctdb_db;
3800         struct ctdb_node_map *nodemap = NULL;
3801         struct ctdb_vnn_map *vnnmap = NULL;
3802         int i;
3803         struct ctdb_control_wipe_database w;
3804         uint32_t *nodes;
3805         uint32_t generation;
3806         struct ctdb_dbid_map *dbmap = NULL;
3807
3808         if (argc != 1) {
3809                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3810                 return -1;
3811         }
3812
3813         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3814                                  &dbmap);
3815         if (ret != 0) {
3816                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3817                                   options.pnn));
3818                 return ret;
3819         }
3820
3821         for(i=0;i<dbmap->num;i++){
3822                 const char *name;
3823
3824                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3825                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3826                 if(!strcmp(argv[0], name)){
3827                         talloc_free(discard_const(name));
3828                         break;
3829                 }
3830                 talloc_free(discard_const(name));
3831         }
3832         if (i == dbmap->num) {
3833                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3834                                   argv[0]));
3835                 talloc_free(tmp_ctx);
3836                 return -1;
3837         }
3838
3839         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3840         if (ctdb_db == NULL) {
3841                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3842                                   argv[0]));
3843                 talloc_free(tmp_ctx);
3844                 return -1;
3845         }
3846
3847         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3848                                    &nodemap);
3849         if (ret != 0) {
3850                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3851                                   options.pnn));
3852                 talloc_free(tmp_ctx);
3853                 return ret;
3854         }
3855
3856         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3857                                   &vnnmap);
3858         if (ret != 0) {
3859                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3860                                   options.pnn));
3861                 talloc_free(tmp_ctx);
3862                 return ret;
3863         }
3864
3865         /* freeze all nodes */
3866         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3867         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3868                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3869                                                 nodes, i,
3870                                                 TIMELIMIT(),
3871                                                 false, tdb_null,
3872                                                 NULL, NULL,
3873                                                 NULL);
3874                 if (ret != 0) {
3875                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3876                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3877                                              CTDB_RECOVERY_ACTIVE);
3878                         talloc_free(tmp_ctx);
3879                         return -1;
3880                 }
3881         }
3882
3883         generation = vnnmap->generation;
3884         data.dptr = (void *)&generation;
3885         data.dsize = sizeof(generation);
3886
3887         /* start a cluster wide transaction */
3888         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3889         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3890                                         nodes, 0,
3891                                         TIMELIMIT(), false, data,
3892                                         NULL, NULL,
3893                                         NULL);
3894         if (ret!= 0) {
3895                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3896                                   "transactions.\n"));
3897                 return -1;
3898         }
3899
3900         w.db_id = ctdb_db->db_id;
3901         w.transaction_id = generation;
3902
3903         data.dptr = (void *)&w;
3904         data.dsize = sizeof(w);
3905
3906         /* wipe all the remote databases. */
3907         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3908         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3909                                         nodes, 0,
3910                                         TIMELIMIT(), false, data,
3911                                         NULL, NULL,
3912                                         NULL) != 0) {
3913                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3914                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3915                 talloc_free(tmp_ctx);
3916                 return -1;
3917         }
3918
3919         data.dptr = (void *)&ctdb_db->db_id;
3920         data.dsize = sizeof(ctdb_db->db_id);
3921
3922         /* mark the database as healthy */
3923         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3924         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3925                                         nodes, 0,
3926                                         TIMELIMIT(), false, data,
3927                                         NULL, NULL,
3928                                         NULL) != 0) {
3929                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3930                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3931                 talloc_free(tmp_ctx);
3932                 return -1;
3933         }
3934
3935         data.dptr = (void *)&generation;
3936         data.dsize = sizeof(generation);
3937
3938         /* commit all the changes */
3939         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3940                                         nodes, 0,
3941                                         TIMELIMIT(), false, data,
3942                                         NULL, NULL,
3943                                         NULL) != 0) {
3944                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3945                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3946                 talloc_free(tmp_ctx);
3947                 return -1;
3948         }
3949
3950         /* thaw all nodes */
3951         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3952         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3953                                         nodes, 0,
3954                                         TIMELIMIT(),
3955                                         false, tdb_null,
3956                                         NULL, NULL,
3957                                         NULL) != 0) {
3958                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3959                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3960                 talloc_free(tmp_ctx);
3961                 return -1;
3962         }
3963
3964         talloc_free(tmp_ctx);
3965         return 0;
3966 }
3967
3968 /*
3969  * set flags of a node in the nodemap
3970  */
3971 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3972 {
3973         int ret;
3974         int32_t status;
3975         int node;
3976         int flags;
3977         TDB_DATA data;
3978         struct ctdb_node_flag_change c;
3979
3980         if (argc != 2) {
3981                 usage();
3982                 return -1;
3983         }
3984
3985         if (sscanf(argv[0], "%d", &node) != 1) {
3986                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3987                 usage();
3988                 return -1;
3989         }
3990         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3991                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3992                 usage();
3993                 return -1;
3994         }
3995
3996         c.pnn       = node;
3997         c.old_flags = 0;
3998         c.new_flags = flags;
3999
4000         data.dsize = sizeof(c);
4001         data.dptr = (unsigned char *)&c;
4002
4003         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4004                            data, NULL, NULL, &status, NULL, NULL);
4005         if (ret != 0 || status != 0) {
4006                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4007                 return -1;
4008         }
4009         return 0;
4010 }
4011
4012 /*
4013   dump memory usage
4014  */
4015 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4016 {
4017         TDB_DATA data;
4018         int ret;
4019         int32_t res;
4020         char *errmsg;
4021         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4022         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4023                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4024         if (ret != 0 || res != 0) {
4025                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4026                 talloc_free(tmp_ctx);
4027                 return -1;
4028         }
4029         write(1, data.dptr, data.dsize);
4030         talloc_free(tmp_ctx);
4031         return 0;
4032 }
4033
4034 /*
4035   handler for memory dumps
4036 */
4037 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4038                              TDB_DATA data, void *private_data)
4039 {
4040         write(1, data.dptr, data.dsize);
4041         exit(0);
4042 }
4043
4044 /*
4045   dump memory usage on the recovery daemon
4046  */
4047 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4048 {
4049         int ret;
4050         TDB_DATA data;
4051         struct rd_memdump_reply rd;
4052
4053         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4054         if (rd.pnn == -1) {
4055                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4056                 return -1;
4057         }
4058         rd.srvid = getpid();
4059
4060         /* register a message port for receiveing the reply so that we
4061            can receive the reply
4062         */
4063         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4064
4065
4066         data.dptr = (uint8_t *)&rd;
4067         data.dsize = sizeof(rd);
4068
4069         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4070         if (ret != 0) {
4071                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4072                 return -1;
4073         }
4074
4075         /* this loop will terminate when we have received the reply */
4076         while (1) {     
4077                 event_loop_once(ctdb->ev);
4078         }
4079
4080         return 0;
4081 }
4082
4083 /*
4084   list all nodes in the cluster
4085   if the daemon is running, we read the data from the daemon.
4086   if the daemon is not running we parse the nodes file directly
4087  */
4088 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4089 {
4090         int i, ret;
4091         struct ctdb_node_map *nodemap=NULL;
4092
4093         if (ctdb != NULL) {
4094                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4095                 if (ret != 0) {
4096                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4097                         return ret;
4098                 }
4099
4100                 for(i=0;i<nodemap->num;i++){
4101                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4102                                 continue;
4103                         }
4104                         if (options.machinereadable){
4105                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4106                         } else {
4107                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4108                         }
4109                 }
4110         } else {
4111                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4112                 struct pnn_node *pnn_nodes;
4113                 struct pnn_node *pnn_node;
4114         
4115                 pnn_nodes = read_nodes_file(mem_ctx);
4116                 if (pnn_nodes == NULL) {
4117                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4118                         talloc_free(mem_ctx);
4119                         return -1;
4120                 }
4121
4122                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4123                         ctdb_sock_addr addr;
4124
4125                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4126                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4127                                 talloc_free(mem_ctx);
4128                                 return -1;
4129                         }
4130
4131                         if (options.machinereadable){
4132                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4133                         } else {
4134                                 printf("%s\n", pnn_node->addr);
4135                         }
4136                 }
4137                 talloc_free(mem_ctx);
4138         }
4139
4140         return 0;
4141 }
4142
4143 /*
4144   reload the nodes file on the local node
4145  */
4146 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4147 {
4148         int i, ret;
4149         int mypnn;
4150         struct ctdb_node_map *nodemap=NULL;
4151
4152         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4153         if (mypnn == -1) {
4154                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4155                 return -1;
4156         }
4157
4158         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4159         if (ret != 0) {
4160                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4161                 return ret;
4162         }
4163
4164         /* reload the nodes file on all remote nodes */
4165         for (i=0;i<nodemap->num;i++) {
4166                 if (nodemap->nodes[i].pnn == mypnn) {
4167                         continue;
4168                 }
4169                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4170                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4171                         nodemap->nodes[i].pnn);
4172                 if (ret != 0) {
4173                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4174                 }
4175         }
4176
4177         /* reload the nodes file on the local node */
4178         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4179         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4180         if (ret != 0) {
4181                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4182         }
4183
4184         /* initiate a recovery */
4185         control_recover(ctdb, argc, argv);
4186
4187         return 0;
4188 }
4189
4190
4191 static const struct {
4192         const char *name;
4193         int (*fn)(struct ctdb_context *, int, const char **);
4194         bool auto_all;
4195         bool without_daemon; /* can be run without daemon running ? */
4196         const char *msg;
4197         const char *args;
4198 } ctdb_commands[] = {
4199 #ifdef CTDB_VERS
4200         { "version",         control_version,           true,   false,  "show version of ctdb" },
4201 #endif
4202         { "status",          control_status,            true,   false,  "show node status" },
4203         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4204         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4205         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4206         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4207         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4208         { "statistics",      control_statistics,        false,  false, "show statistics" },
4209         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4210         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4211         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4212         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4213         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4214         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4215         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4216         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4217         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4218         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4219         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4220         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4221         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4222         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4223         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4224         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4225         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4226         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4227         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4228         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4229         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4230         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4231         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4232         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4233         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4234         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4235         { "stop",            control_stop,              true,   false,  "stop a node" },
4236         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4237         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4238         { "unban",           control_unban,             true,   false,  "unban a node" },
4239         { "showban",         control_showban,           true,   false,  "show ban information"},
4240         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4241         { "recover",         control_recover,           true,   false,  "force recovery" },
4242         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4243         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4244         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4245         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4246         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4247         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4248         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4249         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4250
4251         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4252         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4253         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4254         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4255         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4256         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4257         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4258         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4259         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4260         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4261         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4262         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4263         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4264         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4265         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4266         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4267         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4268         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4269         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4270         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4271         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4272         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4273         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4274         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4275         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4276         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4277         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4278         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4279         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4280         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4281 };
4282
4283 /*
4284   show usage message
4285  */
4286 static void usage(void)
4287 {
4288         int i;
4289         printf(
4290 "Usage: ctdb [options] <control>\n" \
4291 "Options:\n" \
4292 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4293 "   -Y                 generate machinereadable output\n"
4294 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4295         printf("Controls:\n");
4296         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4297                 printf("  %-15s %-27s  %s\n", 
4298                        ctdb_commands[i].name, 
4299                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4300                        ctdb_commands[i].msg);
4301         }
4302         exit(1);
4303 }
4304
4305
4306 static void ctdb_alarm(int sig)
4307 {
4308         printf("Maximum runtime exceeded - exiting\n");
4309         _exit(ERR_TIMEOUT);
4310 }
4311
4312 /*
4313   main program
4314 */
4315 int main(int argc, const char *argv[])
4316 {
4317         struct ctdb_context *ctdb;
4318         char *nodestring = NULL;
4319         struct poptOption popt_options[] = {
4320                 POPT_AUTOHELP
4321                 POPT_CTDB_CMDLINE
4322                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4323                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4324                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4325                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4326                 POPT_TABLEEND
4327         };
4328         int opt;
4329         const char **extra_argv;
4330         int extra_argc = 0;
4331         int ret=-1, i;
4332         poptContext pc;
4333         struct event_context *ev;
4334         const char *control;
4335
4336         setlinebuf(stdout);
4337         
4338         /* set some defaults */
4339         options.maxruntime = 0;
4340         options.timelimit = 3;
4341         options.pnn = CTDB_CURRENT_NODE;
4342
4343         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4344
4345         while ((opt = poptGetNextOpt(pc)) != -1) {
4346                 switch (opt) {
4347                 default:
4348                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4349                                 poptBadOption(pc, 0), poptStrerror(opt)));
4350                         exit(1);
4351                 }
4352         }
4353
4354         /* setup the remaining options for the main program to use */
4355         extra_argv = poptGetArgs(pc);
4356         if (extra_argv) {
4357                 extra_argv++;
4358                 while (extra_argv[extra_argc]) extra_argc++;
4359         }
4360
4361         if (extra_argc < 1) {
4362                 usage();
4363         }
4364
4365         if (options.maxruntime == 0) {
4366                 const char *ctdb_timeout;
4367                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4368                 if (ctdb_timeout != NULL) {
4369                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4370                 } else {
4371                         /* default timeout is 120 seconds */
4372                         options.maxruntime = 120;
4373                 }
4374         }
4375
4376         signal(SIGALRM, ctdb_alarm);
4377         alarm(options.maxruntime);
4378
4379         /* setup the node number to contact */
4380         if (nodestring != NULL) {
4381                 if (strcmp(nodestring, "all") == 0) {
4382                         options.pnn = CTDB_BROADCAST_ALL;
4383                 } else {
4384                         options.pnn = strtoul(nodestring, NULL, 0);
4385                 }
4386         }
4387
4388         control = extra_argv[0];
4389
4390         ev = event_context_init(NULL);
4391
4392         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4393                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4394                         int j;
4395
4396                         if (ctdb_commands[i].without_daemon == true) {
4397                                 close(2);
4398                         }
4399
4400                         /* initialise ctdb */
4401                         ctdb = ctdb_cmdline_client(ev);
4402
4403                         if (ctdb_commands[i].without_daemon == false) {
4404                                 if (ctdb == NULL) {
4405                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4406                                         exit(1);
4407                                 }
4408
4409                                 /* verify the node exists */
4410                                 verify_node(ctdb);
4411
4412                                 if (options.pnn == CTDB_CURRENT_NODE) {
4413                                         int pnn;
4414                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4415                                         if (pnn == -1) {
4416                                                 return -1;
4417                                         }
4418                                         options.pnn = pnn;
4419                                 }
4420                         }
4421
4422                         if (ctdb_commands[i].auto_all && 
4423                             options.pnn == CTDB_BROADCAST_ALL) {
4424                                 uint32_t *nodes;
4425                                 uint32_t num_nodes;
4426                                 ret = 0;
4427
4428                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4429                                 CTDB_NO_MEMORY(ctdb, nodes);
4430         
4431                                 for (j=0;j<num_nodes;j++) {
4432                                         options.pnn = nodes[j];
4433                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4434                                 }
4435                                 talloc_free(nodes);
4436                         } else {
4437                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4438                         }
4439                         break;
4440                 }
4441         }
4442
4443         if (i == ARRAY_SIZE(ctdb_commands)) {
4444                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4445                 exit(1);
4446         }
4447
4448         return ret;
4449 }