5c49dbaa77790eea9f6861c7f28fdef805aad392
[tridge/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
517                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE));
518                 }
519                 return 0;
520         }
521
522         printf("Number of nodes:%d\n", nodemap->num);
523         for(i=0;i<nodemap->num;i++){
524                 static const struct {
525                         uint32_t flag;
526                         const char *name;
527                 } flag_names[] = {
528                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
529                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
530                         { NODE_FLAGS_BANNED,                "BANNED" },
531                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
532                         { NODE_FLAGS_DELETED,               "DELETED" },
533                         { NODE_FLAGS_STOPPED,               "STOPPED" },
534                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
535                 };
536                 char *flags_str = NULL;
537                 int j;
538
539                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
540                         continue;
541                 }
542                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
543                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
544                                 if (flags_str == NULL) {
545                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
546                                 } else {
547                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
548                                                                            flag_names[j].name);
549                                 }
550                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
551                         }
552                 }
553                 if (flags_str == NULL) {
554                         flags_str = talloc_strdup(ctdb, "OK");
555                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
556                 }
557                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
558                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
559                        flags_str,
560                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
561                 talloc_free(flags_str);
562         }
563
564         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
565         if (ret != 0) {
566                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
567                 return ret;
568         }
569         if (vnnmap->generation == INVALID_GENERATION) {
570                 printf("Generation:INVALID\n");
571         } else {
572                 printf("Generation:%d\n",vnnmap->generation);
573         }
574         printf("Size:%d\n",vnnmap->size);
575         for(i=0;i<vnnmap->size;i++){
576                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
577         }
578
579         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
580         if (ret != 0) {
581                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
582                 return ret;
583         }
584         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
585
586         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
587         if (ret != 0) {
588                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
589                 return ret;
590         }
591         printf("Recovery master:%d\n",recmaster);
592
593         return 0;
594 }
595
596
597 struct natgw_node {
598         struct natgw_node *next;
599         const char *addr;
600 };
601
602 /*
603   display the list of nodes belonging to this natgw configuration
604  */
605 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
606 {
607         int i, ret;
608         const char *natgw_list;
609         int nlines;
610         char **lines;
611         struct natgw_node *natgw_nodes = NULL;
612         struct natgw_node *natgw_node;
613         struct ctdb_node_map *nodemap=NULL;
614
615
616         /* read the natgw nodes file into a linked list */
617         natgw_list = getenv("NATGW_NODES");
618         if (natgw_list == NULL) {
619                 natgw_list = "/etc/ctdb/natgw_nodes";
620         }
621         lines = file_lines_load(natgw_list, &nlines, ctdb);
622         if (lines == NULL) {
623                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
624                 return -1;
625         }
626         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
627                 nlines--;
628         }
629         for (i=0;i<nlines;i++) {
630                 char *node;
631
632                 node = lines[i];
633                 /* strip leading spaces */
634                 while((*node == ' ') || (*node == '\t')) {
635                         node++;
636                 }
637                 if (*node == '#') {
638                         continue;
639                 }
640                 if (strcmp(node, "") == 0) {
641                         continue;
642                 }
643                 natgw_node = talloc(ctdb, struct natgw_node);
644                 natgw_node->addr = talloc_strdup(natgw_node, node);
645                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
646                 natgw_node->next = natgw_nodes;
647                 natgw_nodes = natgw_node;
648         }
649
650         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
651         if (ret != 0) {
652                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
653                 return ret;
654         }
655
656         i=0;
657         while(i<nodemap->num) {
658                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
659                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
660                                 break;
661                         }
662                 }
663
664                 /* this node was not in the natgw so we just remove it from
665                  * the list
666                  */
667                 if ((natgw_node == NULL) 
668                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
669                         int j;
670
671                         for (j=i+1; j<nodemap->num; j++) {
672                                 nodemap->nodes[j-1] = nodemap->nodes[j];
673                         }
674                         nodemap->num--;
675                         continue;
676                 }
677
678                 i++;
679         }               
680
681         /* pick a node to be natgwmaster
682          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
683          */
684         for(i=0;i<nodemap->num;i++){
685                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
686                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
687                         break;
688                 }
689         }
690         /* we couldnt find any healthy node, try unhealthy ones */
691         if (i == nodemap->num) {
692                 for(i=0;i<nodemap->num;i++){
693                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
694                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
695                                 break;
696                         }
697                 }
698         }
699         /* unless all nodes are STOPPED, when we pick one anyway */
700         if (i == nodemap->num) {
701                 for(i=0;i<nodemap->num;i++){
702                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
703                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
704                                 break;
705                         }
706                 }
707                 /* or if we still can not find any */
708                 if (i == nodemap->num) {
709                         printf("-1 0.0.0.0\n");
710                 }
711         }
712
713         /* print the pruned list of nodes belonging to this natgw list */
714         for(i=0;i<nodemap->num;i++){
715                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
716                         continue;
717                 }
718                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
719                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
723                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
724                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
725         }
726
727         return 0;
728 }
729
730 /*
731   display the status of the scripts for monitoring (or other events)
732  */
733 static int control_one_scriptstatus(struct ctdb_context *ctdb,
734                                     enum ctdb_eventscript_call type)
735 {
736         struct ctdb_scripts_wire *script_status;
737         int ret, i;
738
739         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
740         if (ret != 0) {
741                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
742                 return ret;
743         }
744
745         if (script_status == NULL) {
746                 if (!options.machinereadable) {
747                         printf("%s cycle never run\n",
748                                ctdb_eventscript_call_names[type]);
749                 }
750                 return 0;
751         }
752
753         if (!options.machinereadable) {
754                 printf("%d scripts were executed last %s cycle\n",
755                        script_status->num_scripts,
756                        ctdb_eventscript_call_names[type]);
757         }
758         for (i=0; i<script_status->num_scripts; i++) {
759                 const char *status = NULL;
760
761                 switch (script_status->scripts[i].status) {
762                 case -ETIME:
763                         status = "TIMEDOUT";
764                         break;
765                 case -ENOEXEC:
766                         status = "DISABLED";
767                         break;
768                 case 0:
769                         status = "OK";
770                         break;
771                 default:
772                         if (script_status->scripts[i].status > 0)
773                                 status = "ERROR";
774                         break;
775                 }
776                 if (options.machinereadable) {
777                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
778                                ctdb_eventscript_call_names[type],
779                                script_status->scripts[i].name,
780                                script_status->scripts[i].status,
781                                status,
782                                (long)script_status->scripts[i].start.tv_sec,
783                                (long)script_status->scripts[i].start.tv_usec,
784                                (long)script_status->scripts[i].finished.tv_sec,
785                                (long)script_status->scripts[i].finished.tv_usec,
786                                script_status->scripts[i].output);
787                         continue;
788                 }
789                 if (status)
790                         printf("%-20s Status:%s    ",
791                                script_status->scripts[i].name, status);
792                 else
793                         /* Some other error, eg from stat. */
794                         printf("%-20s Status:CANNOT RUN (%s)",
795                                script_status->scripts[i].name,
796                                strerror(-script_status->scripts[i].status));
797
798                 if (script_status->scripts[i].status >= 0) {
799                         printf("Duration:%.3lf ",
800                         timeval_delta(&script_status->scripts[i].finished,
801                               &script_status->scripts[i].start));
802                 }
803                 if (script_status->scripts[i].status != -ENOEXEC) {
804                         printf("%s",
805                                ctime(&script_status->scripts[i].start.tv_sec));
806                         if (script_status->scripts[i].status != 0) {
807                                 printf("   OUTPUT:%s\n",
808                                        script_status->scripts[i].output);
809                         }
810                 } else {
811                         printf("\n");
812                 }
813         }
814         return 0;
815 }
816
817
818 static int control_scriptstatus(struct ctdb_context *ctdb,
819                                 int argc, const char **argv)
820 {
821         int ret;
822         enum ctdb_eventscript_call type, min, max;
823         const char *arg;
824
825         if (argc > 1) {
826                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
827                 return -1;
828         }
829
830         if (argc == 0)
831                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
832         else
833                 arg = argv[0];
834
835         for (type = 0; type < CTDB_EVENT_MAX; type++) {
836                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
837                         min = type;
838                         max = type+1;
839                         break;
840                 }
841         }
842         if (type == CTDB_EVENT_MAX) {
843                 if (strcmp(arg, "all") == 0) {
844                         min = 0;
845                         max = CTDB_EVENT_MAX;
846                 } else {
847                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
848                         return -1;
849                 }
850         }
851
852         if (options.machinereadable) {
853                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
854         }
855
856         for (type = min; type < max; type++) {
857                 ret = control_one_scriptstatus(ctdb, type);
858                 if (ret != 0) {
859                         return ret;
860                 }
861         }
862
863         return 0;
864 }
865
866 /*
867   enable an eventscript
868  */
869 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
870 {
871         int ret;
872
873         if (argc < 1) {
874                 usage();
875         }
876
877         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
878         if (ret != 0) {
879           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
880                 return ret;
881         }
882
883         return 0;
884 }
885
886 /*
887   disable an eventscript
888  */
889 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
890 {
891         int ret;
892
893         if (argc < 1) {
894                 usage();
895         }
896
897         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
898         if (ret != 0) {
899           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
900                 return ret;
901         }
902
903         return 0;
904 }
905
906 /*
907   display the pnn of the recovery master
908  */
909 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
910 {
911         int ret;
912         uint32_t recmaster;
913
914         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
915         if (ret != 0) {
916                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
917                 return ret;
918         }
919         printf("%d\n",recmaster);
920
921         return 0;
922 }
923
924 /*
925   get a list of all tickles for this pnn
926  */
927 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
928 {
929         struct ctdb_control_tcp_tickle_list *list;
930         ctdb_sock_addr addr;
931         int i, ret;
932
933         if (argc < 1) {
934                 usage();
935         }
936
937         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
938                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
939                 return -1;
940         }
941
942         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
943         if (ret == -1) {
944                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
945                 return -1;
946         }
947
948         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
949         printf("Num tickles:%u\n", list->tickles.num);
950         for (i=0;i<list->tickles.num;i++) {
951                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
952                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
953         }
954
955         talloc_free(list);
956         
957         return 0;
958 }
959
960
961
962 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
963 {
964         struct ctdb_all_public_ips *ips;
965         struct ctdb_public_ip ip;
966         int i, ret;
967         uint32_t *nodes;
968         uint32_t disable_time;
969         TDB_DATA data;
970         struct ctdb_node_map *nodemap=NULL;
971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
972
973         disable_time = 30;
974         data.dptr  = (uint8_t*)&disable_time;
975         data.dsize = sizeof(disable_time);
976         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
977         if (ret != 0) {
978                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
979                 return -1;
980         }
981
982
983
984         /* read the public ip list from the node */
985         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
986         if (ret != 0) {
987                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
988                 talloc_free(tmp_ctx);
989                 return -1;
990         }
991
992         for (i=0;i<ips->num;i++) {
993                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
994                         break;
995                 }
996         }
997         if (i==ips->num) {
998                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
999                         pnn, ctdb_addr_to_str(addr)));
1000                 talloc_free(tmp_ctx);
1001                 return -1;
1002         }
1003
1004         ip.pnn  = pnn;
1005         ip.addr = *addr;
1006
1007         data.dptr  = (uint8_t *)&ip;
1008         data.dsize = sizeof(ip);
1009
1010         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1013                 talloc_free(tmp_ctx);
1014                 return ret;
1015         }
1016
1017         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1018         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1019                                         nodes, 0,
1020                                         TIMELIMIT(),
1021                                         false, data,
1022                                         NULL, NULL,
1023                                         NULL);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036
1037         talloc_free(tmp_ctx);
1038         return 0;
1039 }
1040
1041 /*
1042   move/failover an ip address to a specific node
1043  */
1044 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1045 {
1046         uint32_t pnn;
1047         ctdb_sock_addr addr;
1048
1049         if (argc < 2) {
1050                 usage();
1051                 return -1;
1052         }
1053
1054         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1055                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1056                 return -1;
1057         }
1058
1059
1060         if (sscanf(argv[1], "%u", &pnn) != 1) {
1061                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1062                 return -1;
1063         }
1064
1065         if (move_ip(ctdb, &addr, pnn) != 0) {
1066                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1067                 return -1;
1068         }
1069
1070         return 0;
1071 }
1072
1073 void getips_store_callback(void *param, void *data)
1074 {
1075         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1076         struct ctdb_all_public_ips *ips = param;
1077         int i;
1078
1079         i = ips->num++;
1080         ips->ips[i].pnn  = node_ip->pnn;
1081         ips->ips[i].addr = node_ip->addr;
1082 }
1083
1084 void getips_count_callback(void *param, void *data)
1085 {
1086         uint32_t *count = param;
1087
1088         (*count)++;
1089 }
1090
1091 #define IP_KEYLEN       4
1092 static uint32_t *ip_key(ctdb_sock_addr *ip)
1093 {
1094         static uint32_t key[IP_KEYLEN];
1095
1096         bzero(key, sizeof(key));
1097
1098         switch (ip->sa.sa_family) {
1099         case AF_INET:
1100                 key[0]  = ip->ip.sin_addr.s_addr;
1101                 break;
1102         case AF_INET6:
1103                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1104                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1105                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1106                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1107                 break;
1108         default:
1109                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1110                 return key;
1111         }
1112
1113         return key;
1114 }
1115
1116 static void *add_ip_callback(void *parm, void *data)
1117 {
1118         return parm;
1119 }
1120
1121 static int
1122 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1123 {
1124         struct ctdb_all_public_ips *tmp_ips;
1125         struct ctdb_node_map *nodemap=NULL;
1126         trbt_tree_t *ip_tree;
1127         int i, j, len, ret;
1128         uint32_t count;
1129
1130         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1131         if (ret != 0) {
1132                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1133                 return ret;
1134         }
1135
1136         ip_tree = trbt_create(tmp_ctx, 0);
1137
1138         for(i=0;i<nodemap->num;i++){
1139                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1140                         continue;
1141                 }
1142                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1143                         continue;
1144                 }
1145
1146                 /* read the public ip list from this node */
1147                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1148                 if (ret != 0) {
1149                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1150                         return -1;
1151                 }
1152         
1153                 for (j=0; j<tmp_ips->num;j++) {
1154                         struct ctdb_public_ip *node_ip;
1155
1156                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1157                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1158                         node_ip->addr = tmp_ips->ips[j].addr;
1159
1160                         trbt_insertarray32_callback(ip_tree,
1161                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1162                                 add_ip_callback,
1163                                 node_ip);
1164                 }
1165                 talloc_free(tmp_ips);
1166         }
1167
1168         /* traverse */
1169         count = 0;
1170         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1171
1172         len = offsetof(struct ctdb_all_public_ips, ips) + 
1173                 count*sizeof(struct ctdb_public_ip);
1174         tmp_ips = talloc_zero_size(tmp_ctx, len);
1175         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1176
1177         *ips = tmp_ips;
1178
1179         return 0;
1180 }
1181
1182
1183 /* 
1184  * scans all other nodes and returns a pnn for another node that can host this 
1185  * ip address or -1
1186  */
1187 static int
1188 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1189 {
1190         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1191         struct ctdb_all_public_ips *ips;
1192         struct ctdb_node_map *nodemap=NULL;
1193         int i, j, ret;
1194
1195         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1196         if (ret != 0) {
1197                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1198                 talloc_free(tmp_ctx);
1199                 return ret;
1200         }
1201
1202         for(i=0;i<nodemap->num;i++){
1203                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1204                         continue;
1205                 }
1206                 if (nodemap->nodes[i].pnn == options.pnn) {
1207                         continue;
1208                 }
1209
1210                 /* read the public ip list from this node */
1211                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1212                 if (ret != 0) {
1213                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1214                         return -1;
1215                 }
1216
1217                 for (j=0;j<ips->num;j++) {
1218                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1219                                 talloc_free(tmp_ctx);
1220                                 return nodemap->nodes[i].pnn;
1221                         }
1222                 }
1223                 talloc_free(ips);
1224         }
1225
1226         talloc_free(tmp_ctx);
1227         return -1;
1228 }
1229
1230 /*
1231   add a public ip address to a node
1232  */
1233 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1234 {
1235         int i, ret;
1236         int len;
1237         uint32_t pnn;
1238         unsigned mask;
1239         ctdb_sock_addr addr;
1240         struct ctdb_control_ip_iface *pub;
1241         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1242         struct ctdb_all_public_ips *ips;
1243
1244
1245         if (argc != 2) {
1246                 talloc_free(tmp_ctx);
1247                 usage();
1248         }
1249
1250         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1251                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1252                 talloc_free(tmp_ctx);
1253                 return -1;
1254         }
1255
1256         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1257         if (ret != 0) {
1258                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1259                 talloc_free(tmp_ctx);
1260                 return ret;
1261         }
1262
1263
1264         /* check if some other node is already serving this ip, if not,
1265          * we will claim it
1266          */
1267         for (i=0;i<ips->num;i++) {
1268                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1269                         break;
1270                 }
1271         }
1272
1273         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1274         pub = talloc_size(tmp_ctx, len); 
1275         CTDB_NO_MEMORY(ctdb, pub);
1276
1277         pub->addr  = addr;
1278         pub->mask  = mask;
1279         pub->len   = strlen(argv[1])+1;
1280         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1281
1282         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1285                 talloc_free(tmp_ctx);
1286                 return ret;
1287         }
1288
1289         if (i == ips->num) {
1290                 /* no one has this ip so we claim it */
1291                 pnn  = options.pnn;
1292         } else {
1293                 pnn  = ips->ips[i].pnn;
1294         }
1295
1296         if (move_ip(ctdb, &addr, pnn) != 0) {
1297                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1298                 return -1;
1299         }
1300
1301         talloc_free(tmp_ctx);
1302         return 0;
1303 }
1304
1305 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1306
1307 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1308 {
1309         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1310         struct ctdb_node_map *nodemap=NULL;
1311         struct ctdb_all_public_ips *ips;
1312         int ret, i, j;
1313
1314         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1317                 return ret;
1318         }
1319
1320         /* remove it from the nodes that are not hosting the ip currently */
1321         for(i=0;i<nodemap->num;i++){
1322                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1323                         continue;
1324                 }
1325                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1326                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1327                         continue;
1328                 }
1329
1330                 for (j=0;j<ips->num;j++) {
1331                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1332                                 break;
1333                         }
1334                 }
1335                 if (j==ips->num) {
1336                         continue;
1337                 }
1338
1339                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1340                         continue;
1341                 }
1342
1343                 options.pnn = nodemap->nodes[i].pnn;
1344                 control_delip(ctdb, argc, argv);
1345         }
1346
1347
1348         /* remove it from every node (also the one hosting it) */
1349         for(i=0;i<nodemap->num;i++){
1350                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1351                         continue;
1352                 }
1353                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1354                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1355                         continue;
1356                 }
1357
1358                 for (j=0;j<ips->num;j++) {
1359                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1360                                 break;
1361                         }
1362                 }
1363                 if (j==ips->num) {
1364                         continue;
1365                 }
1366
1367                 options.pnn = nodemap->nodes[i].pnn;
1368                 control_delip(ctdb, argc, argv);
1369         }
1370
1371         talloc_free(tmp_ctx);
1372         return 0;
1373 }
1374         
1375 /*
1376   delete a public ip address from a node
1377  */
1378 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1379 {
1380         int i, ret;
1381         ctdb_sock_addr addr;
1382         struct ctdb_control_ip_iface pub;
1383         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1384         struct ctdb_all_public_ips *ips;
1385
1386         if (argc != 1) {
1387                 talloc_free(tmp_ctx);
1388                 usage();
1389         }
1390
1391         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1392                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1393                 return -1;
1394         }
1395
1396         if (options.pnn == CTDB_BROADCAST_ALL) {
1397                 return control_delip_all(ctdb, argc, argv, &addr);
1398         }
1399
1400         pub.addr  = addr;
1401         pub.mask  = 0;
1402         pub.len   = 0;
1403
1404         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1407                 talloc_free(tmp_ctx);
1408                 return ret;
1409         }
1410         
1411         for (i=0;i<ips->num;i++) {
1412                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1413                         break;
1414                 }
1415         }
1416
1417         if (i==ips->num) {
1418                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1419                         ctdb_addr_to_str(&addr)));
1420                 talloc_free(tmp_ctx);
1421                 return -1;
1422         }
1423
1424         if (ips->ips[i].pnn == options.pnn) {
1425                 ret = find_other_host_for_public_ip(ctdb, &addr);
1426                 if (ret != -1) {
1427                         if (move_ip(ctdb, &addr, ret) != 0) {
1428                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1429                                 return -1;
1430                         }
1431                 }
1432         }
1433
1434         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1437                 talloc_free(tmp_ctx);
1438                 return ret;
1439         }
1440
1441         talloc_free(tmp_ctx);
1442         return 0;
1443 }
1444
1445 /*
1446   kill a tcp connection
1447  */
1448 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1449 {
1450         int ret;
1451         struct ctdb_control_killtcp killtcp;
1452
1453         if (argc < 2) {
1454                 usage();
1455         }
1456
1457         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1458                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1459                 return -1;
1460         }
1461
1462         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1463                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1464                 return -1;
1465         }
1466
1467         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1468         if (ret != 0) {
1469                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1470                 return ret;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   send a gratious arp
1479  */
1480 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1481 {
1482         int ret;
1483         ctdb_sock_addr addr;
1484
1485         if (argc < 2) {
1486                 usage();
1487         }
1488
1489         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1490                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1491                 return -1;
1492         }
1493
1494         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1495         if (ret != 0) {
1496                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1497                 return ret;
1498         }
1499
1500         return 0;
1501 }
1502
1503 /*
1504   register a server id
1505  */
1506 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1507 {
1508         int ret;
1509         struct ctdb_server_id server_id;
1510
1511         if (argc < 3) {
1512                 usage();
1513         }
1514
1515         server_id.pnn       = strtoul(argv[0], NULL, 0);
1516         server_id.type      = strtoul(argv[1], NULL, 0);
1517         server_id.server_id = strtoul(argv[2], NULL, 0);
1518
1519         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1520         if (ret != 0) {
1521                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1522                 return ret;
1523         }
1524         return -1;
1525 }
1526
1527 /*
1528   unregister a server id
1529  */
1530 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1531 {
1532         int ret;
1533         struct ctdb_server_id server_id;
1534
1535         if (argc < 3) {
1536                 usage();
1537         }
1538
1539         server_id.pnn       = strtoul(argv[0], NULL, 0);
1540         server_id.type      = strtoul(argv[1], NULL, 0);
1541         server_id.server_id = strtoul(argv[2], NULL, 0);
1542
1543         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1544         if (ret != 0) {
1545                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1546                 return ret;
1547         }
1548         return -1;
1549 }
1550
1551 /*
1552   check if a server id exists
1553  */
1554 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1555 {
1556         uint32_t status;
1557         int ret;
1558         struct ctdb_server_id server_id;
1559
1560         if (argc < 3) {
1561                 usage();
1562         }
1563
1564         server_id.pnn       = strtoul(argv[0], NULL, 0);
1565         server_id.type      = strtoul(argv[1], NULL, 0);
1566         server_id.server_id = strtoul(argv[2], NULL, 0);
1567
1568         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1569         if (ret != 0) {
1570                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1571                 return ret;
1572         }
1573
1574         if (status) {
1575                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1576         } else {
1577                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1578         }
1579         return 0;
1580 }
1581
1582 /*
1583   get a list of all server ids that are registered on a node
1584  */
1585 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1586 {
1587         int i, ret;
1588         struct ctdb_server_id_list *server_ids;
1589
1590         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1593                 return ret;
1594         }
1595
1596         for (i=0; i<server_ids->num; i++) {
1597                 printf("Server id %d:%d:%d\n", 
1598                         server_ids->server_ids[i].pnn, 
1599                         server_ids->server_ids[i].type, 
1600                         server_ids->server_ids[i].server_id); 
1601         }
1602
1603         return -1;
1604 }
1605
1606 /*
1607   send a tcp tickle ack
1608  */
1609 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1610 {
1611         int ret;
1612         ctdb_sock_addr  src, dst;
1613
1614         if (argc < 2) {
1615                 usage();
1616         }
1617
1618         if (!parse_ip_port(argv[0], &src)) {
1619                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1620                 return -1;
1621         }
1622
1623         if (!parse_ip_port(argv[1], &dst)) {
1624                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1625                 return -1;
1626         }
1627
1628         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1629         if (ret==0) {
1630                 return 0;
1631         }
1632         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1633
1634         return -1;
1635 }
1636
1637
1638 /*
1639   display public ip status
1640  */
1641 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1642 {
1643         int i, ret;
1644         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1645         struct ctdb_all_public_ips *ips;
1646
1647         if (options.pnn == CTDB_BROADCAST_ALL) {
1648                 /* read the list of public ips from all nodes */
1649                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1650         } else {
1651                 /* read the public ip list from this node */
1652                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1653         }
1654         if (ret != 0) {
1655                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1656                 talloc_free(tmp_ctx);
1657                 return ret;
1658         }
1659
1660         if (options.machinereadable){
1661                 printf(":Public IP:Node:\n");
1662         } else {
1663                 if (options.pnn == CTDB_BROADCAST_ALL) {
1664                         printf("Public IPs on ALL nodes\n");
1665                 } else {
1666                         printf("Public IPs on node %u\n", options.pnn);
1667                 }
1668         }
1669
1670         for (i=1;i<=ips->num;i++) {
1671                 if (options.machinereadable){
1672                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1673                 } else {
1674                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1675                 }
1676         }
1677
1678         talloc_free(tmp_ctx);
1679         return 0;
1680 }
1681
1682 /*
1683   public ip info
1684  */
1685 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1686 {
1687         int i, ret;
1688         ctdb_sock_addr addr;
1689         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1690         struct ctdb_control_public_ip_info *info;
1691
1692         if (argc != 1) {
1693                 talloc_free(tmp_ctx);
1694                 usage();
1695         }
1696
1697         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1698                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1699                 return -1;
1700         }
1701
1702         /* read the public ip info from this node */
1703         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1704                                            tmp_ctx, &addr, &info);
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1707                                   argv[0], options.pnn));
1708                 talloc_free(tmp_ctx);
1709                 return ret;
1710         }
1711
1712         printf("Public IP[%s] info on node %u\n",
1713                ctdb_addr_to_str(&info->ip.addr),
1714                options.pnn);
1715
1716         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1717                ctdb_addr_to_str(&info->ip.addr),
1718                info->ip.pnn, info->num);
1719
1720         for (i=0; i<info->num; i++) {
1721                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1722
1723                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1724                        i+1, info->ifaces[i].name,
1725                        info->ifaces[i].link_state?"up":"down",
1726                        (unsigned int)info->ifaces[i].references,
1727                        (i==info->active_idx)?" (active)":"");
1728         }
1729
1730         talloc_free(tmp_ctx);
1731         return 0;
1732 }
1733
1734 /*
1735   display interfaces status
1736  */
1737 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1738 {
1739         int i, ret;
1740         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1741         struct ctdb_control_get_ifaces *ifaces;
1742
1743         /* read the public ip list from this node */
1744         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1745                                    tmp_ctx, &ifaces);
1746         if (ret != 0) {
1747                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1748                                   options.pnn));
1749                 talloc_free(tmp_ctx);
1750                 return ret;
1751         }
1752
1753         if (options.machinereadable){
1754                 printf(":Name:LinkStatus:References:\n");
1755         } else {
1756                 printf("Interfaces on node %u\n", options.pnn);
1757         }
1758
1759         for (i=0; i<ifaces->num; i++) {
1760                 if (options.machinereadable){
1761                         printf(":%s:%s:%u\n",
1762                                ifaces->ifaces[i].name,
1763                                ifaces->ifaces[i].link_state?"1":"0",
1764                                (unsigned int)ifaces->ifaces[i].references);
1765                 } else {
1766                         printf("name:%s link:%s references:%u\n",
1767                                ifaces->ifaces[i].name,
1768                                ifaces->ifaces[i].link_state?"up":"down",
1769                                (unsigned int)ifaces->ifaces[i].references);
1770                 }
1771         }
1772
1773         talloc_free(tmp_ctx);
1774         return 0;
1775 }
1776
1777
1778 /*
1779   set link status of an interface
1780  */
1781 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1782 {
1783         int ret;
1784         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1785         struct ctdb_control_iface_info info;
1786
1787         ZERO_STRUCT(info);
1788
1789         if (argc != 2) {
1790                 usage();
1791         }
1792
1793         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1794                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1795                                   argv[0]));
1796                 talloc_free(tmp_ctx);
1797                 return -1;
1798         }
1799         strcpy(info.name, argv[0]);
1800
1801         if (strcmp(argv[1], "up") == 0) {
1802                 info.link_state = 1;
1803         } else if (strcmp(argv[1], "down") == 0) {
1804                 info.link_state = 0;
1805         } else {
1806                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1807                                   argv[1]));
1808                 talloc_free(tmp_ctx);
1809                 return -1;
1810         }
1811
1812         /* read the public ip list from this node */
1813         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1814                                    tmp_ctx, &info);
1815         if (ret != 0) {
1816                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1817                                   argv[0], options.pnn));
1818                 talloc_free(tmp_ctx);
1819                 return ret;
1820         }
1821
1822         talloc_free(tmp_ctx);
1823         return 0;
1824 }
1825
1826 /*
1827   display pid of a ctdb daemon
1828  */
1829 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1830 {
1831         uint32_t pid;
1832         int ret;
1833
1834         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1837                 return ret;
1838         }
1839         printf("Pid:%d\n", pid);
1840
1841         return 0;
1842 }
1843
1844 /*
1845   handler for receiving the response to ipreallocate
1846 */
1847 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1848                              TDB_DATA data, void *private_data)
1849 {
1850         exit(0);
1851 }
1852
1853 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1854 {
1855         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1856
1857         event_add_timed(ctdb->ev, ctdb, 
1858                                 timeval_current_ofs(1, 0),
1859                                 ctdb_every_second, ctdb);
1860 }
1861
1862 /*
1863   ask the recovery daemon on the recovery master to perform a ip reallocation
1864  */
1865 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1866 {
1867         int i, ret;
1868         TDB_DATA data;
1869         struct takeover_run_reply rd;
1870         uint32_t recmaster;
1871         struct ctdb_node_map *nodemap=NULL;
1872         int retries=0;
1873         struct timeval tv = timeval_current();
1874
1875         /* we need some events to trigger so we can timeout and restart
1876            the loop
1877         */
1878         event_add_timed(ctdb->ev, ctdb, 
1879                                 timeval_current_ofs(1, 0),
1880                                 ctdb_every_second, ctdb);
1881
1882         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1883         if (rd.pnn == -1) {
1884                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1885                 return -1;
1886         }
1887         rd.srvid = getpid();
1888
1889         /* register a message port for receiveing the reply so that we
1890            can receive the reply
1891         */
1892         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1893
1894         data.dptr = (uint8_t *)&rd;
1895         data.dsize = sizeof(rd);
1896
1897 again:
1898         if (retries>5) {
1899                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1900                 exit(10);
1901         }
1902
1903         /* check that there are valid nodes available */
1904         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1905                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1906                 exit(10);
1907         }
1908         for (i=0; i<nodemap->num;i++) {
1909                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1910                         break;
1911                 }
1912         }
1913         if (i==nodemap->num) {
1914                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1915                 return 0;
1916         }
1917
1918
1919         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1920         if (ret != 0) {
1921                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1922                 return ret;
1923         }
1924
1925         /* verify the node exists */
1926         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1927                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1928                 exit(10);
1929         }
1930
1931
1932         /* check tha there are nodes available that can act as a recmaster */
1933         for (i=0; i<nodemap->num; i++) {
1934                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1935                         continue;
1936                 }
1937                 break;
1938         }
1939         if (i == nodemap->num) {
1940                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1941                 return 0;
1942         }
1943
1944         /* verify the recovery master is not STOPPED, nor BANNED */
1945         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1946                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1947                 retries++;
1948                 sleep(1);
1949                 goto again;
1950         } 
1951
1952         
1953         /* verify the recovery master is not STOPPED, nor BANNED */
1954         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1955                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1956                 retries++;
1957                 sleep(1);
1958                 goto again;
1959         } 
1960
1961         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1962         if (ret != 0) {
1963                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1964                 return -1;
1965         }
1966
1967         tv = timeval_current();
1968         /* this loop will terminate when we have received the reply */
1969         while (timeval_elapsed(&tv) < 3.0) {    
1970                 event_loop_once(ctdb->ev);
1971         }
1972
1973         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1974         retries++;
1975         sleep(1);
1976         goto again;
1977
1978         return 0;
1979 }
1980
1981
1982 /*
1983   disable a remote node
1984  */
1985 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1986 {
1987         int ret;
1988         struct ctdb_node_map *nodemap=NULL;
1989
1990         do {
1991                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1992                 if (ret != 0) {
1993                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1994                         return ret;
1995                 }
1996
1997                 sleep(1);
1998
1999                 /* read the nodemap and verify the change took effect */
2000                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2001                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2002                         exit(10);
2003                 }
2004
2005         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2006         ret = control_ipreallocate(ctdb, argc, argv);
2007         if (ret != 0) {
2008                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2009                 return ret;
2010         }
2011
2012         return 0;
2013 }
2014
2015 /*
2016   enable a disabled remote node
2017  */
2018 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2019 {
2020         int ret;
2021
2022         struct ctdb_node_map *nodemap=NULL;
2023
2024         do {
2025                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2026                 if (ret != 0) {
2027                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2028                         return ret;
2029                 }
2030
2031                 sleep(1);
2032
2033                 /* read the nodemap and verify the change took effect */
2034                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2035                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2036                         exit(10);
2037                 }
2038
2039         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2040         ret = control_ipreallocate(ctdb, argc, argv);
2041         if (ret != 0) {
2042                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2043                 return ret;
2044         }
2045
2046         return 0;
2047 }
2048
2049 /*
2050   stop a remote node
2051  */
2052 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2053 {
2054         int ret;
2055         struct ctdb_node_map *nodemap=NULL;
2056
2057         do {
2058                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2059                 if (ret != 0) {
2060                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2061                 }
2062         
2063                 sleep(1);
2064
2065                 /* read the nodemap and verify the change took effect */
2066                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2067                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2068                         exit(10);
2069                 }
2070
2071         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2072         ret = control_ipreallocate(ctdb, argc, argv);
2073         if (ret != 0) {
2074                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2075                 return ret;
2076         }
2077
2078         return 0;
2079 }
2080
2081 /*
2082   restart a stopped remote node
2083  */
2084 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2085 {
2086         int ret;
2087
2088         struct ctdb_node_map *nodemap=NULL;
2089
2090         do {
2091                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2092                 if (ret != 0) {
2093                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2094                         return ret;
2095                 }
2096         
2097                 sleep(1);
2098
2099                 /* read the nodemap and verify the change took effect */
2100                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2101                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2102                         exit(10);
2103                 }
2104
2105         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2106         ret = control_ipreallocate(ctdb, argc, argv);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2109                 return ret;
2110         }
2111
2112         return 0;
2113 }
2114
2115 static uint32_t get_generation(struct ctdb_context *ctdb)
2116 {
2117         struct ctdb_vnn_map *vnnmap=NULL;
2118         int ret;
2119
2120         /* wait until the recmaster is not in recovery mode */
2121         while (1) {
2122                 uint32_t recmode, recmaster;
2123                 
2124                 if (vnnmap != NULL) {
2125                         talloc_free(vnnmap);
2126                         vnnmap = NULL;
2127                 }
2128
2129                 /* get the recmaster */
2130                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2131                 if (ret != 0) {
2132                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2133                         exit(10);
2134                 }
2135
2136                 /* get recovery mode */
2137                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2138                 if (ret != 0) {
2139                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2140                         exit(10);
2141                 }
2142
2143                 /* get the current generation number */
2144                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2145                 if (ret != 0) {
2146                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2147                         exit(10);
2148                 }
2149
2150                 if ((recmode == CTDB_RECOVERY_NORMAL)
2151                 &&  (vnnmap->generation != 1)){
2152                         return vnnmap->generation;
2153                 }
2154                 sleep(1);
2155         }
2156 }
2157
2158 /*
2159   ban a node from the cluster
2160  */
2161 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2162 {
2163         int ret;
2164         struct ctdb_node_map *nodemap=NULL;
2165         struct ctdb_ban_time bantime;
2166
2167         if (argc < 1) {
2168                 usage();
2169         }
2170         
2171         /* verify the node exists */
2172         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2173         if (ret != 0) {
2174                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2175                 return ret;
2176         }
2177
2178         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2179                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2180                 return -1;
2181         }
2182
2183         bantime.pnn  = options.pnn;
2184         bantime.time = strtoul(argv[0], NULL, 0);
2185
2186         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2187         if (ret != 0) {
2188                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2189                 return -1;
2190         }       
2191
2192         ret = control_ipreallocate(ctdb, argc, argv);
2193         if (ret != 0) {
2194                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2195                 return ret;
2196         }
2197
2198         return 0;
2199 }
2200
2201
2202 /*
2203   unban a node from the cluster
2204  */
2205 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2206 {
2207         int ret;
2208         struct ctdb_node_map *nodemap=NULL;
2209         struct ctdb_ban_time bantime;
2210
2211         /* verify the node exists */
2212         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2213         if (ret != 0) {
2214                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2215                 return ret;
2216         }
2217
2218         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2219                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2220                 return -1;
2221         }
2222
2223         bantime.pnn  = options.pnn;
2224         bantime.time = 0;
2225
2226         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2227         if (ret != 0) {
2228                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2229                 return -1;
2230         }       
2231
2232         ret = control_ipreallocate(ctdb, argc, argv);
2233         if (ret != 0) {
2234                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2235                 return ret;
2236         }
2237
2238         return 0;
2239 }
2240
2241
2242 /*
2243   show ban information for a node
2244  */
2245 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2246 {
2247         int ret;
2248         struct ctdb_node_map *nodemap=NULL;
2249         struct ctdb_ban_time *bantime;
2250
2251         /* verify the node exists */
2252         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2253         if (ret != 0) {
2254                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2255                 return ret;
2256         }
2257
2258         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2259         if (ret != 0) {
2260                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2261                 return -1;
2262         }       
2263
2264         if (bantime->time == 0) {
2265                 printf("Node %u is not banned\n", bantime->pnn);
2266         } else {
2267                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2268         }
2269
2270         return 0;
2271 }
2272
2273 /*
2274   shutdown a daemon
2275  */
2276 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2277 {
2278         int ret;
2279
2280         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2283                 return ret;
2284         }
2285
2286         return 0;
2287 }
2288
2289 /*
2290   trigger a recovery
2291  */
2292 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2293 {
2294         int ret;
2295         uint32_t generation, next_generation;
2296
2297         /* record the current generation number */
2298         generation = get_generation(ctdb);
2299
2300         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2301         if (ret != 0) {
2302                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2303                 return ret;
2304         }
2305
2306         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2307         if (ret != 0) {
2308                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2309                 return ret;
2310         }
2311
2312         /* wait until we are in a new generation */
2313         while (1) {
2314                 next_generation = get_generation(ctdb);
2315                 if (next_generation != generation) {
2316                         return 0;
2317                 }
2318                 sleep(1);
2319         }
2320
2321         return 0;
2322 }
2323
2324
2325 /*
2326   display monitoring mode of a remote node
2327  */
2328 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2329 {
2330         uint32_t monmode;
2331         int ret;
2332
2333         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2334         if (ret != 0) {
2335                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2336                 return ret;
2337         }
2338         if (!options.machinereadable){
2339                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2340         } else {
2341                 printf(":mode:\n");
2342                 printf(":%d:\n",monmode);
2343         }
2344         return 0;
2345 }
2346
2347
2348 /*
2349   display capabilities of a remote node
2350  */
2351 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2352 {
2353         uint32_t capabilities;
2354         int ret;
2355
2356         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2357         if (ret != 0) {
2358                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2359                 return ret;
2360         }
2361         
2362         if (!options.machinereadable){
2363                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2364                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2365                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2366                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2367         } else {
2368                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2369                 printf(":%d:%d:%d:%d:\n",
2370                         !!(capabilities&CTDB_CAP_RECMASTER),
2371                         !!(capabilities&CTDB_CAP_LMASTER),
2372                         !!(capabilities&CTDB_CAP_LVS),
2373                         !!(capabilities&CTDB_CAP_NATGW));
2374         }
2375         return 0;
2376 }
2377
2378 /*
2379   display lvs configuration
2380  */
2381 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2382 {
2383         uint32_t *capabilities;
2384         struct ctdb_node_map *nodemap=NULL;
2385         int i, ret;
2386         int healthy_count = 0;
2387
2388         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2389         if (ret != 0) {
2390                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2391                 return ret;
2392         }
2393
2394         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2395         CTDB_NO_MEMORY(ctdb, capabilities);
2396         
2397         /* collect capabilities for all connected nodes */
2398         for (i=0; i<nodemap->num; i++) {
2399                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2400                         continue;
2401                 }
2402                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2403                         continue;
2404                 }
2405         
2406                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2407                 if (ret != 0) {
2408                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2409                         return ret;
2410                 }
2411
2412                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2413                         continue;
2414                 }
2415
2416                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2417                         healthy_count++;
2418                 }
2419         }
2420
2421         /* Print all LVS nodes */
2422         for (i=0; i<nodemap->num; i++) {
2423                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2424                         continue;
2425                 }
2426                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2427                         continue;
2428                 }
2429                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2430                         continue;
2431                 }
2432
2433                 if (healthy_count != 0) {
2434                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2435                                 continue;
2436                         }
2437                 }
2438
2439                 printf("%d:%s\n", i, 
2440                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2441         }
2442
2443         return 0;
2444 }
2445
2446 /*
2447   display who is the lvs master
2448  */
2449 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2450 {
2451         uint32_t *capabilities;
2452         struct ctdb_node_map *nodemap=NULL;
2453         int i, ret;
2454         int healthy_count = 0;
2455
2456         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2459                 return ret;
2460         }
2461
2462         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2463         CTDB_NO_MEMORY(ctdb, capabilities);
2464         
2465         /* collect capabilities for all connected nodes */
2466         for (i=0; i<nodemap->num; i++) {
2467                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2468                         continue;
2469                 }
2470                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2471                         continue;
2472                 }
2473         
2474                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2475                 if (ret != 0) {
2476                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2477                         return ret;
2478                 }
2479
2480                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2481                         continue;
2482                 }
2483
2484                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2485                         healthy_count++;
2486                 }
2487         }
2488
2489         /* find and show the lvsmaster */
2490         for (i=0; i<nodemap->num; i++) {
2491                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2492                         continue;
2493                 }
2494                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2495                         continue;
2496                 }
2497                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2498                         continue;
2499                 }
2500
2501                 if (healthy_count != 0) {
2502                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2503                                 continue;
2504                         }
2505                 }
2506
2507                 if (options.machinereadable){
2508                         printf("%d\n", i);
2509                 } else {
2510                         printf("Node %d is LVS master\n", i);
2511                 }
2512                 return 0;
2513         }
2514
2515         printf("There is no LVS master\n");
2516         return -1;
2517 }
2518
2519 /*
2520   disable monitoring on a  node
2521  */
2522 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2523 {
2524         
2525         int ret;
2526
2527         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2528         if (ret != 0) {
2529                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2530                 return ret;
2531         }
2532         printf("Monitoring mode:%s\n","DISABLED");
2533
2534         return 0;
2535 }
2536
2537 /*
2538   enable monitoring on a  node
2539  */
2540 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2541 {
2542         
2543         int ret;
2544
2545         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2546         if (ret != 0) {
2547                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2548                 return ret;
2549         }
2550         printf("Monitoring mode:%s\n","ACTIVE");
2551
2552         return 0;
2553 }
2554
2555 /*
2556   display remote list of keys/data for a db
2557  */
2558 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2559 {
2560         const char *db_name;
2561         struct ctdb_db_context *ctdb_db;
2562         int ret;
2563
2564         if (argc < 1) {
2565                 usage();
2566         }
2567
2568         db_name = argv[0];
2569
2570
2571         if (db_exists(ctdb, db_name)) {
2572                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2573                 return -1;
2574         }
2575
2576         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2577
2578         if (ctdb_db == NULL) {
2579                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2580                 return -1;
2581         }
2582
2583         /* traverse and dump the cluster tdb */
2584         ret = ctdb_dump_db(ctdb_db, stdout);
2585         if (ret == -1) {
2586                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2587                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2588                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2589                                   db_name));
2590                 return -1;
2591         }
2592         talloc_free(ctdb_db);
2593
2594         printf("Dumped %d records\n", ret);
2595         return 0;
2596 }
2597
2598
2599 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2600                              TDB_DATA data, void *private_data)
2601 {
2602         DEBUG(DEBUG_ERR,("Log data received\n"));
2603         if (data.dsize > 0) {
2604                 printf("%s", data.dptr);
2605         }
2606
2607         exit(0);
2608 }
2609
2610 /*
2611   display a list of log messages from the in memory ringbuffer
2612  */
2613 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2614 {
2615         int ret;
2616         int32_t res;
2617         struct ctdb_get_log_addr log_addr;
2618         TDB_DATA data;
2619         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2620         char *errmsg;
2621         struct timeval tv;
2622
2623         if (argc != 1) {
2624                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2625                 talloc_free(tmp_ctx);
2626                 return -1;
2627         }
2628
2629         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2630         log_addr.srvid = getpid();
2631         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2632                 log_addr.level = get_debug_by_desc(argv[0]);
2633         } else {
2634                 log_addr.level = strtol(argv[0], NULL, 0);
2635         }
2636
2637
2638         data.dptr = (unsigned char *)&log_addr;
2639         data.dsize = sizeof(log_addr);
2640
2641         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2642
2643         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2644         sleep(1);
2645
2646         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2647
2648         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2649                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2650         if (ret != 0 || res != 0) {
2651                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2652                 talloc_free(tmp_ctx);
2653                 return -1;
2654         }
2655
2656
2657         tv = timeval_current();
2658         /* this loop will terminate when we have received the reply */
2659         while (timeval_elapsed(&tv) < 3.0) {    
2660                 event_loop_once(ctdb->ev);
2661         }
2662
2663         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2664
2665         talloc_free(tmp_ctx);
2666         return 0;
2667 }
2668
2669 /*
2670   clear the in memory log area
2671  */
2672 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2673 {
2674         int ret;
2675         int32_t res;
2676         char *errmsg;
2677         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2678
2679         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2680                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2681         if (ret != 0 || res != 0) {
2682                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2683                 talloc_free(tmp_ctx);
2684                 return -1;
2685         }
2686
2687         talloc_free(tmp_ctx);
2688         return 0;
2689 }
2690
2691
2692
2693 /*
2694   display a list of the databases on a remote ctdb
2695  */
2696 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2697 {
2698         int i, ret;
2699         struct ctdb_dbid_map *dbmap=NULL;
2700
2701         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2702         if (ret != 0) {
2703                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2704                 return ret;
2705         }
2706
2707         if(options.machinereadable){
2708                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2709                 for(i=0;i<dbmap->num;i++){
2710                         const char *path;
2711                         const char *name;
2712                         const char *health;
2713                         bool persistent;
2714
2715                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2716                                             dbmap->dbs[i].dbid, ctdb, &path);
2717                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2718                                             dbmap->dbs[i].dbid, ctdb, &name);
2719                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2720                                               dbmap->dbs[i].dbid, ctdb, &health);
2721                         persistent = dbmap->dbs[i].persistent;
2722                         printf(":0x%08X:%s:%s:%d:%d:\n",
2723                                dbmap->dbs[i].dbid, name, path,
2724                                !!(persistent), !!(health));
2725                 }
2726                 return 0;
2727         }
2728
2729         printf("Number of databases:%d\n", dbmap->num);
2730         for(i=0;i<dbmap->num;i++){
2731                 const char *path;
2732                 const char *name;
2733                 const char *health;
2734                 bool persistent;
2735
2736                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2737                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2738                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2739                 persistent = dbmap->dbs[i].persistent;
2740                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2741                        dbmap->dbs[i].dbid, name, path,
2742                        persistent?" PERSISTENT":"",
2743                        health?" UNHEALTHY":"");
2744         }
2745
2746         return 0;
2747 }
2748
2749 /*
2750   display the status of a database on a remote ctdb
2751  */
2752 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2753 {
2754         int i, ret;
2755         struct ctdb_dbid_map *dbmap=NULL;
2756         const char *db_name;
2757
2758         if (argc < 1) {
2759                 usage();
2760         }
2761
2762         db_name = argv[0];
2763
2764         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2765         if (ret != 0) {
2766                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2767                 return ret;
2768         }
2769
2770         for(i=0;i<dbmap->num;i++){
2771                 const char *path;
2772                 const char *name;
2773                 const char *health;
2774                 bool persistent;
2775
2776                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2777                 if (strcmp(name, db_name) != 0) {
2778                         continue;
2779                 }
2780
2781                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2782                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2783                 persistent = dbmap->dbs[i].persistent;
2784                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2785                        dbmap->dbs[i].dbid, name, path,
2786                        persistent?"yes":"no",
2787                        health?health:"OK");
2788                 return 0;
2789         }
2790
2791         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2792         return 0;
2793 }
2794
2795 /*
2796   check if the local node is recmaster or not
2797   it will return 1 if this node is the recmaster and 0 if it is not
2798   or if the local ctdb daemon could not be contacted
2799  */
2800 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2801 {
2802         uint32_t mypnn, recmaster;
2803         int ret;
2804
2805         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2806         if (mypnn == -1) {
2807                 printf("Failed to get pnn of node\n");
2808                 return 1;
2809         }
2810
2811         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2812         if (ret != 0) {
2813                 printf("Failed to get the recmaster\n");
2814                 return 1;
2815         }
2816
2817         if (recmaster != mypnn) {
2818                 printf("this node is not the recmaster\n");
2819                 return 1;
2820         }
2821
2822         printf("this node is the recmaster\n");
2823         return 0;
2824 }
2825
2826 /*
2827   ping a node
2828  */
2829 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2830 {
2831         int ret;
2832         struct timeval tv = timeval_current();
2833         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2834         if (ret == -1) {
2835                 printf("Unable to get ping response from node %u\n", options.pnn);
2836                 return -1;
2837         } else {
2838                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2839                        options.pnn, timeval_elapsed(&tv), ret);
2840         }
2841         return 0;
2842 }
2843
2844
2845 /*
2846   get a tunable
2847  */
2848 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2849 {
2850         const char *name;
2851         uint32_t value;
2852         int ret;
2853
2854         if (argc < 1) {
2855                 usage();
2856         }
2857
2858         name = argv[0];
2859         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2860         if (ret == -1) {
2861                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2862                 return -1;
2863         }
2864
2865         printf("%-19s = %u\n", name, value);
2866         return 0;
2867 }
2868
2869 /*
2870   set a tunable
2871  */
2872 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2873 {
2874         const char *name;
2875         uint32_t value;
2876         int ret;
2877
2878         if (argc < 2) {
2879                 usage();
2880         }
2881
2882         name = argv[0];
2883         value = strtoul(argv[1], NULL, 0);
2884
2885         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2886         if (ret == -1) {
2887                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2888                 return -1;
2889         }
2890         return 0;
2891 }
2892
2893 /*
2894   list all tunables
2895  */
2896 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2897 {
2898         uint32_t count;
2899         const char **list;
2900         int ret, i;
2901
2902         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2903         if (ret == -1) {
2904                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2905                 return -1;
2906         }
2907
2908         for (i=0;i<count;i++) {
2909                 control_getvar(ctdb, 1, &list[i]);
2910         }
2911
2912         talloc_free(list);
2913         
2914         return 0;
2915 }
2916
2917 /*
2918   display debug level on a node
2919  */
2920 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2921 {
2922         int ret;
2923         int32_t level;
2924
2925         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2926         if (ret != 0) {
2927                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2928                 return ret;
2929         } else {
2930                 if (options.machinereadable){
2931                         printf(":Name:Level:\n");
2932                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2933                 } else {
2934                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2935                 }
2936         }
2937         return 0;
2938 }
2939
2940 /*
2941   display reclock file of a node
2942  */
2943 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2944 {
2945         int ret;
2946         const char *reclock;
2947
2948         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2949         if (ret != 0) {
2950                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2951                 return ret;
2952         } else {
2953                 if (options.machinereadable){
2954                         if (reclock != NULL) {
2955                                 printf("%s", reclock);
2956                         }
2957                 } else {
2958                         if (reclock == NULL) {
2959                                 printf("No reclock file used.\n");
2960                         } else {
2961                                 printf("Reclock file:%s\n", reclock);
2962                         }
2963                 }
2964         }
2965         return 0;
2966 }
2967
2968 /*
2969   set the reclock file of a node
2970  */
2971 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2972 {
2973         int ret;
2974         const char *reclock;
2975
2976         if (argc == 0) {
2977                 reclock = NULL;
2978         } else if (argc == 1) {
2979                 reclock = argv[0];
2980         } else {
2981                 usage();
2982         }
2983
2984         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2985         if (ret != 0) {
2986                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2987                 return ret;
2988         }
2989         return 0;
2990 }
2991
2992 /*
2993   set the natgw state on/off
2994  */
2995 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2996 {
2997         int ret;
2998         uint32_t natgwstate;
2999
3000         if (argc == 0) {
3001                 usage();
3002         }
3003
3004         if (!strcmp(argv[0], "on")) {
3005                 natgwstate = 1;
3006         } else if (!strcmp(argv[0], "off")) {
3007                 natgwstate = 0;
3008         } else {
3009                 usage();
3010         }
3011
3012         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3013         if (ret != 0) {
3014                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3015                 return ret;
3016         }
3017
3018         return 0;
3019 }
3020
3021 /*
3022   set the lmaster role on/off
3023  */
3024 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3025 {
3026         int ret;
3027         uint32_t lmasterrole;
3028
3029         if (argc == 0) {
3030                 usage();
3031         }
3032
3033         if (!strcmp(argv[0], "on")) {
3034                 lmasterrole = 1;
3035         } else if (!strcmp(argv[0], "off")) {
3036                 lmasterrole = 0;
3037         } else {
3038                 usage();
3039         }
3040
3041         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3042         if (ret != 0) {
3043                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3044                 return ret;
3045         }
3046
3047         return 0;
3048 }
3049
3050 /*
3051   set the recmaster role on/off
3052  */
3053 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3054 {
3055         int ret;
3056         uint32_t recmasterrole;
3057
3058         if (argc == 0) {
3059                 usage();
3060         }
3061
3062         if (!strcmp(argv[0], "on")) {
3063                 recmasterrole = 1;
3064         } else if (!strcmp(argv[0], "off")) {
3065                 recmasterrole = 0;
3066         } else {
3067                 usage();
3068         }
3069
3070         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3071         if (ret != 0) {
3072                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3073                 return ret;
3074         }
3075
3076         return 0;
3077 }
3078
3079 /*
3080   set debug level on a node or all nodes
3081  */
3082 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3083 {
3084         int i, ret;
3085         int32_t level;
3086
3087         if (argc == 0) {
3088                 printf("You must specify the debug level. Valid levels are:\n");
3089                 for (i=0; debug_levels[i].description != NULL; i++) {
3090                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3091                 }
3092
3093                 return 0;
3094         }
3095
3096         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3097                 level = get_debug_by_desc(argv[0]);
3098         } else {
3099                 level = strtol(argv[0], NULL, 0);
3100         }
3101
3102         for (i=0; debug_levels[i].description != NULL; i++) {
3103                 if (level == debug_levels[i].level) {
3104                         break;
3105                 }
3106         }
3107         if (debug_levels[i].description == NULL) {
3108                 printf("Invalid debug level, must be one of\n");
3109                 for (i=0; debug_levels[i].description != NULL; i++) {
3110                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3111                 }
3112                 return -1;
3113         }
3114
3115         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3116         if (ret != 0) {
3117                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3118         }
3119         return 0;
3120 }
3121
3122
3123 /*
3124   freeze a node
3125  */
3126 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3127 {
3128         int ret;
3129         uint32_t priority;
3130         
3131         if (argc == 1) {
3132                 priority = strtol(argv[0], NULL, 0);
3133         } else {
3134                 priority = 0;
3135         }
3136         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3137
3138         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3139         if (ret != 0) {
3140                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3141         }               
3142         return 0;
3143 }
3144
3145 /*
3146   thaw a node
3147  */
3148 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3149 {
3150         int ret;
3151         uint32_t priority;
3152         
3153         if (argc == 1) {
3154                 priority = strtol(argv[0], NULL, 0);
3155         } else {
3156                 priority = 0;
3157         }
3158         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3159
3160         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3161         if (ret != 0) {
3162                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3163         }               
3164         return 0;
3165 }
3166
3167
3168 /*
3169   attach to a database
3170  */
3171 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3172 {
3173         const char *db_name;
3174         struct ctdb_db_context *ctdb_db;
3175
3176         if (argc < 1) {
3177                 usage();
3178         }
3179         db_name = argv[0];
3180
3181         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3182         if (ctdb_db == NULL) {
3183                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3184                 return -1;
3185         }
3186
3187         return 0;
3188 }
3189
3190 /*
3191   set db priority
3192  */
3193 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3194 {
3195         struct ctdb_db_priority db_prio;
3196         int ret;
3197
3198         if (argc < 2) {
3199                 usage();
3200         }
3201
3202         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3203         db_prio.priority = strtoul(argv[1], NULL, 0);
3204
3205         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3206         if (ret != 0) {
3207                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3208                 return -1;
3209         }
3210
3211         return 0;
3212 }
3213
3214 /*
3215   get db priority
3216  */
3217 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3218 {
3219         uint32_t db_id, priority;
3220         int ret;
3221
3222         if (argc < 1) {
3223                 usage();
3224         }
3225
3226         db_id = strtoul(argv[0], NULL, 0);
3227
3228         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3229         if (ret != 0) {
3230                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3231                 return -1;
3232         }
3233
3234         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3235
3236         return 0;
3237 }
3238
3239 /*
3240   run an eventscript on a node
3241  */
3242 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3243 {
3244         TDB_DATA data;
3245         int ret;
3246         int32_t res;
3247         char *errmsg;
3248         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3249
3250         if (argc != 1) {
3251                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3252                 return -1;
3253         }
3254
3255         data.dptr = (unsigned char *)discard_const(argv[0]);
3256         data.dsize = strlen((char *)data.dptr) + 1;
3257
3258         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3259
3260         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3261                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3262         if (ret != 0 || res != 0) {
3263                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3264                 talloc_free(tmp_ctx);
3265                 return -1;
3266         }
3267         talloc_free(tmp_ctx);
3268         return 0;
3269 }
3270
3271 #define DB_VERSION 1
3272 #define MAX_DB_NAME 64
3273 struct db_file_header {
3274         unsigned long version;
3275         time_t timestamp;
3276         unsigned long persistent;
3277         unsigned long size;
3278         const char name[MAX_DB_NAME];
3279 };
3280
3281 struct backup_data {
3282         struct ctdb_marshall_buffer *records;
3283         uint32_t len;
3284         uint32_t total;
3285         bool traverse_error;
3286 };
3287
3288 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3289 {
3290         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3291         struct ctdb_rec_data *rec;
3292
3293         /* add the record */
3294         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3295         if (rec == NULL) {
3296                 bd->traverse_error = true;
3297                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3298                 return -1;
3299         }
3300         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3301         if (bd->records == NULL) {
3302                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3303                 bd->traverse_error = true;
3304                 return -1;
3305         }
3306         bd->records->count++;
3307         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3308         bd->len += rec->length;
3309         talloc_free(rec);
3310
3311         bd->total++;
3312         return 0;
3313 }
3314
3315 /*
3316  * backup a database to a file 
3317  */
3318 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3319 {
3320         int i, ret;
3321         struct ctdb_dbid_map *dbmap=NULL;
3322         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3323         struct db_file_header dbhdr;
3324         struct ctdb_db_context *ctdb_db;
3325         struct backup_data *bd;
3326         int fh = -1;
3327         int status = -1;
3328         const char *reason = NULL;
3329
3330         if (argc != 2) {
3331                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3332                 return -1;
3333         }
3334
3335         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3336         if (ret != 0) {
3337                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3338                 return ret;
3339         }
3340
3341         for(i=0;i<dbmap->num;i++){
3342                 const char *name;
3343
3344                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3345                 if(!strcmp(argv[0], name)){
3346                         talloc_free(discard_const(name));
3347                         break;
3348                 }
3349                 talloc_free(discard_const(name));
3350         }
3351         if (i == dbmap->num) {
3352                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3353                 talloc_free(tmp_ctx);
3354                 return -1;
3355         }
3356
3357         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3358                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3359         if (ret != 0) {
3360                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3361                                  argv[0]));
3362                 talloc_free(tmp_ctx);
3363                 return -1;
3364         }
3365         if (reason) {
3366                 uint32_t allow_unhealthy = 0;
3367
3368                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3369                                       "AllowUnhealthyDBRead",
3370                                       &allow_unhealthy);
3371
3372                 if (allow_unhealthy != 1) {
3373                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3374                                          argv[0], reason));
3375
3376                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3377                                          allow_unhealthy));
3378                         talloc_free(tmp_ctx);
3379                         return -1;
3380                 }
3381
3382                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3383                                      argv[0], argv[0]));
3384                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3385                                      "tunnable AllowUnhealthyDBRead = %u\n",
3386                                      allow_unhealthy));
3387         }
3388
3389         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3390         if (ctdb_db == NULL) {
3391                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3392                 talloc_free(tmp_ctx);
3393                 return -1;
3394         }
3395
3396
3397         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3398         if (ret == -1) {
3399                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3400                 talloc_free(tmp_ctx);
3401                 return -1;
3402         }
3403
3404
3405         bd = talloc_zero(tmp_ctx, struct backup_data);
3406         if (bd == NULL) {
3407                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3408                 talloc_free(tmp_ctx);
3409                 return -1;
3410         }
3411
3412         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3413         if (bd->records == NULL) {
3414                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3415                 talloc_free(tmp_ctx);
3416                 return -1;
3417         }
3418
3419         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3420         bd->records->db_id = ctdb_db->db_id;
3421         /* traverse the database collecting all records */
3422         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3423             bd->traverse_error) {
3424                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3425                 talloc_free(tmp_ctx);
3426                 return -1;              
3427         }
3428
3429         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3430
3431
3432         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3433         if (fh == -1) {
3434                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3435                 talloc_free(tmp_ctx);
3436                 return -1;
3437         }
3438
3439         dbhdr.version = DB_VERSION;
3440         dbhdr.timestamp = time(NULL);
3441         dbhdr.persistent = dbmap->dbs[i].persistent;
3442         dbhdr.size = bd->len;
3443         if (strlen(argv[0]) >= MAX_DB_NAME) {
3444                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3445                 goto done;
3446         }
3447         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3448         ret = write(fh, &dbhdr, sizeof(dbhdr));
3449         if (ret == -1) {
3450                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3451                 goto done;
3452         }
3453         ret = write(fh, bd->records, bd->len);
3454         if (ret == -1) {
3455                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3456                 goto done;
3457         }
3458
3459         status = 0;
3460 done:
3461         if (fh != -1) {
3462                 ret = close(fh);
3463                 if (ret == -1) {
3464                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3465                 }
3466         }
3467         talloc_free(tmp_ctx);
3468         return status;
3469 }
3470
3471 /*
3472  * restore a database from a file 
3473  */
3474 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3475 {
3476         int ret;
3477         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3478         TDB_DATA outdata;
3479         TDB_DATA data;
3480         struct db_file_header dbhdr;
3481         struct ctdb_db_context *ctdb_db;
3482         struct ctdb_node_map *nodemap=NULL;
3483         struct ctdb_vnn_map *vnnmap=NULL;
3484         int i, fh;
3485         struct ctdb_control_wipe_database w;
3486         uint32_t *nodes;
3487         uint32_t generation;
3488         struct tm *tm;
3489         char tbuf[100];
3490
3491         if (argc != 1) {
3492                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3493                 return -1;
3494         }
3495
3496         fh = open(argv[0], O_RDONLY);
3497         if (fh == -1) {
3498                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3499                 talloc_free(tmp_ctx);
3500                 return -1;
3501         }
3502
3503         read(fh, &dbhdr, sizeof(dbhdr));
3504         if (dbhdr.version != DB_VERSION) {
3505                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3506                 talloc_free(tmp_ctx);
3507                 return -1;
3508         }
3509
3510         outdata.dsize = dbhdr.size;
3511         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3512         if (outdata.dptr == NULL) {
3513                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3514                 close(fh);
3515                 talloc_free(tmp_ctx);
3516                 return -1;
3517         }               
3518         read(fh, outdata.dptr, outdata.dsize);
3519         close(fh);
3520
3521         tm = localtime(&dbhdr.timestamp);
3522         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3523         printf("Restoring database '%s' from backup @ %s\n",
3524                 dbhdr.name, tbuf);
3525
3526
3527         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3528         if (ctdb_db == NULL) {
3529                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3530                 talloc_free(tmp_ctx);
3531                 return -1;
3532         }
3533
3534         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3535         if (ret != 0) {
3536                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3537                 talloc_free(tmp_ctx);
3538                 return ret;
3539         }
3540
3541
3542         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3543         if (ret != 0) {
3544                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3545                 talloc_free(tmp_ctx);
3546                 return ret;
3547         }
3548
3549         /* freeze all nodes */
3550         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3551         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3552                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3553                                         nodes, i,
3554                                         TIMELIMIT(),
3555                                         false, tdb_null,
3556                                         NULL, NULL,
3557                                         NULL) != 0) {
3558                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3559                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3560                         talloc_free(tmp_ctx);
3561                         return -1;
3562                 }
3563         }
3564
3565         generation = vnnmap->generation;
3566         data.dptr = (void *)&generation;
3567         data.dsize = sizeof(generation);
3568
3569         /* start a cluster wide transaction */
3570         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3571         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3572                                         nodes, 0,
3573                                         TIMELIMIT(), false, data,
3574                                         NULL, NULL,
3575                                         NULL) != 0) {
3576                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3577                 return -1;
3578         }
3579
3580
3581         w.db_id = ctdb_db->db_id;
3582         w.transaction_id = generation;
3583
3584         data.dptr = (void *)&w;
3585         data.dsize = sizeof(w);
3586
3587         /* wipe all the remote databases. */
3588         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3589         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3590                                         nodes, 0,
3591                                         TIMELIMIT(), false, data,
3592                                         NULL, NULL,
3593                                         NULL) != 0) {
3594                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3595                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3596                 talloc_free(tmp_ctx);
3597                 return -1;
3598         }
3599         
3600         /* push the database */
3601         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3602         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3603                                         nodes, 0,
3604                                         TIMELIMIT(), false, outdata,
3605                                         NULL, NULL,
3606                                         NULL) != 0) {
3607                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3608                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3609                 talloc_free(tmp_ctx);
3610                 return -1;
3611         }
3612
3613         data.dptr = (void *)&ctdb_db->db_id;
3614         data.dsize = sizeof(ctdb_db->db_id);
3615
3616         /* mark the database as healthy */
3617         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3618         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3619                                         nodes, 0,
3620                                         TIMELIMIT(), false, data,
3621                                         NULL, NULL,
3622                                         NULL) != 0) {
3623                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3624                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3625                 talloc_free(tmp_ctx);
3626                 return -1;
3627         }
3628
3629         data.dptr = (void *)&generation;
3630         data.dsize = sizeof(generation);
3631
3632         /* commit all the changes */
3633         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3634                                         nodes, 0,
3635                                         TIMELIMIT(), false, data,
3636                                         NULL, NULL,
3637                                         NULL) != 0) {
3638                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3639                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3640                 talloc_free(tmp_ctx);
3641                 return -1;
3642         }
3643
3644
3645         /* thaw all nodes */
3646         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3647         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3648                                         nodes, 0,
3649                                         TIMELIMIT(),
3650                                         false, tdb_null,
3651                                         NULL, NULL,
3652                                         NULL) != 0) {
3653                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3654                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3655                 talloc_free(tmp_ctx);
3656                 return -1;
3657         }
3658
3659
3660         talloc_free(tmp_ctx);
3661         return 0;
3662 }
3663
3664 /*
3665  * dump a database backup from a file
3666  */
3667 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3668 {
3669         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3670         TDB_DATA outdata;
3671         struct db_file_header dbhdr;
3672         int i, fh;
3673         struct tm *tm;
3674         char tbuf[100];
3675         struct ctdb_rec_data *rec = NULL;
3676         struct ctdb_marshall_buffer *m;
3677
3678         if (argc != 1) {
3679                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3680                 return -1;
3681         }
3682
3683         fh = open(argv[0], O_RDONLY);
3684         if (fh == -1) {
3685                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3686                 talloc_free(tmp_ctx);
3687                 return -1;
3688         }
3689
3690         read(fh, &dbhdr, sizeof(dbhdr));
3691         if (dbhdr.version != DB_VERSION) {
3692                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3693                 talloc_free(tmp_ctx);
3694                 return -1;
3695         }
3696
3697         outdata.dsize = dbhdr.size;
3698         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3699         if (outdata.dptr == NULL) {
3700                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3701                 close(fh);
3702                 talloc_free(tmp_ctx);
3703                 return -1;
3704         }
3705         read(fh, outdata.dptr, outdata.dsize);
3706         close(fh);
3707         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3708
3709         tm = localtime(&dbhdr.timestamp);
3710         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3711         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3712                 dbhdr.name, m->db_id, tbuf);
3713
3714         for (i=0; i < m->count; i++) {
3715                 uint32_t reqid = 0;
3716                 TDB_DATA key, data;
3717
3718                 /* we do not want the header splitted, so we pass NULL*/
3719                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3720                                               NULL, &key, &data);
3721
3722                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3723         }
3724
3725         printf("Dumped %d records\n", i);
3726         talloc_free(tmp_ctx);
3727         return 0;
3728 }
3729
3730 /*
3731  * wipe a database from a file
3732  */
3733 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3734                           const char **argv)
3735 {
3736         int ret;
3737         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3738         TDB_DATA data;
3739         struct ctdb_db_context *ctdb_db;
3740         struct ctdb_node_map *nodemap = NULL;
3741         struct ctdb_vnn_map *vnnmap = NULL;
3742         int i;
3743         struct ctdb_control_wipe_database w;
3744         uint32_t *nodes;
3745         uint32_t generation;
3746         struct ctdb_dbid_map *dbmap = NULL;
3747
3748         if (argc != 1) {
3749                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3750                 return -1;
3751         }
3752
3753         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3754                                  &dbmap);
3755         if (ret != 0) {
3756                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3757                                   options.pnn));
3758                 return ret;
3759         }
3760
3761         for(i=0;i<dbmap->num;i++){
3762                 const char *name;
3763
3764                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3765                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3766                 if(!strcmp(argv[0], name)){
3767                         talloc_free(discard_const(name));
3768                         break;
3769                 }
3770                 talloc_free(discard_const(name));
3771         }
3772         if (i == dbmap->num) {
3773                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3774                                   argv[0]));
3775                 talloc_free(tmp_ctx);
3776                 return -1;
3777         }
3778
3779         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3780         if (ctdb_db == NULL) {
3781                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3782                                   argv[0]));
3783                 talloc_free(tmp_ctx);
3784                 return -1;
3785         }
3786
3787         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3788                                    &nodemap);
3789         if (ret != 0) {
3790                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3791                                   options.pnn));
3792                 talloc_free(tmp_ctx);
3793                 return ret;
3794         }
3795
3796         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3797                                   &vnnmap);
3798         if (ret != 0) {
3799                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3800                                   options.pnn));
3801                 talloc_free(tmp_ctx);
3802                 return ret;
3803         }
3804
3805         /* freeze all nodes */
3806         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3807         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3808                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3809                                                 nodes, i,
3810                                                 TIMELIMIT(),
3811                                                 false, tdb_null,
3812                                                 NULL, NULL,
3813                                                 NULL);
3814                 if (ret != 0) {
3815                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3816                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3817                                              CTDB_RECOVERY_ACTIVE);
3818                         talloc_free(tmp_ctx);
3819                         return -1;
3820                 }
3821         }
3822
3823         generation = vnnmap->generation;
3824         data.dptr = (void *)&generation;
3825         data.dsize = sizeof(generation);
3826
3827         /* start a cluster wide transaction */
3828         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3829         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3830                                         nodes, 0,
3831                                         TIMELIMIT(), false, data,
3832                                         NULL, NULL,
3833                                         NULL);
3834         if (ret!= 0) {
3835                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3836                                   "transactions.\n"));
3837                 return -1;
3838         }
3839
3840         w.db_id = ctdb_db->db_id;
3841         w.transaction_id = generation;
3842
3843         data.dptr = (void *)&w;
3844         data.dsize = sizeof(w);
3845
3846         /* wipe all the remote databases. */
3847         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3848         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3849                                         nodes, 0,
3850                                         TIMELIMIT(), false, data,
3851                                         NULL, NULL,
3852                                         NULL) != 0) {
3853                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3854                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3855                 talloc_free(tmp_ctx);
3856                 return -1;
3857         }
3858
3859         data.dptr = (void *)&ctdb_db->db_id;
3860         data.dsize = sizeof(ctdb_db->db_id);
3861
3862         /* mark the database as healthy */
3863         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3864         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3865                                         nodes, 0,
3866                                         TIMELIMIT(), false, data,
3867                                         NULL, NULL,
3868                                         NULL) != 0) {
3869                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3870                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3871                 talloc_free(tmp_ctx);
3872                 return -1;
3873         }
3874
3875         data.dptr = (void *)&generation;
3876         data.dsize = sizeof(generation);
3877
3878         /* commit all the changes */
3879         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3880                                         nodes, 0,
3881                                         TIMELIMIT(), false, data,
3882                                         NULL, NULL,
3883                                         NULL) != 0) {
3884                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3885                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3886                 talloc_free(tmp_ctx);
3887                 return -1;
3888         }
3889
3890         /* thaw all nodes */
3891         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3892         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3893                                         nodes, 0,
3894                                         TIMELIMIT(),
3895                                         false, tdb_null,
3896                                         NULL, NULL,
3897                                         NULL) != 0) {
3898                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3899                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3900                 talloc_free(tmp_ctx);
3901                 return -1;
3902         }
3903
3904         talloc_free(tmp_ctx);
3905         return 0;
3906 }
3907
3908 /*
3909  * set flags of a node in the nodemap
3910  */
3911 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3912 {
3913         int ret;
3914         int32_t status;
3915         int node;
3916         int flags;
3917         TDB_DATA data;
3918         struct ctdb_node_flag_change c;
3919
3920         if (argc != 2) {
3921                 usage();
3922                 return -1;
3923         }
3924
3925         if (sscanf(argv[0], "%d", &node) != 1) {
3926                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3927                 usage();
3928                 return -1;
3929         }
3930         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3931                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3932                 usage();
3933                 return -1;
3934         }
3935
3936         c.pnn       = node;
3937         c.old_flags = 0;
3938         c.new_flags = flags;
3939
3940         data.dsize = sizeof(c);
3941         data.dptr = (unsigned char *)&c;
3942
3943         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3944                            data, NULL, NULL, &status, NULL, NULL);
3945         if (ret != 0 || status != 0) {
3946                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3947                 return -1;
3948         }
3949         return 0;
3950 }
3951
3952 /*
3953   dump memory usage
3954  */
3955 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3956 {
3957         TDB_DATA data;
3958         int ret;
3959         int32_t res;
3960         char *errmsg;
3961         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3962         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3963                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3964         if (ret != 0 || res != 0) {
3965                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3966                 talloc_free(tmp_ctx);
3967                 return -1;
3968         }
3969         write(1, data.dptr, data.dsize);
3970         talloc_free(tmp_ctx);
3971         return 0;
3972 }
3973
3974 /*
3975   handler for memory dumps
3976 */
3977 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3978                              TDB_DATA data, void *private_data)
3979 {
3980         write(1, data.dptr, data.dsize);
3981         exit(0);
3982 }
3983
3984 /*
3985   dump memory usage on the recovery daemon
3986  */
3987 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3988 {
3989         int ret;
3990         TDB_DATA data;
3991         struct rd_memdump_reply rd;
3992
3993         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3994         if (rd.pnn == -1) {
3995                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3996                 return -1;
3997         }
3998         rd.srvid = getpid();
3999
4000         /* register a message port for receiveing the reply so that we
4001            can receive the reply
4002         */
4003         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4004
4005
4006         data.dptr = (uint8_t *)&rd;
4007         data.dsize = sizeof(rd);
4008
4009         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4010         if (ret != 0) {
4011                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4012                 return -1;
4013         }
4014
4015         /* this loop will terminate when we have received the reply */
4016         while (1) {     
4017                 event_loop_once(ctdb->ev);
4018         }
4019
4020         return 0;
4021 }
4022
4023 /*
4024   list all nodes in the cluster
4025   if the daemon is running, we read the data from the daemon.
4026   if the daemon is not running we parse the nodes file directly
4027  */
4028 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4029 {
4030         int i, ret;
4031         struct ctdb_node_map *nodemap=NULL;
4032
4033         if (ctdb != NULL) {
4034                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4035                 if (ret != 0) {
4036                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4037                         return ret;
4038                 }
4039
4040                 for(i=0;i<nodemap->num;i++){
4041                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4042                                 continue;
4043                         }
4044                         if (options.machinereadable){
4045                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4046                         } else {
4047                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4048                         }
4049                 }
4050         } else {
4051                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4052                 struct pnn_node *pnn_nodes;
4053                 struct pnn_node *pnn_node;
4054         
4055                 pnn_nodes = read_nodes_file(mem_ctx);
4056                 if (pnn_nodes == NULL) {
4057                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4058                         talloc_free(mem_ctx);
4059                         return -1;
4060                 }
4061
4062                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4063                         ctdb_sock_addr addr;
4064
4065                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4066                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4067                                 talloc_free(mem_ctx);
4068                                 return -1;
4069                         }
4070
4071                         if (options.machinereadable){
4072                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4073                         } else {
4074                                 printf("%s\n", pnn_node->addr);
4075                         }
4076                 }
4077                 talloc_free(mem_ctx);
4078         }
4079
4080         return 0;
4081 }
4082
4083 /*
4084   reload the nodes file on the local node
4085  */
4086 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4087 {
4088         int i, ret;
4089         int mypnn;
4090         struct ctdb_node_map *nodemap=NULL;
4091
4092         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4093         if (mypnn == -1) {
4094                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4095                 return -1;
4096         }
4097
4098         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4099         if (ret != 0) {
4100                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4101                 return ret;
4102         }
4103
4104         /* reload the nodes file on all remote nodes */
4105         for (i=0;i<nodemap->num;i++) {
4106                 if (nodemap->nodes[i].pnn == mypnn) {
4107                         continue;
4108                 }
4109                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4110                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4111                         nodemap->nodes[i].pnn);
4112                 if (ret != 0) {
4113                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4114                 }
4115         }
4116
4117         /* reload the nodes file on the local node */
4118         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4119         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4120         if (ret != 0) {
4121                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4122         }
4123
4124         /* initiate a recovery */
4125         control_recover(ctdb, argc, argv);
4126
4127         return 0;
4128 }
4129
4130
4131 static const struct {
4132         const char *name;
4133         int (*fn)(struct ctdb_context *, int, const char **);
4134         bool auto_all;
4135         bool without_daemon; /* can be run without daemon running ? */
4136         const char *msg;
4137         const char *args;
4138 } ctdb_commands[] = {
4139 #ifdef CTDB_VERS
4140         { "version",         control_version,           true,   false,  "show version of ctdb" },
4141 #endif
4142         { "status",          control_status,            true,   false,  "show node status" },
4143         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4144         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4145         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4146         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4147         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4148         { "statistics",      control_statistics,        false,  false, "show statistics" },
4149         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4150         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4151         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4152         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4153         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4154         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4155         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4156         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4157         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4158         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4159         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4160         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4161         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4162         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4163         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4164         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4165         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4166         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4167         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4168         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4169         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4170         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4171         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4172         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4173         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4174         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4175         { "stop",            control_stop,              true,   false,  "stop a node" },
4176         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4177         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4178         { "unban",           control_unban,             true,   false,  "unban a node" },
4179         { "showban",         control_showban,           true,   false,  "show ban information"},
4180         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4181         { "recover",         control_recover,           true,   false,  "force recovery" },
4182         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4183         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4184         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4185         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4186         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4187         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4188         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4189         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4190
4191         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4192         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4193         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4194         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4195         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4196         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4197         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4198         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4199         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4200         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4201         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4202         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4203         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4204         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4205         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4206         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4207         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4208         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4209         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4210         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4211         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4212         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4213         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4214         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4215         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4216         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4217         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4218         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4219         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4220         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4221 };
4222
4223 /*
4224   show usage message
4225  */
4226 static void usage(void)
4227 {
4228         int i;
4229         printf(
4230 "Usage: ctdb [options] <control>\n" \
4231 "Options:\n" \
4232 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4233 "   -Y                 generate machinereadable output\n"
4234 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4235         printf("Controls:\n");
4236         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4237                 printf("  %-15s %-27s  %s\n", 
4238                        ctdb_commands[i].name, 
4239                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4240                        ctdb_commands[i].msg);
4241         }
4242         exit(1);
4243 }
4244
4245
4246 static void ctdb_alarm(int sig)
4247 {
4248         printf("Maximum runtime exceeded - exiting\n");
4249         _exit(ERR_TIMEOUT);
4250 }
4251
4252 /*
4253   main program
4254 */
4255 int main(int argc, const char *argv[])
4256 {
4257         struct ctdb_context *ctdb;
4258         char *nodestring = NULL;
4259         struct poptOption popt_options[] = {
4260                 POPT_AUTOHELP
4261                 POPT_CTDB_CMDLINE
4262                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4263                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4264                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4265                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4266                 POPT_TABLEEND
4267         };
4268         int opt;
4269         const char **extra_argv;
4270         int extra_argc = 0;
4271         int ret=-1, i;
4272         poptContext pc;
4273         struct event_context *ev;
4274         const char *control;
4275
4276         setlinebuf(stdout);
4277         
4278         /* set some defaults */
4279         options.maxruntime = 0;
4280         options.timelimit = 3;
4281         options.pnn = CTDB_CURRENT_NODE;
4282
4283         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4284
4285         while ((opt = poptGetNextOpt(pc)) != -1) {
4286                 switch (opt) {
4287                 default:
4288                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4289                                 poptBadOption(pc, 0), poptStrerror(opt)));
4290                         exit(1);
4291                 }
4292         }
4293
4294         /* setup the remaining options for the main program to use */
4295         extra_argv = poptGetArgs(pc);
4296         if (extra_argv) {
4297                 extra_argv++;
4298                 while (extra_argv[extra_argc]) extra_argc++;
4299         }
4300
4301         if (extra_argc < 1) {
4302                 usage();
4303         }
4304
4305         if (options.maxruntime == 0) {
4306                 const char *ctdb_timeout;
4307                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4308                 if (ctdb_timeout != NULL) {
4309                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4310                 } else {
4311                         /* default timeout is 120 seconds */
4312                         options.maxruntime = 120;
4313                 }
4314         }
4315
4316         signal(SIGALRM, ctdb_alarm);
4317         alarm(options.maxruntime);
4318
4319         /* setup the node number to contact */
4320         if (nodestring != NULL) {
4321                 if (strcmp(nodestring, "all") == 0) {
4322                         options.pnn = CTDB_BROADCAST_ALL;
4323                 } else {
4324                         options.pnn = strtoul(nodestring, NULL, 0);
4325                 }
4326         }
4327
4328         control = extra_argv[0];
4329
4330         ev = event_context_init(NULL);
4331
4332         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4333                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4334                         int j;
4335
4336                         if (ctdb_commands[i].without_daemon == true) {
4337                                 close(2);
4338                         }
4339
4340                         /* initialise ctdb */
4341                         ctdb = ctdb_cmdline_client(ev);
4342
4343                         if (ctdb_commands[i].without_daemon == false) {
4344                                 if (ctdb == NULL) {
4345                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4346                                         exit(1);
4347                                 }
4348
4349                                 /* verify the node exists */
4350                                 verify_node(ctdb);
4351
4352                                 if (options.pnn == CTDB_CURRENT_NODE) {
4353                                         int pnn;
4354                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4355                                         if (pnn == -1) {
4356                                                 return -1;
4357                                         }
4358                                         options.pnn = pnn;
4359                                 }
4360                         }
4361
4362                         if (ctdb_commands[i].auto_all && 
4363                             options.pnn == CTDB_BROADCAST_ALL) {
4364                                 uint32_t *nodes;
4365                                 uint32_t num_nodes;
4366                                 ret = 0;
4367
4368                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4369                                 CTDB_NO_MEMORY(ctdb, nodes);
4370         
4371                                 for (j=0;j<num_nodes;j++) {
4372                                         options.pnn = nodes[j];
4373                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4374                                 }
4375                                 talloc_free(nodes);
4376                         } else {
4377                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4378                         }
4379                         break;
4380                 }
4381         }
4382
4383         if (i == ARRAY_SIZE(ctdb_commands)) {
4384                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4385                 exit(1);
4386         }
4387
4388         return ret;
4389 }