cd8a2598984f3e722c11a6c5cb37e300532eab4a
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
/* process exit codes reported when the selected node cannot be used */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

/* connection handle, passed to ctdb_getpnn() by control_pnn() */
struct ctdb_connection *ctdb_connection;

static void usage(void);

/* settings shared by every command in this tool */
static struct {
        int timelimit;          /* base value handed to timeval_current_ofs() by TIMELIMIT() */
        uint32_t pnn;           /* node to talk to; may also be CTDB_CURRENT_NODE or CTDB_BROADCAST_ALL */
        int machinereadable;    /* non-zero selects the colon separated output formats */
        int verbose;
        int maxruntime;
} options;

/* timeouts derived from options.timelimit; the long variant uses ten times the value */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
53
#ifdef CTDB_VERS
/*
  print the version this tool was built with (the CTDB_VERS macro);
  always returns 0
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
/* two levels so that CTDB_VERS is expanded before it is turned into a string */
#define STR(x) #x
#define XSTR(x) STR(x)
        const char *version = XSTR(CTDB_VERS);

        printf("CTDB version: %s\n", version);
        return 0;
}
#endif
63
64
65 /*
66   verify that a node exists and is reachable
67  */
68 static void verify_node(struct ctdb_context *ctdb)
69 {
70         int ret;
71         struct ctdb_node_map *nodemap=NULL;
72
73         if (options.pnn == CTDB_CURRENT_NODE) {
74                 return;
75         }
76         if (options.pnn == CTDB_BROADCAST_ALL) {
77                 return;
78         }
79
80         /* verify the node exists */
81         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
82                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
83                 exit(10);
84         }
85         if (options.pnn >= nodemap->num) {
86                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
87                 exit(ERR_NONODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
94                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
95                 exit(ERR_DISNODE);
96         }
97
98         /* verify we can access the node */
99         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
100         if (ret == -1) {
101                 DEBUG(DEBUG_ERR,("Can not access node. Node is not operational.\n"));
102                 exit(10);
103         }
104 }
105
106 /*
107  check if a database exists
108 */
109 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
110 {
111         int i, ret;
112         struct ctdb_dbid_map *dbmap=NULL;
113
114         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
115         if (ret != 0) {
116                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
117                 return -1;
118         }
119
120         for(i=0;i<dbmap->num;i++){
121                 const char *name;
122
123                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
124                 if (!strcmp(name, db_name)) {
125                         if (persistent) {
126                                 *persistent = dbmap->dbs[i].persistent;
127                         }
128                         return 0;
129                 }
130         }
131
132         return -1;
133 }
134
135 /*
136   see if a process exists
137  */
138 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
139 {
140         uint32_t pnn, pid;
141         int ret;
142         if (argc < 1) {
143                 usage();
144         }
145
146         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
147                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
148                 return -1;
149         }
150
151         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
152         if (ret == 0) {
153                 printf("%u:%u exists\n", pnn, pid);
154         } else {
155                 printf("%u:%u does not exist\n", pnn, pid);
156         }
157         return ret;
158 }
159
160 /*
161   display statistics structure
162  */
163 static void show_statistics(struct ctdb_statistics *s, int show_header)
164 {
165         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
166         int i;
167         const char *prefix=NULL;
168         int preflen=0;
169         int tmp, days, hours, minutes, seconds;
170         const struct {
171                 const char *name;
172                 uint32_t offset;
173         } fields[] = {
174 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
175                 STATISTICS_FIELD(num_clients),
176                 STATISTICS_FIELD(frozen),
177                 STATISTICS_FIELD(recovering),
178                 STATISTICS_FIELD(num_recoveries),
179                 STATISTICS_FIELD(client_packets_sent),
180                 STATISTICS_FIELD(client_packets_recv),
181                 STATISTICS_FIELD(node_packets_sent),
182                 STATISTICS_FIELD(node_packets_recv),
183                 STATISTICS_FIELD(keepalive_packets_sent),
184                 STATISTICS_FIELD(keepalive_packets_recv),
185                 STATISTICS_FIELD(node.req_call),
186                 STATISTICS_FIELD(node.reply_call),
187                 STATISTICS_FIELD(node.req_dmaster),
188                 STATISTICS_FIELD(node.reply_dmaster),
189                 STATISTICS_FIELD(node.reply_error),
190                 STATISTICS_FIELD(node.req_message),
191                 STATISTICS_FIELD(node.req_control),
192                 STATISTICS_FIELD(node.reply_control),
193                 STATISTICS_FIELD(client.req_call),
194                 STATISTICS_FIELD(client.req_message),
195                 STATISTICS_FIELD(client.req_control),
196                 STATISTICS_FIELD(timeouts.call),
197                 STATISTICS_FIELD(timeouts.control),
198                 STATISTICS_FIELD(timeouts.traverse),
199                 STATISTICS_FIELD(total_calls),
200                 STATISTICS_FIELD(pending_calls),
201                 STATISTICS_FIELD(lockwait_calls),
202                 STATISTICS_FIELD(pending_lockwait_calls),
203                 STATISTICS_FIELD(childwrite_calls),
204                 STATISTICS_FIELD(pending_childwrite_calls),
205                 STATISTICS_FIELD(memory_used),
206                 STATISTICS_FIELD(max_hop_count),
207         };
208         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
209         seconds = tmp%60;
210         tmp    /= 60;
211         minutes = tmp%60;
212         tmp    /= 60;
213         hours   = tmp%24;
214         tmp    /= 24;
215         days    = tmp;
216
217         if (options.machinereadable){
218                 if (show_header) {
219                         printf("CTDB version:");
220                         printf("Current time of statistics:");
221                         printf("Statistics collected since:");
222                         for (i=0;i<ARRAY_SIZE(fields);i++) {
223                                 printf("%s:", fields[i].name);
224                         }
225                         printf("max_reclock_ctdbd:");
226                         printf("max_reclock_recd:");
227                         printf("max_call_latency:");
228                         printf("max_lockwait_latency:");
229                         printf("max_childwrite_latency:");
230                         printf("max_childwrite_latency:");
231                         printf("\n");
232                 }
233                 printf("%d:", CTDB_VERSION);
234                 printf("%d:", (int)s->statistics_current_time.tv_sec);
235                 printf("%d:", (int)s->statistics_start_time.tv_sec);
236                 for (i=0;i<ARRAY_SIZE(fields);i++) {
237                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
238                 }
239                 printf("%.6f:", s->reclock.ctdbd);
240                 printf("%.6f:", s->reclock.recd);
241                 printf("%.6f:", s->max_call_latency);
242                 printf("%.6f:", s->max_lockwait_latency);
243                 printf("%.6f:", s->max_childwrite_latency);
244                 printf("%.6f:", s->max_childwrite_latency);
245                 printf("\n");
246         } else {
247                 printf("CTDB version %u\n", CTDB_VERSION);
248                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
249                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
250
251                 for (i=0;i<ARRAY_SIZE(fields);i++) {
252                         if (strchr(fields[i].name, '.')) {
253                                 preflen = strcspn(fields[i].name, ".")+1;
254                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
255                                         prefix = fields[i].name;
256                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
257                                 }
258                         } else {
259                                 preflen = 0;
260                         }
261                         printf(" %*s%-22s%*s%10u\n", 
262                                preflen?4:0, "",
263                                fields[i].name+preflen, 
264                                preflen?0:4, "",
265                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
266                 }
267                 printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
268                 printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
269
270                 printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
271                 printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
272                 printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
273                 printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
274         }
275
276         talloc_free(tmp_ctx);
277 }
278
279 /*
280   display remote ctdb statistics combined from all nodes
281  */
282 static int control_statistics_all(struct ctdb_context *ctdb)
283 {
284         int ret, i;
285         struct ctdb_statistics statistics;
286         uint32_t *nodes;
287         uint32_t num_nodes;
288
289         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
290         CTDB_NO_MEMORY(ctdb, nodes);
291         
292         ZERO_STRUCT(statistics);
293
294         for (i=0;i<num_nodes;i++) {
295                 struct ctdb_statistics s1;
296                 int j;
297                 uint32_t *v1 = (uint32_t *)&s1;
298                 uint32_t *v2 = (uint32_t *)&statistics;
299                 uint32_t num_ints = 
300                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
301                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
302                 if (ret != 0) {
303                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
304                         return ret;
305                 }
306                 for (j=0;j<num_ints;j++) {
307                         v2[j] += v1[j];
308                 }
309                 statistics.max_hop_count = 
310                         MAX(statistics.max_hop_count, s1.max_hop_count);
311                 statistics.max_call_latency = 
312                         MAX(statistics.max_call_latency, s1.max_call_latency);
313                 statistics.max_lockwait_latency = 
314                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
315         }
316         talloc_free(nodes);
317         printf("Gathered statistics for %u nodes\n", num_nodes);
318         show_statistics(&statistics, 1);
319         return 0;
320 }
321
322 /*
323   display remote ctdb statistics
324  */
325 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
326 {
327         int ret;
328         struct ctdb_statistics statistics;
329
330         if (options.pnn == CTDB_BROADCAST_ALL) {
331                 return control_statistics_all(ctdb);
332         }
333
334         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
335         if (ret != 0) {
336                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
337                 return ret;
338         }
339         show_statistics(&statistics, 1);
340         return 0;
341 }
342
343
344 /*
345   reset remote ctdb statistics
346  */
347 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
348 {
349         int ret;
350
351         ret = ctdb_statistics_reset(ctdb, options.pnn);
352         if (ret != 0) {
353                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
354                 return ret;
355         }
356         return 0;
357 }
358
359
360 /*
361   display remote ctdb rolling statistics
362  */
363 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
364 {
365         int ret;
366         struct ctdb_statistics_wire *stats;
367         int i, num_records = -1;
368
369         if (argc ==1) {
370                 num_records = atoi(argv[0]) - 1;
371         }
372
373         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
374         if (ret != 0) {
375                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
376                 return ret;
377         }
378         for (i=0;i<stats->num;i++) {
379                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
380                         continue;
381                 }
382                 show_statistics(&stats->stats[i], i==0);
383                 if (i == num_records) {
384                         break;
385                 }
386         }
387         return 0;
388 }
389
390
391 /*
392   display uptime of remote node
393  */
394 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
395 {
396         int ret;
397         struct ctdb_uptime *uptime = NULL;
398         int tmp, days, hours, minutes, seconds;
399
400         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
401         if (ret != 0) {
402                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
403                 return ret;
404         }
405
406         if (options.machinereadable){
407                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
408                 printf(":%u:%u:%u:%lf\n",
409                         (unsigned int)uptime->current_time.tv_sec,
410                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
411                         (unsigned int)uptime->last_recovery_finished.tv_sec,
412                         timeval_delta(&uptime->last_recovery_finished,
413                                       &uptime->last_recovery_started)
414                 );
415                 return 0;
416         }
417
418         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
419
420         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
421         seconds = tmp%60;
422         tmp    /= 60;
423         minutes = tmp%60;
424         tmp    /= 60;
425         hours   = tmp%24;
426         tmp    /= 24;
427         days    = tmp;
428         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
429
430         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
431         seconds = tmp%60;
432         tmp    /= 60;
433         minutes = tmp%60;
434         tmp    /= 60;
435         hours   = tmp%24;
436         tmp    /= 24;
437         days    = tmp;
438         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
439         
440         printf("Duration of last recovery/failover: %lf seconds\n",
441                 timeval_delta(&uptime->last_recovery_finished,
442                               &uptime->last_recovery_started));
443
444         return 0;
445 }
446
447 /*
448   show the PNN of the current node
449  */
450 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
451 {
452         uint32_t mypnn;
453         bool ret;
454
455         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
456         if (!ret) {
457                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
458                 return -1;
459         }
460
461         printf("PNN:%d\n", mypnn);
462         return 0;
463 }
464
465
/* one address line of the nodes file, as returned by read_nodes_file() */
struct pnn_node {
        struct pnn_node *next;  /* next entry, NULL at the end of the list */
        const char *addr;       /* the line with leading blanks and tabs removed */
        int pnn;                /* number assigned to the line while reading the file */
};
471
472 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
473 {
474         const char *nodes_list;
475         int nlines;
476         char **lines;
477         int i, pnn;
478         struct pnn_node *pnn_nodes = NULL;
479         struct pnn_node *pnn_node;
480         struct pnn_node *tmp_node;
481
482         /* read the nodes file */
483         nodes_list = getenv("CTDB_NODES");
484         if (nodes_list == NULL) {
485                 nodes_list = "/etc/ctdb/nodes";
486         }
487         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
488         if (lines == NULL) {
489                 return NULL;
490         }
491         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
492                 nlines--;
493         }
494         for (i=0, pnn=0; i<nlines; i++) {
495                 char *node;
496
497                 node = lines[i];
498                 /* strip leading spaces */
499                 while((*node == ' ') || (*node == '\t')) {
500                         node++;
501                 }
502                 if (*node == '#') {
503                         pnn++;
504                         continue;
505                 }
506                 if (strcmp(node, "") == 0) {
507                         continue;
508                 }
509                 pnn_node = talloc(mem_ctx, struct pnn_node);
510                 pnn_node->pnn = pnn++;
511                 pnn_node->addr = talloc_strdup(pnn_node, node);
512                 pnn_node->next = pnn_nodes;
513                 pnn_nodes = pnn_node;
514         }
515
516         /* swap them around so we return them in incrementing order */
517         pnn_node = pnn_nodes;
518         pnn_nodes = NULL;
519         while (pnn_node) {
520                 tmp_node = pnn_node;
521                 pnn_node = pnn_node->next;
522
523                 tmp_node->next = pnn_nodes;
524                 pnn_nodes = tmp_node;
525         }
526
527         return pnn_nodes;
528 }
529
530 /*
531   show the PNN of the current node
532   discover the pnn by loading the nodes file and try to bind to all
533   addresses one at a time until the ip address is found.
534  */
535 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
536 {
537         TALLOC_CTX *mem_ctx = talloc_new(NULL);
538         struct pnn_node *pnn_nodes;
539         struct pnn_node *pnn_node;
540
541         pnn_nodes = read_nodes_file(mem_ctx);
542         if (pnn_nodes == NULL) {
543                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
544                 talloc_free(mem_ctx);
545                 return -1;
546         }
547
548         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
549                 ctdb_sock_addr addr;
550
551                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
552                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
553                         talloc_free(mem_ctx);
554                         return -1;
555                 }
556
557                 if (ctdb_sys_have_ip(&addr)) {
558                         printf("PNN:%d\n", pnn_node->pnn);
559                         talloc_free(mem_ctx);
560                         return 0;
561                 }
562         }
563
564         printf("Failed to detect which PNN this node is\n");
565         talloc_free(mem_ctx);
566         return -1;
567 }
568
569 /*
570   display remote ctdb status
571  */
572 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
573 {
574         int i, ret;
575         struct ctdb_vnn_map *vnnmap=NULL;
576         struct ctdb_node_map *nodemap=NULL;
577         uint32_t recmode, recmaster;
578         int mypnn;
579
580         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
581         if (mypnn == -1) {
582                 return -1;
583         }
584
585         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
586         if (ret != 0) {
587                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
588                 return ret;
589         }
590
591         if(options.machinereadable){
592                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
593                 for(i=0;i<nodemap->num;i++){
594                         int partially_online = 0;
595                         int j;
596
597                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
598                                 continue;
599                         }
600                         if (nodemap->nodes[i].flags == 0) {
601                                 struct ctdb_control_get_ifaces *ifaces;
602
603                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
604                                                            nodemap->nodes[i].pnn,
605                                                            ctdb, &ifaces);
606                                 if (ret == 0) {
607                                         for (j=0; j < ifaces->num; j++) {
608                                                 if (ifaces->ifaces[j].link_state != 0) {
609                                                         continue;
610                                                 }
611                                                 partially_online = 1;
612                                                 break;
613                                         }
614                                         talloc_free(ifaces);
615                                 }
616                         }
617                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
618                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
619                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
620                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
621                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
622                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
623                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
624                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
625                                partially_online);
626                 }
627                 return 0;
628         }
629
630         printf("Number of nodes:%d\n", nodemap->num);
631         for(i=0;i<nodemap->num;i++){
632                 static const struct {
633                         uint32_t flag;
634                         const char *name;
635                 } flag_names[] = {
636                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
637                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
638                         { NODE_FLAGS_BANNED,                "BANNED" },
639                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
640                         { NODE_FLAGS_DELETED,               "DELETED" },
641                         { NODE_FLAGS_STOPPED,               "STOPPED" },
642                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
643                 };
644                 char *flags_str = NULL;
645                 int j;
646
647                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
648                         continue;
649                 }
650                 if (nodemap->nodes[i].flags == 0) {
651                         struct ctdb_control_get_ifaces *ifaces;
652
653                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
654                                                    nodemap->nodes[i].pnn,
655                                                    ctdb, &ifaces);
656                         if (ret == 0) {
657                                 for (j=0; j < ifaces->num; j++) {
658                                         if (ifaces->ifaces[j].link_state != 0) {
659                                                 continue;
660                                         }
661                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
662                                         break;
663                                 }
664                                 talloc_free(ifaces);
665                         }
666                 }
667                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
668                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
669                                 if (flags_str == NULL) {
670                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
671                                 } else {
672                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
673                                                                            flag_names[j].name);
674                                 }
675                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
676                         }
677                 }
678                 if (flags_str == NULL) {
679                         flags_str = talloc_strdup(ctdb, "OK");
680                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
681                 }
682                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
683                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
684                        flags_str,
685                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
686                 talloc_free(flags_str);
687         }
688
689         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
690         if (ret != 0) {
691                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
692                 return ret;
693         }
694         if (vnnmap->generation == INVALID_GENERATION) {
695                 printf("Generation:INVALID\n");
696         } else {
697                 printf("Generation:%d\n",vnnmap->generation);
698         }
699         printf("Size:%d\n",vnnmap->size);
700         for(i=0;i<vnnmap->size;i++){
701                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
702         }
703
704         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
705         if (ret != 0) {
706                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
707                 return ret;
708         }
709         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
710
711         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
712         if (ret != 0) {
713                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
714                 return ret;
715         }
716         printf("Recovery master:%d\n",recmaster);
717
718         return 0;
719 }
720
721
/* one address line of the natgw nodes file, kept in a singly linked list */
struct natgw_node {
        struct natgw_node *next;        /* next entry, NULL at the end of the list */
        const char *addr;               /* the line with leading blanks and tabs removed */
};
726
727 /*
728   display the list of nodes belonging to this natgw configuration
729  */
730 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
731 {
732         int i, ret;
733         uint32_t capabilities;
734         const char *natgw_list;
735         int nlines;
736         char **lines;
737         struct natgw_node *natgw_nodes = NULL;
738         struct natgw_node *natgw_node;
739         struct ctdb_node_map *nodemap=NULL;
740
741
742         /* read the natgw nodes file into a linked list */
743         natgw_list = getenv("NATGW_NODES");
744         if (natgw_list == NULL) {
745                 natgw_list = "/etc/ctdb/natgw_nodes";
746         }
747         lines = file_lines_load(natgw_list, &nlines, ctdb);
748         if (lines == NULL) {
749                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
750                 return -1;
751         }
752         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
753                 nlines--;
754         }
755         for (i=0;i<nlines;i++) {
756                 char *node;
757
758                 node = lines[i];
759                 /* strip leading spaces */
760                 while((*node == ' ') || (*node == '\t')) {
761                         node++;
762                 }
763                 if (*node == '#') {
764                         continue;
765                 }
766                 if (strcmp(node, "") == 0) {
767                         continue;
768                 }
769                 natgw_node = talloc(ctdb, struct natgw_node);
770                 natgw_node->addr = talloc_strdup(natgw_node, node);
771                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
772                 natgw_node->next = natgw_nodes;
773                 natgw_nodes = natgw_node;
774         }
775
776         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
777         if (ret != 0) {
778                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
779                 return ret;
780         }
781
782         i=0;
783         while(i<nodemap->num) {
784                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
785                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
786                                 break;
787                         }
788                 }
789
790                 /* this node was not in the natgw so we just remove it from
791                  * the list
792                  */
793                 if ((natgw_node == NULL) 
794                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
795                         int j;
796
797                         for (j=i+1; j<nodemap->num; j++) {
798                                 nodemap->nodes[j-1] = nodemap->nodes[j];
799                         }
800                         nodemap->num--;
801                         continue;
802                 }
803
804                 i++;
805         }               
806
807         /* pick a node to be natgwmaster
808          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
809          */
810         for(i=0;i<nodemap->num;i++){
811                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
812                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
813                         if (ret != 0) {
814                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
815                                 return ret;
816                         }
817                         if (!(capabilities&CTDB_CAP_NATGW)) {
818                                 continue;
819                         }
820                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
821                         break;
822                 }
823         }
824         /* we couldnt find any healthy node, try unhealthy ones */
825         if (i == nodemap->num) {
826                 for(i=0;i<nodemap->num;i++){
827                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
828                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
829                                 if (ret != 0) {
830                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
831                                         return ret;
832                                 }
833                                 if (!(capabilities&CTDB_CAP_NATGW)) {
834                                         continue;
835                                 }
836                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
837                                 break;
838                         }
839                 }
840         }
841         /* unless all nodes are STOPPED, when we pick one anyway */
842         if (i == nodemap->num) {
843                 for(i=0;i<nodemap->num;i++){
844                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
845                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
846                                 if (ret != 0) {
847                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
848                                         return ret;
849                                 }
850                                 if (!(capabilities&CTDB_CAP_NATGW)) {
851                                         continue;
852                                 }
853                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
854                                 break;
855                         }
856                 }
857                 /* or if we still can not find any */
858                 if (i == nodemap->num) {
859                         printf("-1 0.0.0.0\n");
860                 }
861         }
862
863         /* print the pruned list of nodes belonging to this natgw list */
864         for(i=0;i<nodemap->num;i++){
865                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
866                         continue;
867                 }
868                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
869                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
870                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
871                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
872                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
873                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
874                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
875         }
876
877         return 0;
878 }
879
880 /*
881   display the status of the scripts for monitoring (or other events)
882  */
883 static int control_one_scriptstatus(struct ctdb_context *ctdb,
884                                     enum ctdb_eventscript_call type)
885 {
886         struct ctdb_scripts_wire *script_status;
887         int ret, i;
888
889         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
890         if (ret != 0) {
891                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
892                 return ret;
893         }
894
895         if (script_status == NULL) {
896                 if (!options.machinereadable) {
897                         printf("%s cycle never run\n",
898                                ctdb_eventscript_call_names[type]);
899                 }
900                 return 0;
901         }
902
903         if (!options.machinereadable) {
904                 printf("%d scripts were executed last %s cycle\n",
905                        script_status->num_scripts,
906                        ctdb_eventscript_call_names[type]);
907         }
908         for (i=0; i<script_status->num_scripts; i++) {
909                 const char *status = NULL;
910
911                 switch (script_status->scripts[i].status) {
912                 case -ETIME:
913                         status = "TIMEDOUT";
914                         break;
915                 case -ENOEXEC:
916                         status = "DISABLED";
917                         break;
918                 case 0:
919                         status = "OK";
920                         break;
921                 default:
922                         if (script_status->scripts[i].status > 0)
923                                 status = "ERROR";
924                         break;
925                 }
926                 if (options.machinereadable) {
927                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
928                                ctdb_eventscript_call_names[type],
929                                script_status->scripts[i].name,
930                                script_status->scripts[i].status,
931                                status,
932                                (long)script_status->scripts[i].start.tv_sec,
933                                (long)script_status->scripts[i].start.tv_usec,
934                                (long)script_status->scripts[i].finished.tv_sec,
935                                (long)script_status->scripts[i].finished.tv_usec,
936                                script_status->scripts[i].output);
937                         continue;
938                 }
939                 if (status)
940                         printf("%-20s Status:%s    ",
941                                script_status->scripts[i].name, status);
942                 else
943                         /* Some other error, eg from stat. */
944                         printf("%-20s Status:CANNOT RUN (%s)",
945                                script_status->scripts[i].name,
946                                strerror(-script_status->scripts[i].status));
947
948                 if (script_status->scripts[i].status >= 0) {
949                         printf("Duration:%.3lf ",
950                         timeval_delta(&script_status->scripts[i].finished,
951                               &script_status->scripts[i].start));
952                 }
953                 if (script_status->scripts[i].status != -ENOEXEC) {
954                         printf("%s",
955                                ctime(&script_status->scripts[i].start.tv_sec));
956                         if (script_status->scripts[i].status != 0) {
957                                 printf("   OUTPUT:%s\n",
958                                        script_status->scripts[i].output);
959                         }
960                 } else {
961                         printf("\n");
962                 }
963         }
964         return 0;
965 }
966
967
968 static int control_scriptstatus(struct ctdb_context *ctdb,
969                                 int argc, const char **argv)
970 {
971         int ret;
972         enum ctdb_eventscript_call type, min, max;
973         const char *arg;
974
975         if (argc > 1) {
976                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
977                 return -1;
978         }
979
980         if (argc == 0)
981                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
982         else
983                 arg = argv[0];
984
985         for (type = 0; type < CTDB_EVENT_MAX; type++) {
986                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
987                         min = type;
988                         max = type+1;
989                         break;
990                 }
991         }
992         if (type == CTDB_EVENT_MAX) {
993                 if (strcmp(arg, "all") == 0) {
994                         min = 0;
995                         max = CTDB_EVENT_MAX;
996                 } else {
997                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
998                         return -1;
999                 }
1000         }
1001
1002         if (options.machinereadable) {
1003                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1004         }
1005
1006         for (type = min; type < max; type++) {
1007                 ret = control_one_scriptstatus(ctdb, type);
1008                 if (ret != 0) {
1009                         return ret;
1010                 }
1011         }
1012
1013         return 0;
1014 }
1015
1016 /*
1017   enable an eventscript
1018  */
1019 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1020 {
1021         int ret;
1022
1023         if (argc < 1) {
1024                 usage();
1025         }
1026
1027         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1028         if (ret != 0) {
1029           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1030                 return ret;
1031         }
1032
1033         return 0;
1034 }
1035
1036 /*
1037   disable an eventscript
1038  */
1039 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1040 {
1041         int ret;
1042
1043         if (argc < 1) {
1044                 usage();
1045         }
1046
1047         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1048         if (ret != 0) {
1049           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1050                 return ret;
1051         }
1052
1053         return 0;
1054 }
1055
1056 /*
1057   display the pnn of the recovery master
1058  */
1059 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1060 {
1061         int ret;
1062         uint32_t recmaster;
1063
1064         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1065         if (ret != 0) {
1066                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1067                 return ret;
1068         }
1069         printf("%d\n",recmaster);
1070
1071         return 0;
1072 }
1073
1074 /*
1075   add a tickle to a public address
1076  */
1077 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1078 {
1079         struct ctdb_tcp_connection t;
1080         TDB_DATA data;
1081         int ret;
1082
1083         if (argc < 2) {
1084                 usage();
1085         }
1086
1087         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1088                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1089                 return -1;
1090         }
1091         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1092                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1093                 return -1;
1094         }
1095
1096         data.dptr = (uint8_t *)&t;
1097         data.dsize = sizeof(t);
1098
1099         /* tell all nodes about this tcp connection */
1100         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1101                            0, data, ctdb, NULL, NULL, NULL, NULL);
1102         if (ret != 0) {
1103                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1104                 return -1;
1105         }
1106         
1107         return 0;
1108 }
1109
1110
1111 /*
1112   delete a tickle from a node
1113  */
1114 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1115 {
1116         struct ctdb_tcp_connection t;
1117         TDB_DATA data;
1118         int ret;
1119
1120         if (argc < 2) {
1121                 usage();
1122         }
1123
1124         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1125                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1126                 return -1;
1127         }
1128         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1129                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1130                 return -1;
1131         }
1132
1133         data.dptr = (uint8_t *)&t;
1134         data.dsize = sizeof(t);
1135
1136         /* tell all nodes about this tcp connection */
1137         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1138                            0, data, ctdb, NULL, NULL, NULL, NULL);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1141                 return -1;
1142         }
1143         
1144         return 0;
1145 }
1146
1147
1148 /*
1149   get a list of all tickles for this pnn
1150  */
1151 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1152 {
1153         struct ctdb_control_tcp_tickle_list *list;
1154         ctdb_sock_addr addr;
1155         int i, ret;
1156         unsigned port = 0;
1157
1158         if (argc < 1) {
1159                 usage();
1160         }
1161
1162         if (argc == 2) {
1163                 port = atoi(argv[1]);
1164         }
1165
1166         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1167                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1168                 return -1;
1169         }
1170
1171         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1172         if (ret == -1) {
1173                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1174                 return -1;
1175         }
1176
1177         if (options.machinereadable){
1178                 printf(":source ip:port:destination ip:port:\n");
1179                 for (i=0;i<list->tickles.num;i++) {
1180                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1181                                 continue;
1182                         }
1183                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1184                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1185                 }
1186         } else {
1187                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1188                 printf("Num tickles:%u\n", list->tickles.num);
1189                 for (i=0;i<list->tickles.num;i++) {
1190                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1191                                 continue;
1192                         }
1193                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1194                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1195                 }
1196         }
1197
1198         talloc_free(list);
1199         
1200         return 0;
1201 }
1202
1203
1204 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1205 {
1206         struct ctdb_all_public_ips *ips;
1207         struct ctdb_public_ip ip;
1208         int i, ret;
1209         uint32_t *nodes;
1210         uint32_t disable_time;
1211         TDB_DATA data;
1212         struct ctdb_node_map *nodemap=NULL;
1213         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1214
1215         disable_time = 30;
1216         data.dptr  = (uint8_t*)&disable_time;
1217         data.dsize = sizeof(disable_time);
1218         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1219         if (ret != 0) {
1220                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1221                 return -1;
1222         }
1223
1224
1225
1226         /* read the public ip list from the node */
1227         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1228         if (ret != 0) {
1229                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1230                 talloc_free(tmp_ctx);
1231                 return -1;
1232         }
1233
1234         for (i=0;i<ips->num;i++) {
1235                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1236                         break;
1237                 }
1238         }
1239         if (i==ips->num) {
1240                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1241                         pnn, ctdb_addr_to_str(addr)));
1242                 talloc_free(tmp_ctx);
1243                 return -1;
1244         }
1245
1246         ip.pnn  = pnn;
1247         ip.addr = *addr;
1248
1249         data.dptr  = (uint8_t *)&ip;
1250         data.dsize = sizeof(ip);
1251
1252         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1253         if (ret != 0) {
1254                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1255                 talloc_free(tmp_ctx);
1256                 return ret;
1257         }
1258
1259         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1260         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1261                                         nodes, 0,
1262                                         LONGTIMELIMIT(),
1263                                         false, data,
1264                                         NULL, NULL,
1265                                         NULL);
1266         if (ret != 0) {
1267                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1268                 talloc_free(tmp_ctx);
1269                 return -1;
1270         }
1271
1272         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1273         if (ret != 0) {
1274                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1275                 talloc_free(tmp_ctx);
1276                 return -1;
1277         }
1278
1279         /* update the recovery daemon so it now knows to expect the new
1280            node assignment for this ip.
1281         */
1282         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1285                 return -1;
1286         }
1287
1288         talloc_free(tmp_ctx);
1289         return 0;
1290 }
1291
1292 /*
1293   move/failover an ip address to a specific node
1294  */
1295 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1296 {
1297         uint32_t pnn;
1298         ctdb_sock_addr addr;
1299
1300         if (argc < 2) {
1301                 usage();
1302                 return -1;
1303         }
1304
1305         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1306                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1307                 return -1;
1308         }
1309
1310
1311         if (sscanf(argv[1], "%u", &pnn) != 1) {
1312                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1313                 return -1;
1314         }
1315
1316         if (move_ip(ctdb, &addr, pnn) != 0) {
1317                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1318                 return -1;
1319         }
1320
1321         return 0;
1322 }
1323
1324 void getips_store_callback(void *param, void *data)
1325 {
1326         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1327         struct ctdb_all_public_ips *ips = param;
1328         int i;
1329
1330         i = ips->num++;
1331         ips->ips[i].pnn  = node_ip->pnn;
1332         ips->ips[i].addr = node_ip->addr;
1333 }
1334
/*
  tree traversal callback: count the visited entries

  'param' points at a uint32_t counter that is bumped by one per call;
  'data' (the tree entry) is not looked at.
 */
void getips_count_callback(void *param, void *data)
{
	uint32_t *counter = param;

	*counter += 1;
}
1341
1342 #define IP_KEYLEN       4
1343 static uint32_t *ip_key(ctdb_sock_addr *ip)
1344 {
1345         static uint32_t key[IP_KEYLEN];
1346
1347         bzero(key, sizeof(key));
1348
1349         switch (ip->sa.sa_family) {
1350         case AF_INET:
1351                 key[0]  = ip->ip.sin_addr.s_addr;
1352                 break;
1353         case AF_INET6:
1354                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1355                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1356                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1357                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1358                 break;
1359         default:
1360                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1361                 return key;
1362         }
1363
1364         return key;
1365 }
1366
/*
  insertion callback handed to trbt_insertarray32_callback()

  Returns 'parm' unchanged and ignores 'data'.
  NOTE(review): presumably the tree stores the returned pointer, so the
  newest entry for a key wins - confirm against rb_tree.c.
 */
static void *add_ip_callback(void *parm, void *data)
{
	void *entry = parm;

	(void)data;
	return entry;
}
1371
1372 static int
1373 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1374 {
1375         struct ctdb_all_public_ips *tmp_ips;
1376         struct ctdb_node_map *nodemap=NULL;
1377         trbt_tree_t *ip_tree;
1378         int i, j, len, ret;
1379         uint32_t count;
1380
1381         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1382         if (ret != 0) {
1383                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1384                 return ret;
1385         }
1386
1387         ip_tree = trbt_create(tmp_ctx, 0);
1388
1389         for(i=0;i<nodemap->num;i++){
1390                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1391                         continue;
1392                 }
1393                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1394                         continue;
1395                 }
1396
1397                 /* read the public ip list from this node */
1398                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1399                 if (ret != 0) {
1400                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1401                         return -1;
1402                 }
1403         
1404                 for (j=0; j<tmp_ips->num;j++) {
1405                         struct ctdb_public_ip *node_ip;
1406
1407                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1408                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1409                         node_ip->addr = tmp_ips->ips[j].addr;
1410
1411                         trbt_insertarray32_callback(ip_tree,
1412                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1413                                 add_ip_callback,
1414                                 node_ip);
1415                 }
1416                 talloc_free(tmp_ips);
1417         }
1418
1419         /* traverse */
1420         count = 0;
1421         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1422
1423         len = offsetof(struct ctdb_all_public_ips, ips) + 
1424                 count*sizeof(struct ctdb_public_ip);
1425         tmp_ips = talloc_zero_size(tmp_ctx, len);
1426         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1427
1428         *ips = tmp_ips;
1429
1430         return 0;
1431 }
1432
1433
1434 /* 
1435  * scans all other nodes and returns a pnn for another node that can host this 
1436  * ip address or -1
1437  */
1438 static int
1439 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1440 {
1441         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1442         struct ctdb_all_public_ips *ips;
1443         struct ctdb_node_map *nodemap=NULL;
1444         int i, j, ret;
1445
1446         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1447         if (ret != 0) {
1448                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1449                 talloc_free(tmp_ctx);
1450                 return ret;
1451         }
1452
1453         for(i=0;i<nodemap->num;i++){
1454                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1455                         continue;
1456                 }
1457                 if (nodemap->nodes[i].pnn == options.pnn) {
1458                         continue;
1459                 }
1460
1461                 /* read the public ip list from this node */
1462                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1463                 if (ret != 0) {
1464                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1465                         return -1;
1466                 }
1467
1468                 for (j=0;j<ips->num;j++) {
1469                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1470                                 talloc_free(tmp_ctx);
1471                                 return nodemap->nodes[i].pnn;
1472                         }
1473                 }
1474                 talloc_free(ips);
1475         }
1476
1477         talloc_free(tmp_ctx);
1478         return -1;
1479 }
1480
1481 /*
1482   add a public ip address to a node
1483  */
1484 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1485 {
1486         int i, ret;
1487         int len;
1488         uint32_t pnn;
1489         unsigned mask;
1490         ctdb_sock_addr addr;
1491         struct ctdb_control_ip_iface *pub;
1492         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1493         struct ctdb_all_public_ips *ips;
1494
1495
1496         if (argc != 2) {
1497                 talloc_free(tmp_ctx);
1498                 usage();
1499         }
1500
1501         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1502                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1503                 talloc_free(tmp_ctx);
1504                 return -1;
1505         }
1506
1507         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1508         if (ret != 0) {
1509                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1510                 talloc_free(tmp_ctx);
1511                 return ret;
1512         }
1513
1514
1515         /* check if some other node is already serving this ip, if not,
1516          * we will claim it
1517          */
1518         for (i=0;i<ips->num;i++) {
1519                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1520                         break;
1521                 }
1522         }
1523
1524         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1525         pub = talloc_size(tmp_ctx, len); 
1526         CTDB_NO_MEMORY(ctdb, pub);
1527
1528         pub->addr  = addr;
1529         pub->mask  = mask;
1530         pub->len   = strlen(argv[1])+1;
1531         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1532
1533         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1534         if (ret != 0) {
1535                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1536                 talloc_free(tmp_ctx);
1537                 return ret;
1538         }
1539
1540         if (i == ips->num) {
1541                 /* no one has this ip so we claim it */
1542                 pnn  = options.pnn;
1543         } else {
1544                 pnn  = ips->ips[i].pnn;
1545         }
1546
1547         if (move_ip(ctdb, &addr, pnn) != 0) {
1548                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1549                 return -1;
1550         }
1551
1552         talloc_free(tmp_ctx);
1553         return 0;
1554 }
1555
1556 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1557
1558 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1559 {
1560         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1561         struct ctdb_node_map *nodemap=NULL;
1562         struct ctdb_all_public_ips *ips;
1563         int ret, i, j;
1564
1565         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1566         if (ret != 0) {
1567                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1568                 return ret;
1569         }
1570
1571         /* remove it from the nodes that are not hosting the ip currently */
1572         for(i=0;i<nodemap->num;i++){
1573                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1574                         continue;
1575                 }
1576                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1577                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1578                         continue;
1579                 }
1580
1581                 for (j=0;j<ips->num;j++) {
1582                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1583                                 break;
1584                         }
1585                 }
1586                 if (j==ips->num) {
1587                         continue;
1588                 }
1589
1590                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1591                         continue;
1592                 }
1593
1594                 options.pnn = nodemap->nodes[i].pnn;
1595                 control_delip(ctdb, argc, argv);
1596         }
1597
1598
1599         /* remove it from every node (also the one hosting it) */
1600         for(i=0;i<nodemap->num;i++){
1601                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1602                         continue;
1603                 }
1604                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1605                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1606                         continue;
1607                 }
1608
1609                 for (j=0;j<ips->num;j++) {
1610                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1611                                 break;
1612                         }
1613                 }
1614                 if (j==ips->num) {
1615                         continue;
1616                 }
1617
1618                 options.pnn = nodemap->nodes[i].pnn;
1619                 control_delip(ctdb, argc, argv);
1620         }
1621
1622         talloc_free(tmp_ctx);
1623         return 0;
1624 }
1625         
1626 /*
1627   delete a public ip address from a node
1628  */
1629 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1630 {
1631         int i, ret;
1632         ctdb_sock_addr addr;
1633         struct ctdb_control_ip_iface pub;
1634         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1635         struct ctdb_all_public_ips *ips;
1636
1637         if (argc != 1) {
1638                 talloc_free(tmp_ctx);
1639                 usage();
1640         }
1641
1642         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1643                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1644                 return -1;
1645         }
1646
1647         if (options.pnn == CTDB_BROADCAST_ALL) {
1648                 return control_delip_all(ctdb, argc, argv, &addr);
1649         }
1650
1651         pub.addr  = addr;
1652         pub.mask  = 0;
1653         pub.len   = 0;
1654
1655         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1656         if (ret != 0) {
1657                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1658                 talloc_free(tmp_ctx);
1659                 return ret;
1660         }
1661         
1662         for (i=0;i<ips->num;i++) {
1663                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1664                         break;
1665                 }
1666         }
1667
1668         if (i==ips->num) {
1669                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1670                         ctdb_addr_to_str(&addr)));
1671                 talloc_free(tmp_ctx);
1672                 return -1;
1673         }
1674
1675         if (ips->ips[i].pnn == options.pnn) {
1676                 ret = find_other_host_for_public_ip(ctdb, &addr);
1677                 if (ret != -1) {
1678                         if (move_ip(ctdb, &addr, ret) != 0) {
1679                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1680                                 return -1;
1681                         }
1682                 }
1683         }
1684
1685         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1686         if (ret != 0) {
1687                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1688                 talloc_free(tmp_ctx);
1689                 return ret;
1690         }
1691
1692         talloc_free(tmp_ctx);
1693         return 0;
1694 }
1695
1696 /*
1697   kill a tcp connection
1698  */
1699 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1700 {
1701         int ret;
1702         struct ctdb_control_killtcp killtcp;
1703
1704         if (argc < 2) {
1705                 usage();
1706         }
1707
1708         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1709                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1710                 return -1;
1711         }
1712
1713         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1714                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1715                 return -1;
1716         }
1717
1718         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1719         if (ret != 0) {
1720                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1721                 return ret;
1722         }
1723
1724         return 0;
1725 }
1726
1727
1728 /*
1729   send a gratious arp
1730  */
1731 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1732 {
1733         int ret;
1734         ctdb_sock_addr addr;
1735
1736         if (argc < 2) {
1737                 usage();
1738         }
1739
1740         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1741                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1742                 return -1;
1743         }
1744
1745         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1746         if (ret != 0) {
1747                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1748                 return ret;
1749         }
1750
1751         return 0;
1752 }
1753
1754 /*
1755   register a server id
1756  */
1757 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1758 {
1759         int ret;
1760         struct ctdb_server_id server_id;
1761
1762         if (argc < 3) {
1763                 usage();
1764         }
1765
1766         server_id.pnn       = strtoul(argv[0], NULL, 0);
1767         server_id.type      = strtoul(argv[1], NULL, 0);
1768         server_id.server_id = strtoul(argv[2], NULL, 0);
1769
1770         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1771         if (ret != 0) {
1772                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1773                 return ret;
1774         }
1775         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1776         sleep(999);
1777         return -1;
1778 }
1779
1780 /*
1781   unregister a server id
1782  */
1783 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1784 {
1785         int ret;
1786         struct ctdb_server_id server_id;
1787
1788         if (argc < 3) {
1789                 usage();
1790         }
1791
1792         server_id.pnn       = strtoul(argv[0], NULL, 0);
1793         server_id.type      = strtoul(argv[1], NULL, 0);
1794         server_id.server_id = strtoul(argv[2], NULL, 0);
1795
1796         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1799                 return ret;
1800         }
1801         return -1;
1802 }
1803
1804 /*
1805   check if a server id exists
1806  */
1807 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1808 {
1809         uint32_t status;
1810         int ret;
1811         struct ctdb_server_id server_id;
1812
1813         if (argc < 3) {
1814                 usage();
1815         }
1816
1817         server_id.pnn       = strtoul(argv[0], NULL, 0);
1818         server_id.type      = strtoul(argv[1], NULL, 0);
1819         server_id.server_id = strtoul(argv[2], NULL, 0);
1820
1821         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1822         if (ret != 0) {
1823                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1824                 return ret;
1825         }
1826
1827         if (status) {
1828                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1829         } else {
1830                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1831         }
1832         return 0;
1833 }
1834
1835 /*
1836   get a list of all server ids that are registered on a node
1837  */
1838 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1839 {
1840         int i, ret;
1841         struct ctdb_server_id_list *server_ids;
1842
1843         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1846                 return ret;
1847         }
1848
1849         for (i=0; i<server_ids->num; i++) {
1850                 printf("Server id %d:%d:%d\n", 
1851                         server_ids->server_ids[i].pnn, 
1852                         server_ids->server_ids[i].type, 
1853                         server_ids->server_ids[i].server_id); 
1854         }
1855
1856         return -1;
1857 }
1858
1859 /*
1860   send a tcp tickle ack
1861  */
1862 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1863 {
1864         int ret;
1865         ctdb_sock_addr  src, dst;
1866
1867         if (argc < 2) {
1868                 usage();
1869         }
1870
1871         if (!parse_ip_port(argv[0], &src)) {
1872                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1873                 return -1;
1874         }
1875
1876         if (!parse_ip_port(argv[1], &dst)) {
1877                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1878                 return -1;
1879         }
1880
1881         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1882         if (ret==0) {
1883                 return 0;
1884         }
1885         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1886
1887         return -1;
1888 }
1889
1890
1891 /*
1892   display public ip status
1893  */
1894 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1895 {
1896         int i, ret;
1897         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1898         struct ctdb_all_public_ips *ips;
1899
1900         if (options.pnn == CTDB_BROADCAST_ALL) {
1901                 /* read the list of public ips from all nodes */
1902                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1903         } else {
1904                 /* read the public ip list from this node */
1905                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1906         }
1907         if (ret != 0) {
1908                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1909                 talloc_free(tmp_ctx);
1910                 return ret;
1911         }
1912
1913         if (options.machinereadable){
1914                 printf(":Public IP:Node:");
1915                 if (options.verbose){
1916                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
1917                 }
1918                 printf("\n");
1919         } else {
1920                 if (options.pnn == CTDB_BROADCAST_ALL) {
1921                         printf("Public IPs on ALL nodes\n");
1922                 } else {
1923                         printf("Public IPs on node %u\n", options.pnn);
1924                 }
1925         }
1926
1927         for (i=1;i<=ips->num;i++) {
1928                 struct ctdb_control_public_ip_info *info = NULL;
1929                 int32_t pnn;
1930                 char *aciface = NULL;
1931                 char *avifaces = NULL;
1932                 char *cifaces = NULL;
1933
1934                 if (options.pnn == CTDB_BROADCAST_ALL) {
1935                         pnn = ips->ips[ips->num-i].pnn;
1936                 } else {
1937                         pnn = options.pnn;
1938                 }
1939
1940                 if (pnn != -1) {
1941                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1942                                                    &ips->ips[ips->num-i].addr, &info);
1943                 } else {
1944                         ret = -1;
1945                 }
1946
1947                 if (ret == 0) {
1948                         int j;
1949                         for (j=0; j < info->num; j++) {
1950                                 if (cifaces == NULL) {
1951                                         cifaces = talloc_strdup(info,
1952                                                                 info->ifaces[j].name);
1953                                 } else {
1954                                         cifaces = talloc_asprintf_append(cifaces,
1955                                                                          ",%s",
1956                                                                          info->ifaces[j].name);
1957                                 }
1958
1959                                 if (info->active_idx == j) {
1960                                         aciface = info->ifaces[j].name;
1961                                 }
1962
1963                                 if (info->ifaces[j].link_state == 0) {
1964                                         continue;
1965                                 }
1966
1967                                 if (avifaces == NULL) {
1968                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1969                                 } else {
1970                                         avifaces = talloc_asprintf_append(avifaces,
1971                                                                           ",%s",
1972                                                                           info->ifaces[j].name);
1973                                 }
1974                         }
1975                 }
1976
1977                 if (options.machinereadable){
1978                         printf(":%s:%d:",
1979                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1980                                 ips->ips[ips->num-i].pnn);
1981                         if (options.verbose){
1982                                 printf("%s:%s:%s:",
1983                                         aciface?aciface:"",
1984                                         avifaces?avifaces:"",
1985                                         cifaces?cifaces:"");
1986                         }
1987                         printf("\n");
1988                 } else {
1989                         if (options.verbose) {
1990                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1991                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1992                                         ips->ips[ips->num-i].pnn,
1993                                         aciface?aciface:"",
1994                                         avifaces?avifaces:"",
1995                                         cifaces?cifaces:"");
1996                         } else {
1997                                 printf("%s %d\n",
1998                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1999                                         ips->ips[ips->num-i].pnn);
2000                         }
2001                 }
2002                 talloc_free(info);
2003         }
2004
2005         talloc_free(tmp_ctx);
2006         return 0;
2007 }
2008
2009 /*
2010   public ip info
2011  */
2012 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2013 {
2014         int i, ret;
2015         ctdb_sock_addr addr;
2016         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2017         struct ctdb_control_public_ip_info *info;
2018
2019         if (argc != 1) {
2020                 talloc_free(tmp_ctx);
2021                 usage();
2022         }
2023
2024         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2025                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2026                 return -1;
2027         }
2028
2029         /* read the public ip info from this node */
2030         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2031                                            tmp_ctx, &addr, &info);
2032         if (ret != 0) {
2033                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2034                                   argv[0], options.pnn));
2035                 talloc_free(tmp_ctx);
2036                 return ret;
2037         }
2038
2039         printf("Public IP[%s] info on node %u\n",
2040                ctdb_addr_to_str(&info->ip.addr),
2041                options.pnn);
2042
2043         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2044                ctdb_addr_to_str(&info->ip.addr),
2045                info->ip.pnn, info->num);
2046
2047         for (i=0; i<info->num; i++) {
2048                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2049
2050                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2051                        i+1, info->ifaces[i].name,
2052                        info->ifaces[i].link_state?"up":"down",
2053                        (unsigned int)info->ifaces[i].references,
2054                        (i==info->active_idx)?" (active)":"");
2055         }
2056
2057         talloc_free(tmp_ctx);
2058         return 0;
2059 }
2060
2061 /*
2062   display interfaces status
2063  */
2064 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2065 {
2066         int i, ret;
2067         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2068         struct ctdb_control_get_ifaces *ifaces;
2069
2070         /* read the public ip list from this node */
2071         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
2072                                    tmp_ctx, &ifaces);
2073         if (ret != 0) {
2074                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2075                                   options.pnn));
2076                 talloc_free(tmp_ctx);
2077                 return ret;
2078         }
2079
2080         if (options.machinereadable){
2081                 printf(":Name:LinkStatus:References:\n");
2082         } else {
2083                 printf("Interfaces on node %u\n", options.pnn);
2084         }
2085
2086         for (i=0; i<ifaces->num; i++) {
2087                 if (options.machinereadable){
2088                         printf(":%s:%s:%u\n",
2089                                ifaces->ifaces[i].name,
2090                                ifaces->ifaces[i].link_state?"1":"0",
2091                                (unsigned int)ifaces->ifaces[i].references);
2092                 } else {
2093                         printf("name:%s link:%s references:%u\n",
2094                                ifaces->ifaces[i].name,
2095                                ifaces->ifaces[i].link_state?"up":"down",
2096                                (unsigned int)ifaces->ifaces[i].references);
2097                 }
2098         }
2099
2100         talloc_free(tmp_ctx);
2101         return 0;
2102 }
2103
2104
2105 /*
2106   set link status of an interface
2107  */
2108 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2109 {
2110         int ret;
2111         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2112         struct ctdb_control_iface_info info;
2113
2114         ZERO_STRUCT(info);
2115
2116         if (argc != 2) {
2117                 usage();
2118         }
2119
2120         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2121                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2122                                   argv[0]));
2123                 talloc_free(tmp_ctx);
2124                 return -1;
2125         }
2126         strcpy(info.name, argv[0]);
2127
2128         if (strcmp(argv[1], "up") == 0) {
2129                 info.link_state = 1;
2130         } else if (strcmp(argv[1], "down") == 0) {
2131                 info.link_state = 0;
2132         } else {
2133                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2134                                   argv[1]));
2135                 talloc_free(tmp_ctx);
2136                 return -1;
2137         }
2138
2139         /* read the public ip list from this node */
2140         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2141                                    tmp_ctx, &info);
2142         if (ret != 0) {
2143                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2144                                   argv[0], options.pnn));
2145                 talloc_free(tmp_ctx);
2146                 return ret;
2147         }
2148
2149         talloc_free(tmp_ctx);
2150         return 0;
2151 }
2152
2153 /*
2154   display pid of a ctdb daemon
2155  */
2156 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2157 {
2158         uint32_t pid;
2159         int ret;
2160
2161         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2162         if (ret != 0) {
2163                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2164                 return ret;
2165         }
2166         printf("Pid:%d\n", pid);
2167
2168         return 0;
2169 }
2170
2171 static uint32_t ipreallocate_finished;
2172
2173 /*
2174   handler for receiving the response to ipreallocate
2175 */
2176 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2177                              TDB_DATA data, void *private_data)
2178 {
2179         ipreallocate_finished = 1;
2180 }
2181
2182 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2183 {
2184         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2185
2186         event_add_timed(ctdb->ev, ctdb, 
2187                                 timeval_current_ofs(1, 0),
2188                                 ctdb_every_second, ctdb);
2189 }
2190
2191 /*
2192   ask the recovery daemon on the recovery master to perform a ip reallocation
2193  */
2194 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2195 {
2196         int i, ret;
2197         TDB_DATA data;
2198         struct takeover_run_reply rd;
2199         uint32_t recmaster;
2200         struct ctdb_node_map *nodemap=NULL;
2201         int retries=0;
2202         struct timeval tv = timeval_current();
2203
2204         /* we need some events to trigger so we can timeout and restart
2205            the loop
2206         */
2207         event_add_timed(ctdb->ev, ctdb, 
2208                                 timeval_current_ofs(1, 0),
2209                                 ctdb_every_second, ctdb);
2210
2211         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2212         if (rd.pnn == -1) {
2213                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
2214                 return -1;
2215         }
2216         rd.srvid = getpid();
2217
2218         /* register a message port for receiveing the reply so that we
2219            can receive the reply
2220         */
2221         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2222
2223         data.dptr = (uint8_t *)&rd;
2224         data.dsize = sizeof(rd);
2225
2226 again:
2227         /* check that there are valid nodes available */
2228         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2229                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2230                 return -1;
2231         }
2232         for (i=0; i<nodemap->num;i++) {
2233                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2234                         break;
2235                 }
2236         }
2237         if (i==nodemap->num) {
2238                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2239                 return 0;
2240         }
2241
2242
2243         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2246                 return ret;
2247         }
2248
2249         /* verify the node exists */
2250         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2251                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2252                 return -1;
2253         }
2254
2255
2256         /* check tha there are nodes available that can act as a recmaster */
2257         for (i=0; i<nodemap->num; i++) {
2258                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2259                         continue;
2260                 }
2261                 break;
2262         }
2263         if (i == nodemap->num) {
2264                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2265                 return 0;
2266         }
2267
2268         /* verify the recovery master is not STOPPED, nor BANNED */
2269         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2270                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2271                 retries++;
2272                 sleep(1);
2273                 goto again;
2274         } 
2275
2276         
2277         /* verify the recovery master is not STOPPED, nor BANNED */
2278         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2279                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2280                 retries++;
2281                 sleep(1);
2282                 goto again;
2283         } 
2284
2285         ipreallocate_finished = 0;
2286         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2289                 return -1;
2290         }
2291
2292         tv = timeval_current();
2293         /* this loop will terminate when we have received the reply */
2294         while (timeval_elapsed(&tv) < 3.0) {
2295                 event_loop_once(ctdb->ev);
2296         }
2297         if (ipreallocate_finished == 1) {
2298                 return 0;
2299         }
2300
2301         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2302         retries++;
2303         sleep(1);
2304         goto again;
2305
2306         return 0;
2307 }
2308
2309
2310 /*
2311   disable a remote node
2312  */
2313 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2314 {
2315         int ret;
2316         struct ctdb_node_map *nodemap=NULL;
2317
2318         /* check if the node is already disabled */
2319         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2320                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2321                 exit(10);
2322         }
2323         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2324                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2325                 return 0;
2326         }
2327
2328         do {
2329                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2330                 if (ret != 0) {
2331                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2332                         return ret;
2333                 }
2334
2335                 sleep(1);
2336
2337                 /* read the nodemap and verify the change took effect */
2338                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2339                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2340                         exit(10);
2341                 }
2342
2343         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2344         ret = control_ipreallocate(ctdb, argc, argv);
2345         if (ret != 0) {
2346                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2347                 return ret;
2348         }
2349
2350         return 0;
2351 }
2352
2353 /*
2354   enable a disabled remote node
2355  */
2356 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2357 {
2358         int ret;
2359
2360         struct ctdb_node_map *nodemap=NULL;
2361
2362
2363         /* check if the node is already enabled */
2364         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2365                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2366                 exit(10);
2367         }
2368         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2369                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2370                 return 0;
2371         }
2372
2373         do {
2374                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2375                 if (ret != 0) {
2376                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2377                         return ret;
2378                 }
2379
2380                 sleep(1);
2381
2382                 /* read the nodemap and verify the change took effect */
2383                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2384                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2385                         exit(10);
2386                 }
2387
2388         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2389
2390         ret = control_ipreallocate(ctdb, argc, argv);
2391         if (ret != 0) {
2392                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2393                 return ret;
2394         }
2395
2396         return 0;
2397 }
2398
2399 /*
2400   stop a remote node
2401  */
2402 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2403 {
2404         int ret;
2405         struct ctdb_node_map *nodemap=NULL;
2406
2407         do {
2408                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2409                 if (ret != 0) {
2410                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2411                 }
2412         
2413                 sleep(1);
2414
2415                 /* read the nodemap and verify the change took effect */
2416                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2417                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2418                         exit(10);
2419                 }
2420
2421         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2422         ret = control_ipreallocate(ctdb, argc, argv);
2423         if (ret != 0) {
2424                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2425                 return ret;
2426         }
2427
2428         return 0;
2429 }
2430
2431 /*
2432   restart a stopped remote node
2433  */
2434 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2435 {
2436         int ret;
2437
2438         struct ctdb_node_map *nodemap=NULL;
2439
2440         do {
2441                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2442                 if (ret != 0) {
2443                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2444                         return ret;
2445                 }
2446         
2447                 sleep(1);
2448
2449                 /* read the nodemap and verify the change took effect */
2450                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2451                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2452                         exit(10);
2453                 }
2454
2455         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2456         ret = control_ipreallocate(ctdb, argc, argv);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2459                 return ret;
2460         }
2461
2462         return 0;
2463 }
2464
2465 static uint32_t get_generation(struct ctdb_context *ctdb)
2466 {
2467         struct ctdb_vnn_map *vnnmap=NULL;
2468         int ret;
2469
2470         /* wait until the recmaster is not in recovery mode */
2471         while (1) {
2472                 uint32_t recmode, recmaster;
2473                 
2474                 if (vnnmap != NULL) {
2475                         talloc_free(vnnmap);
2476                         vnnmap = NULL;
2477                 }
2478
2479                 /* get the recmaster */
2480                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2481                 if (ret != 0) {
2482                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2483                         exit(10);
2484                 }
2485
2486                 /* get recovery mode */
2487                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2488                 if (ret != 0) {
2489                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2490                         exit(10);
2491                 }
2492
2493                 /* get the current generation number */
2494                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2495                 if (ret != 0) {
2496                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2497                         exit(10);
2498                 }
2499
2500                 if ((recmode == CTDB_RECOVERY_NORMAL)
2501                 &&  (vnnmap->generation != 1)){
2502                         return vnnmap->generation;
2503                 }
2504                 sleep(1);
2505         }
2506 }
2507
2508 /*
2509   ban a node from the cluster
2510  */
2511 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2512 {
2513         int ret;
2514         struct ctdb_node_map *nodemap=NULL;
2515         struct ctdb_ban_time bantime;
2516
2517         if (argc < 1) {
2518                 usage();
2519         }
2520         
2521         /* verify the node exists */
2522         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2523         if (ret != 0) {
2524                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2525                 return ret;
2526         }
2527
2528         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2529                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2530                 return -1;
2531         }
2532
2533         bantime.pnn  = options.pnn;
2534         bantime.time = strtoul(argv[0], NULL, 0);
2535
2536         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2537         if (ret != 0) {
2538                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2539                 return -1;
2540         }       
2541
2542         ret = control_ipreallocate(ctdb, argc, argv);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2545                 return ret;
2546         }
2547
2548         return 0;
2549 }
2550
2551
2552 /*
2553   unban a node from the cluster
2554  */
2555 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2556 {
2557         int ret;
2558         struct ctdb_node_map *nodemap=NULL;
2559         struct ctdb_ban_time bantime;
2560
2561         /* verify the node exists */
2562         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2563         if (ret != 0) {
2564                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2565                 return ret;
2566         }
2567
2568         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2569                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2570                 return -1;
2571         }
2572
2573         bantime.pnn  = options.pnn;
2574         bantime.time = 0;
2575
2576         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2577         if (ret != 0) {
2578                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2579                 return -1;
2580         }       
2581
2582         ret = control_ipreallocate(ctdb, argc, argv);
2583         if (ret != 0) {
2584                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2585                 return ret;
2586         }
2587
2588         return 0;
2589 }
2590
2591
2592 /*
2593   show ban information for a node
2594  */
2595 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2596 {
2597         int ret;
2598         struct ctdb_node_map *nodemap=NULL;
2599         struct ctdb_ban_time *bantime;
2600
2601         /* verify the node exists */
2602         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2603         if (ret != 0) {
2604                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2605                 return ret;
2606         }
2607
2608         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2609         if (ret != 0) {
2610                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2611                 return -1;
2612         }       
2613
2614         if (bantime->time == 0) {
2615                 printf("Node %u is not banned\n", bantime->pnn);
2616         } else {
2617                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2618         }
2619
2620         return 0;
2621 }
2622
2623 /*
2624   shutdown a daemon
2625  */
2626 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2627 {
2628         int ret;
2629
2630         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2631         if (ret != 0) {
2632                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2633                 return ret;
2634         }
2635
2636         return 0;
2637 }
2638
2639 /*
2640   trigger a recovery
2641  */
2642 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2643 {
2644         int ret;
2645         uint32_t generation, next_generation;
2646
2647         /* record the current generation number */
2648         generation = get_generation(ctdb);
2649
2650         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2651         if (ret != 0) {
2652                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2653                 return ret;
2654         }
2655
2656         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2657         if (ret != 0) {
2658                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2659                 return ret;
2660         }
2661
2662         /* wait until we are in a new generation */
2663         while (1) {
2664                 next_generation = get_generation(ctdb);
2665                 if (next_generation != generation) {
2666                         return 0;
2667                 }
2668                 sleep(1);
2669         }
2670
2671         return 0;
2672 }
2673
2674
2675 /*
2676   display monitoring mode of a remote node
2677  */
2678 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2679 {
2680         uint32_t monmode;
2681         int ret;
2682
2683         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2684         if (ret != 0) {
2685                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2686                 return ret;
2687         }
2688         if (!options.machinereadable){
2689                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2690         } else {
2691                 printf(":mode:\n");
2692                 printf(":%d:\n",monmode);
2693         }
2694         return 0;
2695 }
2696
2697
2698 /*
2699   display capabilities of a remote node
2700  */
2701 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2702 {
2703         uint32_t capabilities;
2704         int ret;
2705
2706         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2707         if (ret != 0) {
2708                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2709                 return ret;
2710         }
2711         
2712         if (!options.machinereadable){
2713                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2714                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2715                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2716                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2717         } else {
2718                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2719                 printf(":%d:%d:%d:%d:\n",
2720                         !!(capabilities&CTDB_CAP_RECMASTER),
2721                         !!(capabilities&CTDB_CAP_LMASTER),
2722                         !!(capabilities&CTDB_CAP_LVS),
2723                         !!(capabilities&CTDB_CAP_NATGW));
2724         }
2725         return 0;
2726 }
2727
2728 /*
2729   display lvs configuration
2730  */
2731 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2732 {
2733         uint32_t *capabilities;
2734         struct ctdb_node_map *nodemap=NULL;
2735         int i, ret;
2736         int healthy_count = 0;
2737
2738         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2739         if (ret != 0) {
2740                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2741                 return ret;
2742         }
2743
2744         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2745         CTDB_NO_MEMORY(ctdb, capabilities);
2746         
2747         /* collect capabilities for all connected nodes */
2748         for (i=0; i<nodemap->num; i++) {
2749                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2750                         continue;
2751                 }
2752                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2753                         continue;
2754                 }
2755         
2756                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2757                 if (ret != 0) {
2758                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2759                         return ret;
2760                 }
2761
2762                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2763                         continue;
2764                 }
2765
2766                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2767                         healthy_count++;
2768                 }
2769         }
2770
2771         /* Print all LVS nodes */
2772         for (i=0; i<nodemap->num; i++) {
2773                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2774                         continue;
2775                 }
2776                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2777                         continue;
2778                 }
2779                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2780                         continue;
2781                 }
2782
2783                 if (healthy_count != 0) {
2784                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2785                                 continue;
2786                         }
2787                 }
2788
2789                 printf("%d:%s\n", i, 
2790                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2791         }
2792
2793         return 0;
2794 }
2795
2796 /*
2797   display who is the lvs master
2798  */
2799 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2800 {
2801         uint32_t *capabilities;
2802         struct ctdb_node_map *nodemap=NULL;
2803         int i, ret;
2804         int healthy_count = 0;
2805
2806         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2807         if (ret != 0) {
2808                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2809                 return ret;
2810         }
2811
2812         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2813         CTDB_NO_MEMORY(ctdb, capabilities);
2814         
2815         /* collect capabilities for all connected nodes */
2816         for (i=0; i<nodemap->num; i++) {
2817                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2818                         continue;
2819                 }
2820                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2821                         continue;
2822                 }
2823         
2824                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2825                 if (ret != 0) {
2826                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2827                         return ret;
2828                 }
2829
2830                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2831                         continue;
2832                 }
2833
2834                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2835                         healthy_count++;
2836                 }
2837         }
2838
2839         /* find and show the lvsmaster */
2840         for (i=0; i<nodemap->num; i++) {
2841                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2842                         continue;
2843                 }
2844                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2845                         continue;
2846                 }
2847                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2848                         continue;
2849                 }
2850
2851                 if (healthy_count != 0) {
2852                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2853                                 continue;
2854                         }
2855                 }
2856
2857                 if (options.machinereadable){
2858                         printf("%d\n", i);
2859                 } else {
2860                         printf("Node %d is LVS master\n", i);
2861                 }
2862                 return 0;
2863         }
2864
2865         printf("There is no LVS master\n");
2866         return -1;
2867 }
2868
2869 /*
2870   disable monitoring on a  node
2871  */
2872 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2873 {
2874         
2875         int ret;
2876
2877         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2878         if (ret != 0) {
2879                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2880                 return ret;
2881         }
2882         printf("Monitoring mode:%s\n","DISABLED");
2883
2884         return 0;
2885 }
2886
2887 /*
2888   enable monitoring on a  node
2889  */
2890 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2891 {
2892         
2893         int ret;
2894
2895         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2896         if (ret != 0) {
2897                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2898                 return ret;
2899         }
2900         printf("Monitoring mode:%s\n","ACTIVE");
2901
2902         return 0;
2903 }
2904
2905 /*
2906   display remote list of keys/data for a db
2907  */
2908 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2909 {
2910         const char *db_name;
2911         struct ctdb_db_context *ctdb_db;
2912         int ret;
2913         bool persistent;
2914
2915         if (argc < 1) {
2916                 usage();
2917         }
2918
2919         db_name = argv[0];
2920
2921
2922         if (db_exists(ctdb, db_name, &persistent)) {
2923                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2924                 return -1;
2925         }
2926
2927         ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
2928
2929         if (ctdb_db == NULL) {
2930                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2931                 return -1;
2932         }
2933
2934         /* traverse and dump the cluster tdb */
2935         ret = ctdb_dump_db(ctdb_db, stdout);
2936         if (ret == -1) {
2937                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2938                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2939                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2940                                   db_name));
2941                 return -1;
2942         }
2943         talloc_free(ctdb_db);
2944
2945         printf("Dumped %d records\n", ret);
2946         return 0;
2947 }
2948
2949
2950 /*
2951   fetch a record from a persistent database
2952  */
2953 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
2954 {
2955         const char *db_name;
2956         struct ctdb_db_context *ctdb_db;
2957         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2958         struct ctdb_transaction_handle *h;
2959         TDB_DATA key, data;
2960         int fd, ret;
2961
2962         if (argc < 2) {
2963                 talloc_free(tmp_ctx);
2964                 usage();
2965         }
2966
2967         db_name = argv[0];
2968
2969
2970         if (db_exists(ctdb, db_name, NULL)) {
2971                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2972                 talloc_free(tmp_ctx);
2973                 return -1;
2974         }
2975
2976         ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
2977
2978         if (ctdb_db == NULL) {
2979                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2980                 talloc_free(tmp_ctx);
2981                 return -1;
2982         }
2983
2984         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
2985         if (h == NULL) {
2986                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
2987                 talloc_free(tmp_ctx);
2988                 return -1;
2989         }
2990
2991         key.dptr  = discard_const(argv[1]);
2992         key.dsize = strlen(argv[1]);
2993         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
2994         if (ret != 0) {
2995                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
2996                 talloc_free(tmp_ctx);
2997                 return -1;
2998         }
2999
3000         if (data.dsize == 0 || data.dptr == NULL) {
3001                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3002                 talloc_free(tmp_ctx);
3003                 return -1;
3004         }
3005
3006         if (argc == 3) {
3007           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3008                 if (fd == -1) {
3009                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3010                         talloc_free(tmp_ctx);
3011                         return -1;
3012                 }
3013                 write(fd, data.dptr, data.dsize);
3014                 close(fd);
3015         } else {
3016                 write(1, data.dptr, data.dsize);
3017         }
3018
3019         /* abort the transaction */
3020         talloc_free(h);
3021
3022
3023         talloc_free(tmp_ctx);
3024         return 0;
3025 }
3026
3027 /*
3028   fetch a record from a tdb-file
3029  */
3030 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3031 {
3032         const char *tdb_file;
3033         TDB_CONTEXT *tdb;
3034         TDB_DATA key, data;
3035         int fd;
3036
3037         if (argc < 2) {
3038                 usage();
3039         }
3040
3041         tdb_file = argv[0];
3042
3043         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3044         if (tdb == NULL) {
3045                 DEBUG(DEBUG_ERR,("Failed to open TDB file %s\n", tdb_file));
3046                 return -1;
3047         }
3048
3049         key.dptr  = discard_const(argv[1]);
3050         key.dsize = strlen(argv[1]);
3051         data = tdb_fetch(tdb, key);
3052         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3053                 DEBUG(DEBUG_ERR,("Failed to read record %s from tdb %s\n", argv[1], tdb_file));
3054                 tdb_close(tdb);
3055                 return -1;
3056         }
3057
3058         tdb_close(tdb);
3059
3060         if (argc == 3) {
3061           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3062                 if (fd == -1) {
3063                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3064                         return -1;
3065                 }
3066                 write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3067                 close(fd);
3068         } else {
3069                 write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3070         }
3071
3072         return 0;
3073 }
3074
3075 /*
3076   write a record to a persistent database
3077  */
3078 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3079 {
3080         const char *db_name;
3081         struct ctdb_db_context *ctdb_db;
3082         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3083         struct ctdb_transaction_handle *h;
3084         struct stat st;
3085         TDB_DATA key, data;
3086         int fd, ret;
3087
3088         if (argc < 3) {
3089                 talloc_free(tmp_ctx);
3090                 usage();
3091         }
3092
3093         fd = open(argv[2], O_RDONLY);
3094         if (fd == -1) {
3095                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3096                 talloc_free(tmp_ctx);
3097                 return -1;
3098         }
3099         
3100         ret = fstat(fd, &st);
3101         if (ret == -1) {
3102                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3103                 close(fd);
3104                 talloc_free(tmp_ctx);
3105                 return -1;
3106         }
3107
3108         if (!S_ISREG(st.st_mode)) {
3109                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3110                 close(fd);
3111                 talloc_free(tmp_ctx);
3112                 return -1;
3113         }
3114
3115         data.dsize = st.st_size;
3116         if (data.dsize == 0) {
3117                 data.dptr  = NULL;
3118         } else {
3119                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3120                 if (data.dptr == NULL) {
3121                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3122                         close(fd);
3123                         talloc_free(tmp_ctx);
3124                         return -1;
3125                 }
3126                 ret = read(fd, data.dptr, data.dsize);
3127                 if (ret != data.dsize) {
3128                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3129                         close(fd);
3130                         talloc_free(tmp_ctx);
3131                         return -1;
3132                 }
3133         }
3134         close(fd);
3135
3136
3137         db_name = argv[0];
3138
3139         ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
3140
3141         if (ctdb_db == NULL) {
3142                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3143                 talloc_free(tmp_ctx);
3144                 return -1;
3145         }
3146
3147         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3148         if (h == NULL) {
3149                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3150                 talloc_free(tmp_ctx);
3151                 return -1;
3152         }
3153
3154         key.dptr  = discard_const(argv[1]);
3155         key.dsize = strlen(argv[1]);
3156         ret = ctdb_transaction_store(h, key, data);
3157         if (ret != 0) {
3158                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3159                 talloc_free(tmp_ctx);
3160                 return -1;
3161         }
3162
3163         ret = ctdb_transaction_commit(h);
3164         if (ret != 0) {
3165                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3166                 talloc_free(tmp_ctx);
3167                 return -1;
3168         }
3169
3170
3171         talloc_free(tmp_ctx);
3172         return 0;
3173 }
3174
3175 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3176                              TDB_DATA data, void *private_data)
3177 {
3178         DEBUG(DEBUG_ERR,("Log data received\n"));
3179         if (data.dsize > 0) {
3180                 printf("%s", data.dptr);
3181         }
3182
3183         exit(0);
3184 }
3185
3186 /*
3187   display a list of log messages from the in memory ringbuffer
3188  */
3189 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3190 {
3191         int ret;
3192         int32_t res;
3193         struct ctdb_get_log_addr log_addr;
3194         TDB_DATA data;
3195         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3196         char *errmsg;
3197         struct timeval tv;
3198
3199         if (argc != 1) {
3200                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3201                 talloc_free(tmp_ctx);
3202                 return -1;
3203         }
3204
3205         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3206         log_addr.srvid = getpid();
3207         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3208                 log_addr.level = get_debug_by_desc(argv[0]);
3209         } else {
3210                 log_addr.level = strtol(argv[0], NULL, 0);
3211         }
3212
3213
3214         data.dptr = (unsigned char *)&log_addr;
3215         data.dsize = sizeof(log_addr);
3216
3217         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3218
3219         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3220         sleep(1);
3221
3222         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3223
3224         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
3225                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3226         if (ret != 0 || res != 0) {
3227                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
3228                 talloc_free(tmp_ctx);
3229                 return -1;
3230         }
3231
3232
3233         tv = timeval_current();
3234         /* this loop will terminate when we have received the reply */
3235         while (timeval_elapsed(&tv) < 3.0) {    
3236                 event_loop_once(ctdb->ev);
3237         }
3238
3239         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
3240
3241         talloc_free(tmp_ctx);
3242         return 0;
3243 }
3244
3245 /*
3246   clear the in memory log area
3247  */
3248 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
3249 {
3250         int ret;
3251         int32_t res;
3252         char *errmsg;
3253         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3254
3255         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
3256                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
3257         if (ret != 0 || res != 0) {
3258                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
3259                 talloc_free(tmp_ctx);
3260                 return -1;
3261         }
3262
3263         talloc_free(tmp_ctx);
3264         return 0;
3265 }
3266
3267
3268
3269 /*
3270   display a list of the databases on a remote ctdb
3271  */
3272 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
3273 {
3274         int i, ret;
3275         struct ctdb_dbid_map *dbmap=NULL;
3276
3277         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3278         if (ret != 0) {
3279                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3280                 return ret;
3281         }
3282
3283         if(options.machinereadable){
3284                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
3285                 for(i=0;i<dbmap->num;i++){
3286                         const char *path;
3287                         const char *name;
3288                         const char *health;
3289                         bool persistent;
3290
3291                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
3292                                             dbmap->dbs[i].dbid, ctdb, &path);
3293                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3294                                             dbmap->dbs[i].dbid, ctdb, &name);
3295                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3296                                               dbmap->dbs[i].dbid, ctdb, &health);
3297                         persistent = dbmap->dbs[i].persistent;
3298                         printf(":0x%08X:%s:%s:%d:%d:\n",
3299                                dbmap->dbs[i].dbid, name, path,
3300                                !!(persistent), !!(health));
3301                 }
3302                 return 0;
3303         }
3304
3305         printf("Number of databases:%d\n", dbmap->num);
3306         for(i=0;i<dbmap->num;i++){
3307                 const char *path;
3308                 const char *name;
3309                 const char *health;
3310                 bool persistent;
3311
3312                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3313                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3314                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3315                 persistent = dbmap->dbs[i].persistent;
3316                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
3317                        dbmap->dbs[i].dbid, name, path,
3318                        persistent?" PERSISTENT":"",
3319                        health?" UNHEALTHY":"");
3320         }
3321
3322         return 0;
3323 }
3324
3325 /*
3326   display the status of a database on a remote ctdb
3327  */
3328 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
3329 {
3330         int i, ret;
3331         struct ctdb_dbid_map *dbmap=NULL;
3332         const char *db_name;
3333
3334         if (argc < 1) {
3335                 usage();
3336         }
3337
3338         db_name = argv[0];
3339
3340         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3341         if (ret != 0) {
3342                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3343                 return ret;
3344         }
3345
3346         for(i=0;i<dbmap->num;i++){
3347                 const char *path;
3348                 const char *name;
3349                 const char *health;
3350                 bool persistent;
3351
3352                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3353                 if (strcmp(name, db_name) != 0) {
3354                         continue;
3355                 }
3356
3357                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3358                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3359                 persistent = dbmap->dbs[i].persistent;
3360                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
3361                        dbmap->dbs[i].dbid, name, path,
3362                        persistent?"yes":"no",
3363                        health?health:"OK");
3364                 return 0;
3365         }
3366
3367         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
3368         return 0;
3369 }
3370
3371 /*
3372   check if the local node is recmaster or not
3373   it will return 1 if this node is the recmaster and 0 if it is not
3374   or if the local ctdb daemon could not be contacted
3375  */
3376 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3377 {
3378         uint32_t mypnn, recmaster;
3379         int ret;
3380
3381         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
3382         if (mypnn == -1) {
3383                 printf("Failed to get pnn of node\n");
3384                 return 1;
3385         }
3386
3387         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
3388         if (ret != 0) {
3389                 printf("Failed to get the recmaster\n");
3390                 return 1;
3391         }
3392
3393         if (recmaster != mypnn) {
3394                 printf("this node is not the recmaster\n");
3395                 return 1;
3396         }
3397
3398         printf("this node is the recmaster\n");
3399         return 0;
3400 }
3401
3402 /*
3403   ping a node
3404  */
3405 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
3406 {
3407         int ret;
3408         struct timeval tv = timeval_current();
3409         ret = ctdb_ctrl_ping(ctdb, options.pnn);
3410         if (ret == -1) {
3411                 printf("Unable to get ping response from node %u\n", options.pnn);
3412                 return -1;
3413         } else {
3414                 printf("response from %u time=%.6f sec  (%d clients)\n", 
3415                        options.pnn, timeval_elapsed(&tv), ret);
3416         }
3417         return 0;
3418 }
3419
3420
3421 /*
3422   get a tunable
3423  */
3424 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3425 {
3426         const char *name;
3427         uint32_t value;
3428         int ret;
3429
3430         if (argc < 1) {
3431                 usage();
3432         }
3433
3434         name = argv[0];
3435         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
3436         if (ret == -1) {
3437                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3438                 return -1;
3439         }
3440
3441         printf("%-19s = %u\n", name, value);
3442         return 0;
3443 }
3444
3445 /*
3446   set a tunable
3447  */
3448 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3449 {
3450         const char *name;
3451         uint32_t value;
3452         int ret;
3453
3454         if (argc < 2) {
3455                 usage();
3456         }
3457
3458         name = argv[0];
3459         value = strtoul(argv[1], NULL, 0);
3460
3461         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3462         if (ret == -1) {
3463                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3464                 return -1;
3465         }
3466         return 0;
3467 }
3468
3469 /*
3470   list all tunables
3471  */
3472 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3473 {
3474         uint32_t count;
3475         const char **list;
3476         int ret, i;
3477
3478         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3479         if (ret == -1) {
3480                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3481                 return -1;
3482         }
3483
3484         for (i=0;i<count;i++) {
3485                 control_getvar(ctdb, 1, &list[i]);
3486         }
3487
3488         talloc_free(list);
3489         
3490         return 0;
3491 }
3492
3493 /*
3494   display debug level on a node
3495  */
3496 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3497 {
3498         int ret;
3499         int32_t level;
3500
3501         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3502         if (ret != 0) {
3503                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3504                 return ret;
3505         } else {
3506                 if (options.machinereadable){
3507                         printf(":Name:Level:\n");
3508                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3509                 } else {
3510                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3511                 }
3512         }
3513         return 0;
3514 }
3515
3516 /*
3517   display reclock file of a node
3518  */
3519 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3520 {
3521         int ret;
3522         const char *reclock;
3523
3524         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3525         if (ret != 0) {
3526                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3527                 return ret;
3528         } else {
3529                 if (options.machinereadable){
3530                         if (reclock != NULL) {
3531                                 printf("%s", reclock);
3532                         }
3533                 } else {
3534                         if (reclock == NULL) {
3535                                 printf("No reclock file used.\n");
3536                         } else {
3537                                 printf("Reclock file:%s\n", reclock);
3538                         }
3539                 }
3540         }
3541         return 0;
3542 }
3543
3544 /*
3545   set the reclock file of a node
3546  */
3547 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3548 {
3549         int ret;
3550         const char *reclock;
3551
3552         if (argc == 0) {
3553                 reclock = NULL;
3554         } else if (argc == 1) {
3555                 reclock = argv[0];
3556         } else {
3557                 usage();
3558         }
3559
3560         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3561         if (ret != 0) {
3562                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3563                 return ret;
3564         }
3565         return 0;
3566 }
3567
3568 /*
3569   set the natgw state on/off
3570  */
3571 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3572 {
3573         int ret;
3574         uint32_t natgwstate;
3575
3576         if (argc == 0) {
3577                 usage();
3578         }
3579
3580         if (!strcmp(argv[0], "on")) {
3581                 natgwstate = 1;
3582         } else if (!strcmp(argv[0], "off")) {
3583                 natgwstate = 0;
3584         } else {
3585                 usage();
3586         }
3587
3588         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3589         if (ret != 0) {
3590                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3591                 return ret;
3592         }
3593
3594         return 0;
3595 }
3596
3597 /*
3598   set the lmaster role on/off
3599  */
3600 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3601 {
3602         int ret;
3603         uint32_t lmasterrole;
3604
3605         if (argc == 0) {
3606                 usage();
3607         }
3608
3609         if (!strcmp(argv[0], "on")) {
3610                 lmasterrole = 1;
3611         } else if (!strcmp(argv[0], "off")) {
3612                 lmasterrole = 0;
3613         } else {
3614                 usage();
3615         }
3616
3617         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3618         if (ret != 0) {
3619                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3620                 return ret;
3621         }
3622
3623         return 0;
3624 }
3625
3626 /*
3627   set the recmaster role on/off
3628  */
3629 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3630 {
3631         int ret;
3632         uint32_t recmasterrole;
3633
3634         if (argc == 0) {
3635                 usage();
3636         }
3637
3638         if (!strcmp(argv[0], "on")) {
3639                 recmasterrole = 1;
3640         } else if (!strcmp(argv[0], "off")) {
3641                 recmasterrole = 0;
3642         } else {
3643                 usage();
3644         }
3645
3646         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3647         if (ret != 0) {
3648                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3649                 return ret;
3650         }
3651
3652         return 0;
3653 }
3654
3655 /*
3656   set debug level on a node or all nodes
3657  */
3658 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3659 {
3660         int i, ret;
3661         int32_t level;
3662
3663         if (argc == 0) {
3664                 printf("You must specify the debug level. Valid levels are:\n");
3665                 for (i=0; debug_levels[i].description != NULL; i++) {
3666                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3667                 }
3668
3669                 return 0;
3670         }
3671
3672         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3673                 level = get_debug_by_desc(argv[0]);
3674         } else {
3675                 level = strtol(argv[0], NULL, 0);
3676         }
3677
3678         for (i=0; debug_levels[i].description != NULL; i++) {
3679                 if (level == debug_levels[i].level) {
3680                         break;
3681                 }
3682         }
3683         if (debug_levels[i].description == NULL) {
3684                 printf("Invalid debug level, must be one of\n");
3685                 for (i=0; debug_levels[i].description != NULL; i++) {
3686                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3687                 }
3688                 return -1;
3689         }
3690
3691         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3692         if (ret != 0) {
3693                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3694         }
3695         return 0;
3696 }
3697
3698
3699 /*
3700   thaw a node
3701  */
3702 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3703 {
3704         int ret;
3705         uint32_t priority;
3706         
3707         if (argc == 1) {
3708                 priority = strtol(argv[0], NULL, 0);
3709         } else {
3710                 priority = 0;
3711         }
3712         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3713
3714         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3715         if (ret != 0) {
3716                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3717         }               
3718         return 0;
3719 }
3720
3721
3722 /*
3723   attach to a database
3724  */
3725 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3726 {
3727         const char *db_name;
3728         struct ctdb_db_context *ctdb_db;
3729
3730         if (argc < 1) {
3731                 usage();
3732         }
3733         db_name = argv[0];
3734
3735         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3736         if (ctdb_db == NULL) {
3737                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3738                 return -1;
3739         }
3740
3741         return 0;
3742 }
3743
3744 /*
3745   set db priority
3746  */
3747 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3748 {
3749         struct ctdb_db_priority db_prio;
3750         int ret;
3751
3752         if (argc < 2) {
3753                 usage();
3754         }
3755
3756         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3757         db_prio.priority = strtoul(argv[1], NULL, 0);
3758
3759         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3760         if (ret != 0) {
3761                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3762                 return -1;
3763         }
3764
3765         return 0;
3766 }
3767
3768 /*
3769   get db priority
3770  */
3771 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3772 {
3773         uint32_t db_id, priority;
3774         int ret;
3775
3776         if (argc < 1) {
3777                 usage();
3778         }
3779
3780         db_id = strtoul(argv[0], NULL, 0);
3781
3782         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3783         if (ret != 0) {
3784                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3785                 return -1;
3786         }
3787
3788         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3789
3790         return 0;
3791 }
3792
3793 /*
3794   run an eventscript on a node
3795  */
3796 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3797 {
3798         TDB_DATA data;
3799         int ret;
3800         int32_t res;
3801         char *errmsg;
3802         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3803
3804         if (argc != 1) {
3805                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3806                 return -1;
3807         }
3808
3809         data.dptr = (unsigned char *)discard_const(argv[0]);
3810         data.dsize = strlen((char *)data.dptr) + 1;
3811
3812         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3813
3814         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3815                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3816         if (ret != 0 || res != 0) {
3817                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3818                 talloc_free(tmp_ctx);
3819                 return -1;
3820         }
3821         talloc_free(tmp_ctx);
3822         return 0;
3823 }
3824
/* version written to, and required in, the header of a backup file */
#define DB_VERSION 1
/* size of the name field in the backup file header, including the NUL */
#define MAX_DB_NAME 64

/*
  header at the start of a database backup file; the struct is written
  and read as raw bytes, so field order and types must not change.
 */
struct db_file_header {
        unsigned long version;          /* DB_VERSION of the writer */
        time_t timestamp;               /* time the backup was taken */
        unsigned long persistent;       /* non-zero for a persistent db */
        unsigned long size;             /* bytes of marshalled data after the header */
        /* database name; not const, because the field is filled in by
           strncpy() and read() and writing a const object is undefined */
        char name[MAX_DB_NAME];
};

/*
  state shared with backup_traverse() while collecting records
 */
struct backup_data {
        struct ctdb_marshall_buffer *records;   /* growing marshall buffer */
        uint32_t len;                           /* bytes used in records */
        uint32_t total;                         /* number of records added */
        bool traverse_error;                    /* set when a record could not be added */
};
3841
3842 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3843 {
3844         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3845         struct ctdb_rec_data *rec;
3846
3847         /* add the record */
3848         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3849         if (rec == NULL) {
3850                 bd->traverse_error = true;
3851                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3852                 return -1;
3853         }
3854         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3855         if (bd->records == NULL) {
3856                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3857                 bd->traverse_error = true;
3858                 return -1;
3859         }
3860         bd->records->count++;
3861         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3862         bd->len += rec->length;
3863         talloc_free(rec);
3864
3865         bd->total++;
3866         return 0;
3867 }
3868
3869 /*
3870  * backup a database to a file 
3871  */
3872 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3873 {
3874         int i, ret;
3875         struct ctdb_dbid_map *dbmap=NULL;
3876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3877         struct db_file_header dbhdr;
3878         struct ctdb_db_context *ctdb_db;
3879         struct backup_data *bd;
3880         int fh = -1;
3881         int status = -1;
3882         const char *reason = NULL;
3883
3884         if (argc != 2) {
3885                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3886                 return -1;
3887         }
3888
3889         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3890         if (ret != 0) {
3891                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3892                 return ret;
3893         }
3894
3895         for(i=0;i<dbmap->num;i++){
3896                 const char *name;
3897
3898                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3899                 if(!strcmp(argv[0], name)){
3900                         talloc_free(discard_const(name));
3901                         break;
3902                 }
3903                 talloc_free(discard_const(name));
3904         }
3905         if (i == dbmap->num) {
3906                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3907                 talloc_free(tmp_ctx);
3908                 return -1;
3909         }
3910
3911         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3912                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3913         if (ret != 0) {
3914                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3915                                  argv[0]));
3916                 talloc_free(tmp_ctx);
3917                 return -1;
3918         }
3919         if (reason) {
3920                 uint32_t allow_unhealthy = 0;
3921
3922                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3923                                       "AllowUnhealthyDBRead",
3924                                       &allow_unhealthy);
3925
3926                 if (allow_unhealthy != 1) {
3927                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3928                                          argv[0], reason));
3929
3930                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3931                                          allow_unhealthy));
3932                         talloc_free(tmp_ctx);
3933                         return -1;
3934                 }
3935
3936                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3937                                      argv[0], argv[0]));
3938                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3939                                      "tunnable AllowUnhealthyDBRead = %u\n",
3940                                      allow_unhealthy));
3941         }
3942
3943         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3944         if (ctdb_db == NULL) {
3945                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3946                 talloc_free(tmp_ctx);
3947                 return -1;
3948         }
3949
3950
3951         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3952         if (ret == -1) {
3953                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3954                 talloc_free(tmp_ctx);
3955                 return -1;
3956         }
3957
3958
3959         bd = talloc_zero(tmp_ctx, struct backup_data);
3960         if (bd == NULL) {
3961                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3962                 talloc_free(tmp_ctx);
3963                 return -1;
3964         }
3965
3966         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3967         if (bd->records == NULL) {
3968                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3969                 talloc_free(tmp_ctx);
3970                 return -1;
3971         }
3972
3973         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3974         bd->records->db_id = ctdb_db->db_id;
3975         /* traverse the database collecting all records */
3976         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3977             bd->traverse_error) {
3978                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3979                 talloc_free(tmp_ctx);
3980                 return -1;              
3981         }
3982
3983         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3984
3985
3986         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3987         if (fh == -1) {
3988                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3989                 talloc_free(tmp_ctx);
3990                 return -1;
3991         }
3992
3993         dbhdr.version = DB_VERSION;
3994         dbhdr.timestamp = time(NULL);
3995         dbhdr.persistent = dbmap->dbs[i].persistent;
3996         dbhdr.size = bd->len;
3997         if (strlen(argv[0]) >= MAX_DB_NAME) {
3998                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3999                 goto done;
4000         }
4001         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
4002         ret = write(fh, &dbhdr, sizeof(dbhdr));
4003         if (ret == -1) {
4004                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4005                 goto done;
4006         }
4007         ret = write(fh, bd->records, bd->len);
4008         if (ret == -1) {
4009                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4010                 goto done;
4011         }
4012
4013         status = 0;
4014 done:
4015         if (fh != -1) {
4016                 ret = close(fh);
4017                 if (ret == -1) {
4018                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
4019                 }
4020         }
4021         talloc_free(tmp_ctx);
4022         return status;
4023 }
4024
4025 /*
4026  * restore a database from a file 
4027  */
4028 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
4029 {
4030         int ret;
4031         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4032         TDB_DATA outdata;
4033         TDB_DATA data;
4034         struct db_file_header dbhdr;
4035         struct ctdb_db_context *ctdb_db;
4036         struct ctdb_node_map *nodemap=NULL;
4037         struct ctdb_vnn_map *vnnmap=NULL;
4038         int i, fh;
4039         struct ctdb_control_wipe_database w;
4040         uint32_t *nodes;
4041         uint32_t generation;
4042         struct tm *tm;
4043         char tbuf[100];
4044         char *dbname;
4045
4046         if (argc < 1 || argc > 2) {
4047                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4048                 return -1;
4049         }
4050
4051         fh = open(argv[0], O_RDONLY);
4052         if (fh == -1) {
4053                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4054                 talloc_free(tmp_ctx);
4055                 return -1;
4056         }
4057
4058         read(fh, &dbhdr, sizeof(dbhdr));
4059         if (dbhdr.version != DB_VERSION) {
4060                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4061                 talloc_free(tmp_ctx);
4062                 return -1;
4063         }
4064
4065         dbname = discard_const(dbhdr.name);
4066         if (argc == 2) {
4067                 dbname = discard_const(argv[1]);
4068         }
4069
4070         outdata.dsize = dbhdr.size;
4071         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4072         if (outdata.dptr == NULL) {
4073                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4074                 close(fh);
4075                 talloc_free(tmp_ctx);
4076                 return -1;
4077         }               
4078         read(fh, outdata.dptr, outdata.dsize);
4079         close(fh);
4080
4081         tm = localtime(&dbhdr.timestamp);
4082         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4083         printf("Restoring database '%s' from backup @ %s\n",
4084                 dbname, tbuf);
4085
4086
4087         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
4088         if (ctdb_db == NULL) {
4089                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
4090                 talloc_free(tmp_ctx);
4091                 return -1;
4092         }
4093
4094         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4095         if (ret != 0) {
4096                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4097                 talloc_free(tmp_ctx);
4098                 return ret;
4099         }
4100
4101
4102         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
4103         if (ret != 0) {
4104                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
4105                 talloc_free(tmp_ctx);
4106                 return ret;
4107         }
4108
4109         /* freeze all nodes */
4110         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4111         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4112                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4113                                         nodes, i,
4114                                         TIMELIMIT(),
4115                                         false, tdb_null,
4116                                         NULL, NULL,
4117                                         NULL) != 0) {
4118                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4119                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4120                         talloc_free(tmp_ctx);
4121                         return -1;
4122                 }
4123         }
4124
4125         generation = vnnmap->generation;
4126         data.dptr = (void *)&generation;
4127         data.dsize = sizeof(generation);
4128
4129         /* start a cluster wide transaction */
4130         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4131         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4132                                         nodes, 0,
4133                                         TIMELIMIT(), false, data,
4134                                         NULL, NULL,
4135                                         NULL) != 0) {
4136                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
4137                 return -1;
4138         }
4139
4140
4141         w.db_id = ctdb_db->db_id;
4142         w.transaction_id = generation;
4143
4144         data.dptr = (void *)&w;
4145         data.dsize = sizeof(w);
4146
4147         /* wipe all the remote databases. */
4148         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4149         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4150                                         nodes, 0,
4151                                         TIMELIMIT(), false, data,
4152                                         NULL, NULL,
4153                                         NULL) != 0) {
4154                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4155                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4156                 talloc_free(tmp_ctx);
4157                 return -1;
4158         }
4159         
4160         /* push the database */
4161         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4162         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
4163                                         nodes, 0,
4164                                         TIMELIMIT(), false, outdata,
4165                                         NULL, NULL,
4166                                         NULL) != 0) {
4167                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
4168                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4169                 talloc_free(tmp_ctx);
4170                 return -1;
4171         }
4172
4173         data.dptr = (void *)&ctdb_db->db_id;
4174         data.dsize = sizeof(ctdb_db->db_id);
4175
4176         /* mark the database as healthy */
4177         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4178         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4179                                         nodes, 0,
4180                                         TIMELIMIT(), false, data,
4181                                         NULL, NULL,
4182                                         NULL) != 0) {
4183                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4184                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4185                 talloc_free(tmp_ctx);
4186                 return -1;
4187         }
4188
4189         data.dptr = (void *)&generation;
4190         data.dsize = sizeof(generation);
4191
4192         /* commit all the changes */
4193         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4194                                         nodes, 0,
4195                                         TIMELIMIT(), false, data,
4196                                         NULL, NULL,
4197                                         NULL) != 0) {
4198                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4199                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4200                 talloc_free(tmp_ctx);
4201                 return -1;
4202         }
4203
4204
4205         /* thaw all nodes */
4206         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4207         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4208                                         nodes, 0,
4209                                         TIMELIMIT(),
4210                                         false, tdb_null,
4211                                         NULL, NULL,
4212                                         NULL) != 0) {
4213                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4214                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4215                 talloc_free(tmp_ctx);
4216                 return -1;
4217         }
4218
4219
4220         talloc_free(tmp_ctx);
4221         return 0;
4222 }
4223
4224 /*
4225  * dump a database backup from a file
4226  */
4227 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
4228 {
4229         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4230         TDB_DATA outdata;
4231         struct db_file_header dbhdr;
4232         int i, fh;
4233         struct tm *tm;
4234         char tbuf[100];
4235         struct ctdb_rec_data *rec = NULL;
4236         struct ctdb_marshall_buffer *m;
4237
4238         if (argc != 1) {
4239                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4240                 return -1;
4241         }
4242
4243         fh = open(argv[0], O_RDONLY);
4244         if (fh == -1) {
4245                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4246                 talloc_free(tmp_ctx);
4247                 return -1;
4248         }
4249
4250         read(fh, &dbhdr, sizeof(dbhdr));
4251         if (dbhdr.version != DB_VERSION) {
4252                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4253                 talloc_free(tmp_ctx);
4254                 return -1;
4255         }
4256
4257         outdata.dsize = dbhdr.size;
4258         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4259         if (outdata.dptr == NULL) {
4260                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4261                 close(fh);
4262                 talloc_free(tmp_ctx);
4263                 return -1;
4264         }
4265         read(fh, outdata.dptr, outdata.dsize);
4266         close(fh);
4267         m = (struct ctdb_marshall_buffer *)outdata.dptr;
4268
4269         tm = localtime(&dbhdr.timestamp);
4270         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4271         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
4272                 dbhdr.name, m->db_id, tbuf);
4273
4274         for (i=0; i < m->count; i++) {
4275                 uint32_t reqid = 0;
4276                 TDB_DATA key, data;
4277
4278                 /* we do not want the header splitted, so we pass NULL*/
4279                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
4280                                               NULL, &key, &data);
4281
4282                 ctdb_dumpdb_record(ctdb, key, data, stdout);
4283         }
4284
4285         printf("Dumped %d records\n", i);
4286         talloc_free(tmp_ctx);
4287         return 0;
4288 }
4289
4290 /*
4291  * wipe a database from a file
4292  */
4293 static int control_wipedb(struct ctdb_context *ctdb, int argc,
4294                           const char **argv)
4295 {
4296         int ret;
4297         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4298         TDB_DATA data;
4299         struct ctdb_db_context *ctdb_db;
4300         struct ctdb_node_map *nodemap = NULL;
4301         struct ctdb_vnn_map *vnnmap = NULL;
4302         int i;
4303         struct ctdb_control_wipe_database w;
4304         uint32_t *nodes;
4305         uint32_t generation;
4306         struct ctdb_dbid_map *dbmap = NULL;
4307
4308         if (argc != 1) {
4309                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4310                 return -1;
4311         }
4312
4313         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4314                                  &dbmap);
4315         if (ret != 0) {
4316                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
4317                                   options.pnn));
4318                 return ret;
4319         }
4320
4321         for(i=0;i<dbmap->num;i++){
4322                 const char *name;
4323
4324                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4325                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
4326                 if(!strcmp(argv[0], name)){
4327                         talloc_free(discard_const(name));
4328                         break;
4329                 }
4330                 talloc_free(discard_const(name));
4331         }
4332         if (i == dbmap->num) {
4333                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
4334                                   argv[0]));
4335                 talloc_free(tmp_ctx);
4336                 return -1;
4337         }
4338
4339         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
4340         if (ctdb_db == NULL) {
4341                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
4342                                   argv[0]));
4343                 talloc_free(tmp_ctx);
4344                 return -1;
4345         }
4346
4347         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
4348                                    &nodemap);
4349         if (ret != 0) {
4350                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
4351                                   options.pnn));
4352                 talloc_free(tmp_ctx);
4353                 return ret;
4354         }
4355
4356         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4357                                   &vnnmap);
4358         if (ret != 0) {
4359                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
4360                                   options.pnn));
4361                 talloc_free(tmp_ctx);
4362                 return ret;
4363         }
4364
4365         /* freeze all nodes */
4366         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4367         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4368                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4369                                                 nodes, i,
4370                                                 TIMELIMIT(),
4371                                                 false, tdb_null,
4372                                                 NULL, NULL,
4373                                                 NULL);
4374                 if (ret != 0) {
4375                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4376                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
4377                                              CTDB_RECOVERY_ACTIVE);
4378                         talloc_free(tmp_ctx);
4379                         return -1;
4380                 }
4381         }
4382
4383         generation = vnnmap->generation;
4384         data.dptr = (void *)&generation;
4385         data.dsize = sizeof(generation);
4386
4387         /* start a cluster wide transaction */
4388         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4389         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4390                                         nodes, 0,
4391                                         TIMELIMIT(), false, data,
4392                                         NULL, NULL,
4393                                         NULL);
4394         if (ret!= 0) {
4395                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
4396                                   "transactions.\n"));
4397                 return -1;
4398         }
4399
4400         w.db_id = ctdb_db->db_id;
4401         w.transaction_id = generation;
4402
4403         data.dptr = (void *)&w;
4404         data.dsize = sizeof(w);
4405
4406         /* wipe all the remote databases. */
4407         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4408         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4409                                         nodes, 0,
4410                                         TIMELIMIT(), false, data,
4411                                         NULL, NULL,
4412                                         NULL) != 0) {
4413                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4414                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4415                 talloc_free(tmp_ctx);
4416                 return -1;
4417         }
4418
4419         data.dptr = (void *)&ctdb_db->db_id;
4420         data.dsize = sizeof(ctdb_db->db_id);
4421
4422         /* mark the database as healthy */
4423         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4424         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4425                                         nodes, 0,
4426                                         TIMELIMIT(), false, data,
4427                                         NULL, NULL,
4428                                         NULL) != 0) {
4429                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4430                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4431                 talloc_free(tmp_ctx);
4432                 return -1;
4433         }
4434
4435         data.dptr = (void *)&generation;
4436         data.dsize = sizeof(generation);
4437
4438         /* commit all the changes */
4439         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4440                                         nodes, 0,
4441                                         TIMELIMIT(), false, data,
4442                                         NULL, NULL,
4443                                         NULL) != 0) {
4444                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4445                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4446                 talloc_free(tmp_ctx);
4447                 return -1;
4448         }
4449
4450         /* thaw all nodes */
4451         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4452         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4453                                         nodes, 0,
4454                                         TIMELIMIT(),
4455                                         false, tdb_null,
4456                                         NULL, NULL,
4457                                         NULL) != 0) {
4458                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4459                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4460                 talloc_free(tmp_ctx);
4461                 return -1;
4462         }
4463
4464         talloc_free(tmp_ctx);
4465         return 0;
4466 }
4467
4468 /*
4469   dump memory usage
4470  */
4471 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4472 {
4473         TDB_DATA data;
4474         int ret;
4475         int32_t res;
4476         char *errmsg;
4477         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4478         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4479                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4480         if (ret != 0 || res != 0) {
4481                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4482                 talloc_free(tmp_ctx);
4483                 return -1;
4484         }
4485         write(1, data.dptr, data.dsize);
4486         talloc_free(tmp_ctx);
4487         return 0;
4488 }
4489
4490 /*
4491   handler for memory dumps
4492 */
4493 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4494                              TDB_DATA data, void *private_data)
4495 {
4496         write(1, data.dptr, data.dsize);
4497         exit(0);
4498 }
4499
4500 /*
4501   dump memory usage on the recovery daemon
4502  */
4503 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4504 {
4505         int ret;
4506         TDB_DATA data;
4507         struct rd_memdump_reply rd;
4508
4509         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4510         if (rd.pnn == -1) {
4511                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4512                 return -1;
4513         }
4514         rd.srvid = getpid();
4515
4516         /* register a message port for receiveing the reply so that we
4517            can receive the reply
4518         */
4519         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4520
4521
4522         data.dptr = (uint8_t *)&rd;
4523         data.dsize = sizeof(rd);
4524
4525         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4526         if (ret != 0) {
4527                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4528                 return -1;
4529         }
4530
4531         /* this loop will terminate when we have received the reply */
4532         while (1) {     
4533                 event_loop_once(ctdb->ev);
4534         }
4535
4536         return 0;
4537 }
4538
4539 /*
4540   send a message to a srvid
4541  */
4542 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4543 {
4544         unsigned long srvid;
4545         int ret;
4546         TDB_DATA data;
4547
4548         if (argc < 2) {
4549                 usage();
4550         }
4551
4552         srvid      = strtoul(argv[0], NULL, 0);
4553
4554         data.dptr = (uint8_t *)discard_const(argv[1]);
4555         data.dsize= strlen(argv[1]);
4556
4557         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4558         if (ret != 0) {
4559                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4560                 return -1;
4561         }
4562
4563         return 0;
4564 }
4565
4566 /*
4567   handler for msglisten
4568 */
4569 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4570                              TDB_DATA data, void *private_data)
4571 {
4572         int i;
4573
4574         printf("Message received: ");
4575         for (i=0;i<data.dsize;i++) {
4576                 printf("%c", data.dptr[i]);
4577         }
4578         printf("\n");
4579 }
4580
4581 /*
4582   listen for messages on a messageport
4583  */
4584 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4585 {
4586         uint64_t srvid;
4587
4588         srvid = getpid();
4589
4590         /* register a message port and listen for messages
4591         */
4592         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4593         printf("Listening for messages on srvid:%d\n", (int)srvid);
4594
4595         while (1) {     
4596                 event_loop_once(ctdb->ev);
4597         }
4598
4599         return 0;
4600 }
4601
4602 /*
4603   list all nodes in the cluster
4604   we parse the nodes file directly
4605  */
4606 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4607 {
4608         TALLOC_CTX *mem_ctx = talloc_new(NULL);
4609         struct pnn_node *pnn_nodes;
4610         struct pnn_node *pnn_node;
4611
4612         pnn_nodes = read_nodes_file(mem_ctx);
4613         if (pnn_nodes == NULL) {
4614                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4615                 talloc_free(mem_ctx);
4616                 return -1;
4617         }
4618
4619         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4620                 ctdb_sock_addr addr;
4621                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4622                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4623                         talloc_free(mem_ctx);
4624                         return -1;
4625                 }
4626                 if (options.machinereadable){
4627                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4628                 } else {
4629                         printf("%s\n", pnn_node->addr);
4630                 }
4631         }
4632         talloc_free(mem_ctx);
4633
4634         return 0;
4635 }
4636
4637 /*
4638   reload the nodes file on the local node
4639  */
4640 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4641 {
4642         int i, ret;
4643         int mypnn;
4644         struct ctdb_node_map *nodemap=NULL;
4645
4646         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4647         if (mypnn == -1) {
4648                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4649                 return -1;
4650         }
4651
4652         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4653         if (ret != 0) {
4654                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4655                 return ret;
4656         }
4657
4658         /* reload the nodes file on all remote nodes */
4659         for (i=0;i<nodemap->num;i++) {
4660                 if (nodemap->nodes[i].pnn == mypnn) {
4661                         continue;
4662                 }
4663                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4664                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4665                         nodemap->nodes[i].pnn);
4666                 if (ret != 0) {
4667                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4668                 }
4669         }
4670
4671         /* reload the nodes file on the local node */
4672         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4673         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4674         if (ret != 0) {
4675                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4676         }
4677
4678         /* initiate a recovery */
4679         control_recover(ctdb, argc, argv);
4680
4681         return 0;
4682 }
4683
4684
4685 static const struct {
4686         const char *name;
4687         int (*fn)(struct ctdb_context *, int, const char **);
4688         bool auto_all;
4689         bool without_daemon; /* can be run without daemon running ? */
4690         const char *msg;
4691         const char *args;
4692 } ctdb_commands[] = {
4693 #ifdef CTDB_VERS
4694         { "version",         control_version,           true,   false,  "show version of ctdb" },
4695 #endif
4696         { "status",          control_status,            true,   false,  "show node status" },
4697         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4698         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4699         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4700         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4701         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4702         { "statistics",      control_statistics,        false,  false, "show statistics" },
4703         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4704         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
4705         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4706         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4707         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4708         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4709         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4710         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4711         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4712         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4713         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4714         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4715         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4716         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4717         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4718         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4719         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4720         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4721         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4722         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4723         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4724         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4725         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4726         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4727         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4728         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4729         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4730         { "stop",            control_stop,              true,   false,  "stop a node" },
4731         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4732         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4733         { "unban",           control_unban,             true,   false,  "unban a node" },
4734         { "showban",         control_showban,           true,   false,  "show ban information"},
4735         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4736         { "recover",         control_recover,           true,   false,  "force recovery" },
4737         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
4738         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4739         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4740         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4741         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4742         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4743         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4744         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
4745         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
4746
4747         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
4748
4749         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4750         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4751         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4752         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4753         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4754         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4755         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4756         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4757         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4758         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4759         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4760         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4761         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4762         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4763         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4764         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4765         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4766         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4767         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4768         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4769         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4770         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4771         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4772         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4773         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4774         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4775         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4776         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4777         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4778         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4779         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4780         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
4781         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
4782         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
4783         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file", "<tdb-file> <key> [<file>]" },
4784 };
4785
4786 /*
4787   show usage message
4788  */
4789 static void usage(void)
4790 {
4791         int i;
4792         printf(
4793 "Usage: ctdb [options] <control>\n" \
4794 "Options:\n" \
4795 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4796 "   -Y                 generate machinereadable output\n"
4797 "   -v                 generate verbose output\n"
4798 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4799         printf("Controls:\n");
4800         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4801                 printf("  %-15s %-27s  %s\n", 
4802                        ctdb_commands[i].name, 
4803                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4804                        ctdb_commands[i].msg);
4805         }
4806         exit(1);
4807 }
4808
4809
4810 static void ctdb_alarm(int sig)
4811 {
4812         printf("Maximum runtime exceeded - exiting\n");
4813         _exit(ERR_TIMEOUT);
4814 }
4815
4816 /*
4817   main program
4818 */
4819 int main(int argc, const char *argv[])
4820 {
4821         struct ctdb_context *ctdb;
4822         char *nodestring = NULL;
4823         struct poptOption popt_options[] = {
4824                 POPT_AUTOHELP
4825                 POPT_CTDB_CMDLINE
4826                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4827                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4828                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4829                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
4830                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4831                 POPT_TABLEEND
4832         };
4833         int opt;
4834         const char **extra_argv;
4835         int extra_argc = 0;
4836         int ret=-1, i;
4837         poptContext pc;
4838         struct event_context *ev;
4839         const char *control;
4840
4841         setlinebuf(stdout);
4842         
4843         /* set some defaults */
4844         options.maxruntime = 0;
4845         options.timelimit = 3;
4846         options.pnn = CTDB_CURRENT_NODE;
4847
4848         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4849
4850         while ((opt = poptGetNextOpt(pc)) != -1) {
4851                 switch (opt) {
4852                 default:
4853                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4854                                 poptBadOption(pc, 0), poptStrerror(opt)));
4855                         exit(1);
4856                 }
4857         }
4858
4859         /* setup the remaining options for the main program to use */
4860         extra_argv = poptGetArgs(pc);
4861         if (extra_argv) {
4862                 extra_argv++;
4863                 while (extra_argv[extra_argc]) extra_argc++;
4864         }
4865
4866         if (extra_argc < 1) {
4867                 usage();
4868         }
4869
4870         if (options.maxruntime == 0) {
4871                 const char *ctdb_timeout;
4872                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4873                 if (ctdb_timeout != NULL) {
4874                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4875                 } else {
4876                         /* default timeout is 120 seconds */
4877                         options.maxruntime = 120;
4878                 }
4879         }
4880
4881         signal(SIGALRM, ctdb_alarm);
4882         alarm(options.maxruntime);
4883
4884         /* setup the node number to contact */
4885         if (nodestring != NULL) {
4886                 if (strcmp(nodestring, "all") == 0) {
4887                         options.pnn = CTDB_BROADCAST_ALL;
4888                 } else {
4889                         options.pnn = strtoul(nodestring, NULL, 0);
4890                 }
4891         }
4892
4893         control = extra_argv[0];
4894
4895         ev = event_context_init(NULL);
4896         if (!ev) {
4897                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
4898                 exit(1);
4899         }
4900         tevent_loop_allow_nesting(ev);
4901
4902         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4903                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4904                         int j;
4905
4906                         if (ctdb_commands[i].without_daemon == true) {
4907                                 close(2);
4908                         }
4909
4910                         /* initialise ctdb */
4911                         ctdb = ctdb_cmdline_client(ev);
4912
4913                         if (ctdb_commands[i].without_daemon == false) {
4914                                 const char *socket_name;
4915
4916                                 if (ctdb == NULL) {
4917                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4918                                         exit(1);
4919                                 }
4920
4921                                 /* initialize a libctdb connection as well */
4922                                 socket_name = ctdb_get_socketname(ctdb);
4923                                 ctdb_connection = ctdb_connect(socket_name,
4924                                                        ctdb_log_file, stderr);
4925                                 if (ctdb_connection == NULL) {
4926                                         fprintf(stderr, "Failed to connect to daemon from libctdb\n");
4927                                         exit(1);
4928                                 }                               
4929                         
4930                                 /* verify the node exists */
4931                                 verify_node(ctdb);
4932
4933                                 if (options.pnn == CTDB_CURRENT_NODE) {
4934                                         int pnn;
4935                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4936                                         if (pnn == -1) {
4937                                                 return -1;
4938                                         }
4939                                         options.pnn = pnn;
4940                                 }
4941                         }
4942
4943                         if (ctdb_commands[i].auto_all && 
4944                             options.pnn == CTDB_BROADCAST_ALL) {
4945                                 uint32_t *nodes;
4946                                 uint32_t num_nodes;
4947                                 ret = 0;
4948
4949                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4950                                 CTDB_NO_MEMORY(ctdb, nodes);
4951         
4952                                 for (j=0;j<num_nodes;j++) {
4953                                         options.pnn = nodes[j];
4954                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4955                                 }
4956                                 talloc_free(nodes);
4957                         } else {
4958                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4959                         }
4960                         break;
4961                 }
4962         }
4963
4964         if (i == ARRAY_SIZE(ctdb_commands)) {
4965                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4966                 exit(1);
4967         }
4968
4969         return ret;
4970 }