remove the "ctdb freeze" debugging command
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
/* Command line settings shared by the control_* handlers in this file. */
static struct {
        int timelimit;          /* base value TIMELIMIT()/LONGTIMELIMIT() pass to timeval_current_ofs() */
        uint32_t pnn;           /* node a command is aimed at; compared against CTDB_CURRENT_NODE and CTDB_BROADCAST_ALL */
        int machinereadable;    /* non-zero selects the colon separated output variants */
        int maxruntime;         /* NOTE(review): not used in the visible part of this file */
} options;

/* timeout helpers: the normal limit, and ten times that limit */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
52
#ifdef CTDB_VERS
/* two-step stringification so that CTDB_VERS is expanded before it is quoted */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
  print the version string this tool was compiled with
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
        static const char version[] = XSTR(CTDB_VERS);

        printf("CTDB version: %s\n", version);
        return 0;
}
#endif
62
63
64 /*
65   verify that a node exists and is reachable
66  */
67 static void verify_node(struct ctdb_context *ctdb)
68 {
69         int ret;
70         struct ctdb_node_map *nodemap=NULL;
71
72         if (options.pnn == CTDB_CURRENT_NODE) {
73                 return;
74         }
75         if (options.pnn == CTDB_BROADCAST_ALL) {
76                 return;
77         }
78
79         /* verify the node exists */
80         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
81                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
82                 exit(10);
83         }
84         if (options.pnn >= nodemap->num) {
85                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
86                 exit(ERR_NONODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
93                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
94                 exit(ERR_DISNODE);
95         }
96
97         /* verify we can access the node */
98         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
99         if (ret == -1) {
100                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
101                 exit(10);
102         }
103 }
104
105 /*
106  check if a database exists
107 */
108 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
109 {
110         int i, ret;
111         struct ctdb_dbid_map *dbmap=NULL;
112
113         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
114         if (ret != 0) {
115                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
116                 return -1;
117         }
118
119         for(i=0;i<dbmap->num;i++){
120                 const char *name;
121
122                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
123                 if (!strcmp(name, db_name)) {
124                         return 0;
125                 }
126         }
127
128         return -1;
129 }
130
131 /*
132   see if a process exists
133  */
134 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
135 {
136         uint32_t pnn, pid;
137         int ret;
138         if (argc < 1) {
139                 usage();
140         }
141
142         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
143                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
144                 return -1;
145         }
146
147         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
148         if (ret == 0) {
149                 printf("%u:%u exists\n", pnn, pid);
150         } else {
151                 printf("%u:%u does not exist\n", pnn, pid);
152         }
153         return ret;
154 }
155
156 /*
157   display statistics structure
158  */
159 static void show_statistics(struct ctdb_statistics *s)
160 {
161         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
162         int i;
163         const char *prefix=NULL;
164         int preflen=0;
165         int tmp, days, hours, minutes, seconds;
166         const struct {
167                 const char *name;
168                 uint32_t offset;
169         } fields[] = {
170 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
171                 STATISTICS_FIELD(num_clients),
172                 STATISTICS_FIELD(frozen),
173                 STATISTICS_FIELD(recovering),
174                 STATISTICS_FIELD(num_recoveries),
175                 STATISTICS_FIELD(client_packets_sent),
176                 STATISTICS_FIELD(client_packets_recv),
177                 STATISTICS_FIELD(node_packets_sent),
178                 STATISTICS_FIELD(node_packets_recv),
179                 STATISTICS_FIELD(keepalive_packets_sent),
180                 STATISTICS_FIELD(keepalive_packets_recv),
181                 STATISTICS_FIELD(node.req_call),
182                 STATISTICS_FIELD(node.reply_call),
183                 STATISTICS_FIELD(node.req_dmaster),
184                 STATISTICS_FIELD(node.reply_dmaster),
185                 STATISTICS_FIELD(node.reply_error),
186                 STATISTICS_FIELD(node.req_message),
187                 STATISTICS_FIELD(node.req_control),
188                 STATISTICS_FIELD(node.reply_control),
189                 STATISTICS_FIELD(client.req_call),
190                 STATISTICS_FIELD(client.req_message),
191                 STATISTICS_FIELD(client.req_control),
192                 STATISTICS_FIELD(timeouts.call),
193                 STATISTICS_FIELD(timeouts.control),
194                 STATISTICS_FIELD(timeouts.traverse),
195                 STATISTICS_FIELD(total_calls),
196                 STATISTICS_FIELD(pending_calls),
197                 STATISTICS_FIELD(lockwait_calls),
198                 STATISTICS_FIELD(pending_lockwait_calls),
199                 STATISTICS_FIELD(childwrite_calls),
200                 STATISTICS_FIELD(pending_childwrite_calls),
201                 STATISTICS_FIELD(memory_used),
202                 STATISTICS_FIELD(max_hop_count),
203         };
204         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
205         seconds = tmp%60;
206         tmp    /= 60;
207         minutes = tmp%60;
208         tmp    /= 60;
209         hours   = tmp%24;
210         tmp    /= 24;
211         days    = tmp;
212
213         printf("CTDB version %u\n", CTDB_VERSION);
214         printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
215         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
216
217         for (i=0;i<ARRAY_SIZE(fields);i++) {
218                 if (strchr(fields[i].name, '.')) {
219                         preflen = strcspn(fields[i].name, ".")+1;
220                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
221                                 prefix = fields[i].name;
222                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
223                         }
224                 } else {
225                         preflen = 0;
226                 }
227                 printf(" %*s%-22s%*s%10u\n", 
228                        preflen?4:0, "",
229                        fields[i].name+preflen, 
230                        preflen?0:4, "",
231                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
232         }
233         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
234         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
235
236         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
237         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
238         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
239         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
240
241         talloc_free(tmp_ctx);
242 }
243
244 /*
245   display remote ctdb statistics combined from all nodes
246  */
247 static int control_statistics_all(struct ctdb_context *ctdb)
248 {
249         int ret, i;
250         struct ctdb_statistics statistics;
251         uint32_t *nodes;
252         uint32_t num_nodes;
253
254         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
255         CTDB_NO_MEMORY(ctdb, nodes);
256         
257         ZERO_STRUCT(statistics);
258
259         for (i=0;i<num_nodes;i++) {
260                 struct ctdb_statistics s1;
261                 int j;
262                 uint32_t *v1 = (uint32_t *)&s1;
263                 uint32_t *v2 = (uint32_t *)&statistics;
264                 uint32_t num_ints = 
265                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
266                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
267                 if (ret != 0) {
268                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
269                         return ret;
270                 }
271                 for (j=0;j<num_ints;j++) {
272                         v2[j] += v1[j];
273                 }
274                 statistics.max_hop_count = 
275                         MAX(statistics.max_hop_count, s1.max_hop_count);
276                 statistics.max_call_latency = 
277                         MAX(statistics.max_call_latency, s1.max_call_latency);
278                 statistics.max_lockwait_latency = 
279                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
280         }
281         talloc_free(nodes);
282         printf("Gathered statistics for %u nodes\n", num_nodes);
283         show_statistics(&statistics);
284         return 0;
285 }
286
287 /*
288   display remote ctdb statistics
289  */
290 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
291 {
292         int ret;
293         struct ctdb_statistics statistics;
294
295         if (options.pnn == CTDB_BROADCAST_ALL) {
296                 return control_statistics_all(ctdb);
297         }
298
299         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
300         if (ret != 0) {
301                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
302                 return ret;
303         }
304         show_statistics(&statistics);
305         return 0;
306 }
307
308
309 /*
310   reset remote ctdb statistics
311  */
312 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
313 {
314         int ret;
315
316         ret = ctdb_statistics_reset(ctdb, options.pnn);
317         if (ret != 0) {
318                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
319                 return ret;
320         }
321         return 0;
322 }
323
324
325 /*
326   display uptime of remote node
327  */
328 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
329 {
330         int ret;
331         struct ctdb_uptime *uptime = NULL;
332         int tmp, days, hours, minutes, seconds;
333
334         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
335         if (ret != 0) {
336                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
337                 return ret;
338         }
339
340         if (options.machinereadable){
341                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
342                 printf(":%u:%u:%u:%lf\n",
343                         (unsigned int)uptime->current_time.tv_sec,
344                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
345                         (unsigned int)uptime->last_recovery_finished.tv_sec,
346                         timeval_delta(&uptime->last_recovery_finished,
347                                       &uptime->last_recovery_started)
348                 );
349                 return 0;
350         }
351
352         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
353
354         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
355         seconds = tmp%60;
356         tmp    /= 60;
357         minutes = tmp%60;
358         tmp    /= 60;
359         hours   = tmp%24;
360         tmp    /= 24;
361         days    = tmp;
362         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
363
364         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
365         seconds = tmp%60;
366         tmp    /= 60;
367         minutes = tmp%60;
368         tmp    /= 60;
369         hours   = tmp%24;
370         tmp    /= 24;
371         days    = tmp;
372         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
373         
374         printf("Duration of last recovery/failover: %lf seconds\n",
375                 timeval_delta(&uptime->last_recovery_finished,
376                               &uptime->last_recovery_started));
377
378         return 0;
379 }
380
381 /*
382   show the PNN of the current node
383  */
384 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
385 {
386         uint32_t mypnn;
387         bool ret;
388
389         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
390         if (!ret) {
391                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
392                 return -1;
393         }
394
395         printf("PNN:%d\n", mypnn);
396         return 0;
397 }
398
399
/* one address line of the nodes file, kept in a singly linked list */
struct pnn_node {
        struct pnn_node *next;  /* following entry, NULL at the end of the list */
        const char *addr;       /* line text with leading blanks and tabs removed */
        int pnn;                /* node number derived from the line's position in the file */
};
405
406 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
407 {
408         const char *nodes_list;
409         int nlines;
410         char **lines;
411         int i, pnn;
412         struct pnn_node *pnn_nodes = NULL;
413         struct pnn_node *pnn_node;
414         struct pnn_node *tmp_node;
415
416         /* read the nodes file */
417         nodes_list = getenv("CTDB_NODES");
418         if (nodes_list == NULL) {
419                 nodes_list = "/etc/ctdb/nodes";
420         }
421         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
422         if (lines == NULL) {
423                 return NULL;
424         }
425         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
426                 nlines--;
427         }
428         for (i=0, pnn=0; i<nlines; i++) {
429                 char *node;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436                 if (*node == '#') {
437                         pnn++;
438                         continue;
439                 }
440                 if (strcmp(node, "") == 0) {
441                         continue;
442                 }
443                 pnn_node = talloc(mem_ctx, struct pnn_node);
444                 pnn_node->pnn = pnn++;
445                 pnn_node->addr = talloc_strdup(pnn_node, node);
446                 pnn_node->next = pnn_nodes;
447                 pnn_nodes = pnn_node;
448         }
449
450         /* swap them around so we return them in incrementing order */
451         pnn_node = pnn_nodes;
452         pnn_nodes = NULL;
453         while (pnn_node) {
454                 tmp_node = pnn_node;
455                 pnn_node = pnn_node->next;
456
457                 tmp_node->next = pnn_nodes;
458                 pnn_nodes = tmp_node;
459         }
460
461         return pnn_nodes;
462 }
463
464 /*
465   show the PNN of the current node
466   discover the pnn by loading the nodes file and try to bind to all
467   addresses one at a time until the ip address is found.
468  */
469 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
470 {
471         TALLOC_CTX *mem_ctx = talloc_new(NULL);
472         struct pnn_node *pnn_nodes;
473         struct pnn_node *pnn_node;
474
475         pnn_nodes = read_nodes_file(mem_ctx);
476         if (pnn_nodes == NULL) {
477                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
478                 talloc_free(mem_ctx);
479                 return -1;
480         }
481
482         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
483                 ctdb_sock_addr addr;
484
485                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
486                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
487                         talloc_free(mem_ctx);
488                         return -1;
489                 }
490
491                 if (ctdb_sys_have_ip(&addr)) {
492                         printf("PNN:%d\n", pnn_node->pnn);
493                         talloc_free(mem_ctx);
494                         return 0;
495                 }
496         }
497
498         printf("Failed to detect which PNN this node is\n");
499         talloc_free(mem_ctx);
500         return -1;
501 }
502
503 /*
504   display remote ctdb status
505  */
506 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
507 {
508         int i, ret;
509         struct ctdb_vnn_map *vnnmap=NULL;
510         struct ctdb_node_map *nodemap=NULL;
511         uint32_t recmode, recmaster;
512         int mypnn;
513
514         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
515         if (mypnn == -1) {
516                 return -1;
517         }
518
519         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
520         if (ret != 0) {
521                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
522                 return ret;
523         }
524
525         if(options.machinereadable){
526                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
527                 for(i=0;i<nodemap->num;i++){
528                         int partially_online = 0;
529                         int j;
530
531                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
532                                 continue;
533                         }
534                         if (nodemap->nodes[i].flags == 0) {
535                                 struct ctdb_control_get_ifaces *ifaces;
536
537                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
538                                                            nodemap->nodes[i].pnn,
539                                                            ctdb, &ifaces);
540                                 if (ret == 0) {
541                                         for (j=0; j < ifaces->num; j++) {
542                                                 if (ifaces->ifaces[j].link_state != 0) {
543                                                         continue;
544                                                 }
545                                                 partially_online = 1;
546                                                 break;
547                                         }
548                                         talloc_free(ifaces);
549                                 }
550                         }
551                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
552                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
553                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
554                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
555                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
556                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
557                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
558                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
559                                partially_online);
560                 }
561                 return 0;
562         }
563
564         printf("Number of nodes:%d\n", nodemap->num);
565         for(i=0;i<nodemap->num;i++){
566                 static const struct {
567                         uint32_t flag;
568                         const char *name;
569                 } flag_names[] = {
570                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
571                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
572                         { NODE_FLAGS_BANNED,                "BANNED" },
573                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
574                         { NODE_FLAGS_DELETED,               "DELETED" },
575                         { NODE_FLAGS_STOPPED,               "STOPPED" },
576                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
577                 };
578                 char *flags_str = NULL;
579                 int j;
580
581                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
582                         continue;
583                 }
584                 if (nodemap->nodes[i].flags == 0) {
585                         struct ctdb_control_get_ifaces *ifaces;
586
587                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
588                                                    nodemap->nodes[i].pnn,
589                                                    ctdb, &ifaces);
590                         if (ret == 0) {
591                                 for (j=0; j < ifaces->num; j++) {
592                                         if (ifaces->ifaces[j].link_state != 0) {
593                                                 continue;
594                                         }
595                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
596                                         break;
597                                 }
598                                 talloc_free(ifaces);
599                         }
600                 }
601                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
602                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
603                                 if (flags_str == NULL) {
604                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
605                                 } else {
606                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
607                                                                            flag_names[j].name);
608                                 }
609                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
610                         }
611                 }
612                 if (flags_str == NULL) {
613                         flags_str = talloc_strdup(ctdb, "OK");
614                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
615                 }
616                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
617                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
618                        flags_str,
619                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
620                 talloc_free(flags_str);
621         }
622
623         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
624         if (ret != 0) {
625                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
626                 return ret;
627         }
628         if (vnnmap->generation == INVALID_GENERATION) {
629                 printf("Generation:INVALID\n");
630         } else {
631                 printf("Generation:%d\n",vnnmap->generation);
632         }
633         printf("Size:%d\n",vnnmap->size);
634         for(i=0;i<vnnmap->size;i++){
635                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
636         }
637
638         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
639         if (ret != 0) {
640                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
641                 return ret;
642         }
643         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
644
645         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
646         if (ret != 0) {
647                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
648                 return ret;
649         }
650         printf("Recovery master:%d\n",recmaster);
651
652         return 0;
653 }
654
655
/* one address line of the natgw nodes file, kept in a singly linked list */
struct natgw_node {
        struct natgw_node *next;        /* following entry, NULL at the end of the list */
        const char *addr;               /* line text with leading blanks and tabs removed */
};
660
661 /*
662   display the list of nodes belonging to this natgw configuration
663  */
664 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
665 {
666         int i, ret;
667         const char *natgw_list;
668         int nlines;
669         char **lines;
670         struct natgw_node *natgw_nodes = NULL;
671         struct natgw_node *natgw_node;
672         struct ctdb_node_map *nodemap=NULL;
673
674
675         /* read the natgw nodes file into a linked list */
676         natgw_list = getenv("NATGW_NODES");
677         if (natgw_list == NULL) {
678                 natgw_list = "/etc/ctdb/natgw_nodes";
679         }
680         lines = file_lines_load(natgw_list, &nlines, ctdb);
681         if (lines == NULL) {
682                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
683                 return -1;
684         }
685         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
686                 nlines--;
687         }
688         for (i=0;i<nlines;i++) {
689                 char *node;
690
691                 node = lines[i];
692                 /* strip leading spaces */
693                 while((*node == ' ') || (*node == '\t')) {
694                         node++;
695                 }
696                 if (*node == '#') {
697                         continue;
698                 }
699                 if (strcmp(node, "") == 0) {
700                         continue;
701                 }
702                 natgw_node = talloc(ctdb, struct natgw_node);
703                 natgw_node->addr = talloc_strdup(natgw_node, node);
704                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
705                 natgw_node->next = natgw_nodes;
706                 natgw_nodes = natgw_node;
707         }
708
709         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
710         if (ret != 0) {
711                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
712                 return ret;
713         }
714
715         i=0;
716         while(i<nodemap->num) {
717                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
718                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
719                                 break;
720                         }
721                 }
722
723                 /* this node was not in the natgw so we just remove it from
724                  * the list
725                  */
726                 if ((natgw_node == NULL) 
727                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
728                         int j;
729
730                         for (j=i+1; j<nodemap->num; j++) {
731                                 nodemap->nodes[j-1] = nodemap->nodes[j];
732                         }
733                         nodemap->num--;
734                         continue;
735                 }
736
737                 i++;
738         }               
739
740         /* pick a node to be natgwmaster
741          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
742          */
743         for(i=0;i<nodemap->num;i++){
744                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
745                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
746                         break;
747                 }
748         }
749         /* we couldnt find any healthy node, try unhealthy ones */
750         if (i == nodemap->num) {
751                 for(i=0;i<nodemap->num;i++){
752                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
753                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
754                                 break;
755                         }
756                 }
757         }
758         /* unless all nodes are STOPPED, when we pick one anyway */
759         if (i == nodemap->num) {
760                 for(i=0;i<nodemap->num;i++){
761                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
762                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
763                                 break;
764                         }
765                 }
766                 /* or if we still can not find any */
767                 if (i == nodemap->num) {
768                         printf("-1 0.0.0.0\n");
769                 }
770         }
771
772         /* print the pruned list of nodes belonging to this natgw list */
773         for(i=0;i<nodemap->num;i++){
774                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
775                         continue;
776                 }
777                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
778                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
779                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
780                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
781                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
782                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
783                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
784         }
785
786         return 0;
787 }
788
789 /*
790   display the status of the scripts for monitoring (or other events)
791  */
792 static int control_one_scriptstatus(struct ctdb_context *ctdb,
793                                     enum ctdb_eventscript_call type)
794 {
795         struct ctdb_scripts_wire *script_status;
796         int ret, i;
797
798         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
799         if (ret != 0) {
800                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
801                 return ret;
802         }
803
804         if (script_status == NULL) {
805                 if (!options.machinereadable) {
806                         printf("%s cycle never run\n",
807                                ctdb_eventscript_call_names[type]);
808                 }
809                 return 0;
810         }
811
812         if (!options.machinereadable) {
813                 printf("%d scripts were executed last %s cycle\n",
814                        script_status->num_scripts,
815                        ctdb_eventscript_call_names[type]);
816         }
817         for (i=0; i<script_status->num_scripts; i++) {
818                 const char *status = NULL;
819
820                 switch (script_status->scripts[i].status) {
821                 case -ETIME:
822                         status = "TIMEDOUT";
823                         break;
824                 case -ENOEXEC:
825                         status = "DISABLED";
826                         break;
827                 case 0:
828                         status = "OK";
829                         break;
830                 default:
831                         if (script_status->scripts[i].status > 0)
832                                 status = "ERROR";
833                         break;
834                 }
835                 if (options.machinereadable) {
836                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
837                                ctdb_eventscript_call_names[type],
838                                script_status->scripts[i].name,
839                                script_status->scripts[i].status,
840                                status,
841                                (long)script_status->scripts[i].start.tv_sec,
842                                (long)script_status->scripts[i].start.tv_usec,
843                                (long)script_status->scripts[i].finished.tv_sec,
844                                (long)script_status->scripts[i].finished.tv_usec,
845                                script_status->scripts[i].output);
846                         continue;
847                 }
848                 if (status)
849                         printf("%-20s Status:%s    ",
850                                script_status->scripts[i].name, status);
851                 else
852                         /* Some other error, eg from stat. */
853                         printf("%-20s Status:CANNOT RUN (%s)",
854                                script_status->scripts[i].name,
855                                strerror(-script_status->scripts[i].status));
856
857                 if (script_status->scripts[i].status >= 0) {
858                         printf("Duration:%.3lf ",
859                         timeval_delta(&script_status->scripts[i].finished,
860                               &script_status->scripts[i].start));
861                 }
862                 if (script_status->scripts[i].status != -ENOEXEC) {
863                         printf("%s",
864                                ctime(&script_status->scripts[i].start.tv_sec));
865                         if (script_status->scripts[i].status != 0) {
866                                 printf("   OUTPUT:%s\n",
867                                        script_status->scripts[i].output);
868                         }
869                 } else {
870                         printf("\n");
871                 }
872         }
873         return 0;
874 }
875
876
877 static int control_scriptstatus(struct ctdb_context *ctdb,
878                                 int argc, const char **argv)
879 {
880         int ret;
881         enum ctdb_eventscript_call type, min, max;
882         const char *arg;
883
884         if (argc > 1) {
885                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
886                 return -1;
887         }
888
889         if (argc == 0)
890                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
891         else
892                 arg = argv[0];
893
894         for (type = 0; type < CTDB_EVENT_MAX; type++) {
895                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
896                         min = type;
897                         max = type+1;
898                         break;
899                 }
900         }
901         if (type == CTDB_EVENT_MAX) {
902                 if (strcmp(arg, "all") == 0) {
903                         min = 0;
904                         max = CTDB_EVENT_MAX;
905                 } else {
906                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
907                         return -1;
908                 }
909         }
910
911         if (options.machinereadable) {
912                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
913         }
914
915         for (type = min; type < max; type++) {
916                 ret = control_one_scriptstatus(ctdb, type);
917                 if (ret != 0) {
918                         return ret;
919                 }
920         }
921
922         return 0;
923 }
924
925 /*
926   enable an eventscript
927  */
928 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
929 {
930         int ret;
931
932         if (argc < 1) {
933                 usage();
934         }
935
936         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
937         if (ret != 0) {
938           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
939                 return ret;
940         }
941
942         return 0;
943 }
944
945 /*
946   disable an eventscript
947  */
948 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
949 {
950         int ret;
951
952         if (argc < 1) {
953                 usage();
954         }
955
956         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
957         if (ret != 0) {
958           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
959                 return ret;
960         }
961
962         return 0;
963 }
964
965 /*
966   display the pnn of the recovery master
967  */
968 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
969 {
970         int ret;
971         uint32_t recmaster;
972
973         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
974         if (ret != 0) {
975                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
976                 return ret;
977         }
978         printf("%d\n",recmaster);
979
980         return 0;
981 }
982
983 /*
984   get a list of all tickles for this pnn
985  */
986 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
987 {
988         struct ctdb_control_tcp_tickle_list *list;
989         ctdb_sock_addr addr;
990         int i, ret;
991
992         if (argc < 1) {
993                 usage();
994         }
995
996         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
997                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
998                 return -1;
999         }
1000
1001         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1002         if (ret == -1) {
1003                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1004                 return -1;
1005         }
1006
1007         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1008         printf("Num tickles:%u\n", list->tickles.num);
1009         for (i=0;i<list->tickles.num;i++) {
1010                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1011                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1012         }
1013
1014         talloc_free(list);
1015         
1016         return 0;
1017 }
1018
1019
1020 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1021 {
1022         struct ctdb_all_public_ips *ips;
1023         struct ctdb_public_ip ip;
1024         int i, ret;
1025         uint32_t *nodes;
1026         uint32_t disable_time;
1027         TDB_DATA data;
1028         struct ctdb_node_map *nodemap=NULL;
1029         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1030
1031         disable_time = 30;
1032         data.dptr  = (uint8_t*)&disable_time;
1033         data.dsize = sizeof(disable_time);
1034         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1035         if (ret != 0) {
1036                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1037                 return -1;
1038         }
1039
1040
1041
1042         /* read the public ip list from the node */
1043         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1044         if (ret != 0) {
1045                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1046                 talloc_free(tmp_ctx);
1047                 return -1;
1048         }
1049
1050         for (i=0;i<ips->num;i++) {
1051                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1052                         break;
1053                 }
1054         }
1055         if (i==ips->num) {
1056                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1057                         pnn, ctdb_addr_to_str(addr)));
1058                 talloc_free(tmp_ctx);
1059                 return -1;
1060         }
1061
1062         ip.pnn  = pnn;
1063         ip.addr = *addr;
1064
1065         data.dptr  = (uint8_t *)&ip;
1066         data.dsize = sizeof(ip);
1067
1068         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1071                 talloc_free(tmp_ctx);
1072                 return ret;
1073         }
1074
1075         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1076         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1077                                         nodes, 0,
1078                                         LONGTIMELIMIT(),
1079                                         false, data,
1080                                         NULL, NULL,
1081                                         NULL);
1082         if (ret != 0) {
1083                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1084                 talloc_free(tmp_ctx);
1085                 return -1;
1086         }
1087
1088         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1089         if (ret != 0) {
1090                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1091                 talloc_free(tmp_ctx);
1092                 return -1;
1093         }
1094
1095         /* update the recovery daemon so it now knows to expect the new
1096            node assignment for this ip.
1097         */
1098         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1101                 return -1;
1102         }
1103
1104         talloc_free(tmp_ctx);
1105         return 0;
1106 }
1107
1108 /*
1109   move/failover an ip address to a specific node
1110  */
1111 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1112 {
1113         uint32_t pnn;
1114         ctdb_sock_addr addr;
1115
1116         if (argc < 2) {
1117                 usage();
1118                 return -1;
1119         }
1120
1121         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1122                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1123                 return -1;
1124         }
1125
1126
1127         if (sscanf(argv[1], "%u", &pnn) != 1) {
1128                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1129                 return -1;
1130         }
1131
1132         if (move_ip(ctdb, &addr, pnn) != 0) {
1133                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1134                 return -1;
1135         }
1136
1137         return 0;
1138 }
1139
1140 void getips_store_callback(void *param, void *data)
1141 {
1142         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1143         struct ctdb_all_public_ips *ips = param;
1144         int i;
1145
1146         i = ips->num++;
1147         ips->ips[i].pnn  = node_ip->pnn;
1148         ips->ips[i].addr = node_ip->addr;
1149 }
1150
/*
  Tree traversal callback: add one to the uint32_t counter that param
  points to.  The visited entry (data) is not looked at.
 */
void getips_count_callback(void *param, void *data)
{
	uint32_t *counter = (uint32_t *)param;

	(void)data;
	*counter += 1;
}
1157
1158 #define IP_KEYLEN       4
1159 static uint32_t *ip_key(ctdb_sock_addr *ip)
1160 {
1161         static uint32_t key[IP_KEYLEN];
1162
1163         bzero(key, sizeof(key));
1164
1165         switch (ip->sa.sa_family) {
1166         case AF_INET:
1167                 key[0]  = ip->ip.sin_addr.s_addr;
1168                 break;
1169         case AF_INET6:
1170                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1171                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1172                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1173                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1174                 break;
1175         default:
1176                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1177                 return key;
1178         }
1179
1180         return key;
1181 }
1182
/*
  Insert callback handed to trbt_insertarray32_callback(): returns parm
  unchanged and ignores data.
  NOTE(review): presumably the tree stores the returned pointer, so a new
  entry replaces an existing one with the same key - confirm against the
  rb_tree implementation.
 */
static void *add_ip_callback(void *parm, void *data)
{
	void *chosen = parm;

	(void)data;
	return chosen;
}
1187
1188 static int
1189 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1190 {
1191         struct ctdb_all_public_ips *tmp_ips;
1192         struct ctdb_node_map *nodemap=NULL;
1193         trbt_tree_t *ip_tree;
1194         int i, j, len, ret;
1195         uint32_t count;
1196
1197         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1198         if (ret != 0) {
1199                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1200                 return ret;
1201         }
1202
1203         ip_tree = trbt_create(tmp_ctx, 0);
1204
1205         for(i=0;i<nodemap->num;i++){
1206                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1207                         continue;
1208                 }
1209                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1210                         continue;
1211                 }
1212
1213                 /* read the public ip list from this node */
1214                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1215                 if (ret != 0) {
1216                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1217                         return -1;
1218                 }
1219         
1220                 for (j=0; j<tmp_ips->num;j++) {
1221                         struct ctdb_public_ip *node_ip;
1222
1223                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1224                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1225                         node_ip->addr = tmp_ips->ips[j].addr;
1226
1227                         trbt_insertarray32_callback(ip_tree,
1228                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1229                                 add_ip_callback,
1230                                 node_ip);
1231                 }
1232                 talloc_free(tmp_ips);
1233         }
1234
1235         /* traverse */
1236         count = 0;
1237         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1238
1239         len = offsetof(struct ctdb_all_public_ips, ips) + 
1240                 count*sizeof(struct ctdb_public_ip);
1241         tmp_ips = talloc_zero_size(tmp_ctx, len);
1242         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1243
1244         *ips = tmp_ips;
1245
1246         return 0;
1247 }
1248
1249
1250 /* 
1251  * scans all other nodes and returns a pnn for another node that can host this 
1252  * ip address or -1
1253  */
1254 static int
1255 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1256 {
1257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1258         struct ctdb_all_public_ips *ips;
1259         struct ctdb_node_map *nodemap=NULL;
1260         int i, j, ret;
1261
1262         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1263         if (ret != 0) {
1264                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1265                 talloc_free(tmp_ctx);
1266                 return ret;
1267         }
1268
1269         for(i=0;i<nodemap->num;i++){
1270                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1271                         continue;
1272                 }
1273                 if (nodemap->nodes[i].pnn == options.pnn) {
1274                         continue;
1275                 }
1276
1277                 /* read the public ip list from this node */
1278                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1279                 if (ret != 0) {
1280                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1281                         return -1;
1282                 }
1283
1284                 for (j=0;j<ips->num;j++) {
1285                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1286                                 talloc_free(tmp_ctx);
1287                                 return nodemap->nodes[i].pnn;
1288                         }
1289                 }
1290                 talloc_free(ips);
1291         }
1292
1293         talloc_free(tmp_ctx);
1294         return -1;
1295 }
1296
1297 /*
1298   add a public ip address to a node
1299  */
1300 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1301 {
1302         int i, ret;
1303         int len;
1304         uint32_t pnn;
1305         unsigned mask;
1306         ctdb_sock_addr addr;
1307         struct ctdb_control_ip_iface *pub;
1308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1309         struct ctdb_all_public_ips *ips;
1310
1311
1312         if (argc != 2) {
1313                 talloc_free(tmp_ctx);
1314                 usage();
1315         }
1316
1317         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1318                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1319                 talloc_free(tmp_ctx);
1320                 return -1;
1321         }
1322
1323         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1324         if (ret != 0) {
1325                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1326                 talloc_free(tmp_ctx);
1327                 return ret;
1328         }
1329
1330
1331         /* check if some other node is already serving this ip, if not,
1332          * we will claim it
1333          */
1334         for (i=0;i<ips->num;i++) {
1335                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1336                         break;
1337                 }
1338         }
1339
1340         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1341         pub = talloc_size(tmp_ctx, len); 
1342         CTDB_NO_MEMORY(ctdb, pub);
1343
1344         pub->addr  = addr;
1345         pub->mask  = mask;
1346         pub->len   = strlen(argv[1])+1;
1347         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1348
1349         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1350         if (ret != 0) {
1351                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1352                 talloc_free(tmp_ctx);
1353                 return ret;
1354         }
1355
1356         if (i == ips->num) {
1357                 /* no one has this ip so we claim it */
1358                 pnn  = options.pnn;
1359         } else {
1360                 pnn  = ips->ips[i].pnn;
1361         }
1362
1363         if (move_ip(ctdb, &addr, pnn) != 0) {
1364                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1365                 return -1;
1366         }
1367
1368         talloc_free(tmp_ctx);
1369         return 0;
1370 }
1371
1372 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1373
1374 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1375 {
1376         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1377         struct ctdb_node_map *nodemap=NULL;
1378         struct ctdb_all_public_ips *ips;
1379         int ret, i, j;
1380
1381         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1382         if (ret != 0) {
1383                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1384                 return ret;
1385         }
1386
1387         /* remove it from the nodes that are not hosting the ip currently */
1388         for(i=0;i<nodemap->num;i++){
1389                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1390                         continue;
1391                 }
1392                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1393                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1394                         continue;
1395                 }
1396
1397                 for (j=0;j<ips->num;j++) {
1398                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1399                                 break;
1400                         }
1401                 }
1402                 if (j==ips->num) {
1403                         continue;
1404                 }
1405
1406                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1407                         continue;
1408                 }
1409
1410                 options.pnn = nodemap->nodes[i].pnn;
1411                 control_delip(ctdb, argc, argv);
1412         }
1413
1414
1415         /* remove it from every node (also the one hosting it) */
1416         for(i=0;i<nodemap->num;i++){
1417                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1418                         continue;
1419                 }
1420                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1421                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1422                         continue;
1423                 }
1424
1425                 for (j=0;j<ips->num;j++) {
1426                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1427                                 break;
1428                         }
1429                 }
1430                 if (j==ips->num) {
1431                         continue;
1432                 }
1433
1434                 options.pnn = nodemap->nodes[i].pnn;
1435                 control_delip(ctdb, argc, argv);
1436         }
1437
1438         talloc_free(tmp_ctx);
1439         return 0;
1440 }
1441         
1442 /*
1443   delete a public ip address from a node
1444  */
1445 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1446 {
1447         int i, ret;
1448         ctdb_sock_addr addr;
1449         struct ctdb_control_ip_iface pub;
1450         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1451         struct ctdb_all_public_ips *ips;
1452
1453         if (argc != 1) {
1454                 talloc_free(tmp_ctx);
1455                 usage();
1456         }
1457
1458         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1459                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1460                 return -1;
1461         }
1462
1463         if (options.pnn == CTDB_BROADCAST_ALL) {
1464                 return control_delip_all(ctdb, argc, argv, &addr);
1465         }
1466
1467         pub.addr  = addr;
1468         pub.mask  = 0;
1469         pub.len   = 0;
1470
1471         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1472         if (ret != 0) {
1473                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1474                 talloc_free(tmp_ctx);
1475                 return ret;
1476         }
1477         
1478         for (i=0;i<ips->num;i++) {
1479                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1480                         break;
1481                 }
1482         }
1483
1484         if (i==ips->num) {
1485                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1486                         ctdb_addr_to_str(&addr)));
1487                 talloc_free(tmp_ctx);
1488                 return -1;
1489         }
1490
1491         if (ips->ips[i].pnn == options.pnn) {
1492                 ret = find_other_host_for_public_ip(ctdb, &addr);
1493                 if (ret != -1) {
1494                         if (move_ip(ctdb, &addr, ret) != 0) {
1495                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1496                                 return -1;
1497                         }
1498                 }
1499         }
1500
1501         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1502         if (ret != 0) {
1503                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1504                 talloc_free(tmp_ctx);
1505                 return ret;
1506         }
1507
1508         talloc_free(tmp_ctx);
1509         return 0;
1510 }
1511
1512 /*
1513   kill a tcp connection
1514  */
1515 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1516 {
1517         int ret;
1518         struct ctdb_control_killtcp killtcp;
1519
1520         if (argc < 2) {
1521                 usage();
1522         }
1523
1524         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1525                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1526                 return -1;
1527         }
1528
1529         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1530                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1531                 return -1;
1532         }
1533
1534         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1535         if (ret != 0) {
1536                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1537                 return ret;
1538         }
1539
1540         return 0;
1541 }
1542
1543
1544 /*
1545   send a gratious arp
1546  */
1547 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1548 {
1549         int ret;
1550         ctdb_sock_addr addr;
1551
1552         if (argc < 2) {
1553                 usage();
1554         }
1555
1556         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1557                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1558                 return -1;
1559         }
1560
1561         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1562         if (ret != 0) {
1563                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1564                 return ret;
1565         }
1566
1567         return 0;
1568 }
1569
1570 /*
1571   register a server id
1572  */
1573 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1574 {
1575         int ret;
1576         struct ctdb_server_id server_id;
1577
1578         if (argc < 3) {
1579                 usage();
1580         }
1581
1582         server_id.pnn       = strtoul(argv[0], NULL, 0);
1583         server_id.type      = strtoul(argv[1], NULL, 0);
1584         server_id.server_id = strtoul(argv[2], NULL, 0);
1585
1586         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1587         if (ret != 0) {
1588                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1589                 return ret;
1590         }
1591         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1592         sleep(999);
1593         return -1;
1594 }
1595
1596 /*
1597   unregister a server id
1598  */
1599 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1600 {
1601         int ret;
1602         struct ctdb_server_id server_id;
1603
1604         if (argc < 3) {
1605                 usage();
1606         }
1607
1608         server_id.pnn       = strtoul(argv[0], NULL, 0);
1609         server_id.type      = strtoul(argv[1], NULL, 0);
1610         server_id.server_id = strtoul(argv[2], NULL, 0);
1611
1612         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1615                 return ret;
1616         }
1617         return -1;
1618 }
1619
1620 /*
1621   check if a server id exists
1622  */
1623 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1624 {
1625         uint32_t status;
1626         int ret;
1627         struct ctdb_server_id server_id;
1628
1629         if (argc < 3) {
1630                 usage();
1631         }
1632
1633         server_id.pnn       = strtoul(argv[0], NULL, 0);
1634         server_id.type      = strtoul(argv[1], NULL, 0);
1635         server_id.server_id = strtoul(argv[2], NULL, 0);
1636
1637         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1638         if (ret != 0) {
1639                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1640                 return ret;
1641         }
1642
1643         if (status) {
1644                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1645         } else {
1646                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1647         }
1648         return 0;
1649 }
1650
1651 /*
1652   get a list of all server ids that are registered on a node
1653  */
1654 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1655 {
1656         int i, ret;
1657         struct ctdb_server_id_list *server_ids;
1658
1659         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1660         if (ret != 0) {
1661                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1662                 return ret;
1663         }
1664
1665         for (i=0; i<server_ids->num; i++) {
1666                 printf("Server id %d:%d:%d\n", 
1667                         server_ids->server_ids[i].pnn, 
1668                         server_ids->server_ids[i].type, 
1669                         server_ids->server_ids[i].server_id); 
1670         }
1671
1672         return -1;
1673 }
1674
1675 /*
1676   send a tcp tickle ack
1677  */
1678 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1679 {
1680         int ret;
1681         ctdb_sock_addr  src, dst;
1682
1683         if (argc < 2) {
1684                 usage();
1685         }
1686
1687         if (!parse_ip_port(argv[0], &src)) {
1688                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1689                 return -1;
1690         }
1691
1692         if (!parse_ip_port(argv[1], &dst)) {
1693                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1694                 return -1;
1695         }
1696
1697         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1698         if (ret==0) {
1699                 return 0;
1700         }
1701         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1702
1703         return -1;
1704 }
1705
1706
1707 /*
1708   display public ip status
1709  */
1710 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1711 {
1712         int i, ret;
1713         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1714         struct ctdb_all_public_ips *ips;
1715
1716         if (options.pnn == CTDB_BROADCAST_ALL) {
1717                 /* read the list of public ips from all nodes */
1718                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1719         } else {
1720                 /* read the public ip list from this node */
1721                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1722         }
1723         if (ret != 0) {
1724                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1725                 talloc_free(tmp_ctx);
1726                 return ret;
1727         }
1728
1729         if (options.machinereadable){
1730                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1731         } else {
1732                 if (options.pnn == CTDB_BROADCAST_ALL) {
1733                         printf("Public IPs on ALL nodes\n");
1734                 } else {
1735                         printf("Public IPs on node %u\n", options.pnn);
1736                 }
1737         }
1738
1739         for (i=1;i<=ips->num;i++) {
1740                 struct ctdb_control_public_ip_info *info = NULL;
1741                 int32_t pnn;
1742                 char *aciface = NULL;
1743                 char *avifaces = NULL;
1744                 char *cifaces = NULL;
1745
1746                 if (options.pnn == CTDB_BROADCAST_ALL) {
1747                         pnn = ips->ips[ips->num-i].pnn;
1748                 } else {
1749                         pnn = options.pnn;
1750                 }
1751
1752                 if (pnn != -1) {
1753                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1754                                                    &ips->ips[ips->num-i].addr, &info);
1755                 } else {
1756                         ret = -1;
1757                 }
1758
1759                 if (ret == 0) {
1760                         int j;
1761                         for (j=0; j < info->num; j++) {
1762                                 if (cifaces == NULL) {
1763                                         cifaces = talloc_strdup(info,
1764                                                                 info->ifaces[j].name);
1765                                 } else {
1766                                         cifaces = talloc_asprintf_append(cifaces,
1767                                                                          ",%s",
1768                                                                          info->ifaces[j].name);
1769                                 }
1770
1771                                 if (info->active_idx == j) {
1772                                         aciface = info->ifaces[j].name;
1773                                 }
1774
1775                                 if (info->ifaces[j].link_state == 0) {
1776                                         continue;
1777                                 }
1778
1779                                 if (avifaces == NULL) {
1780                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1781                                 } else {
1782                                         avifaces = talloc_asprintf_append(avifaces,
1783                                                                           ",%s",
1784                                                                           info->ifaces[j].name);
1785                                 }
1786                         }
1787                 }
1788
1789                 if (options.machinereadable){
1790                         printf(":%s:%d:%s:%s:%s:\n",
1791                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1792                                ips->ips[ips->num-i].pnn,
1793                                aciface?aciface:"",
1794                                avifaces?avifaces:"",
1795                                cifaces?cifaces:"");
1796                 } else {
1797                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1798                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1799                                ips->ips[ips->num-i].pnn,
1800                                aciface?aciface:"",
1801                                avifaces?avifaces:"",
1802                                cifaces?cifaces:"");
1803                 }
1804                 talloc_free(info);
1805         }
1806
1807         talloc_free(tmp_ctx);
1808         return 0;
1809 }
1810
1811 /*
1812   public ip info
1813  */
1814 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1815 {
1816         int i, ret;
1817         ctdb_sock_addr addr;
1818         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1819         struct ctdb_control_public_ip_info *info;
1820
1821         if (argc != 1) {
1822                 talloc_free(tmp_ctx);
1823                 usage();
1824         }
1825
1826         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1827                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1828                 return -1;
1829         }
1830
1831         /* read the public ip info from this node */
1832         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1833                                            tmp_ctx, &addr, &info);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1836                                   argv[0], options.pnn));
1837                 talloc_free(tmp_ctx);
1838                 return ret;
1839         }
1840
1841         printf("Public IP[%s] info on node %u\n",
1842                ctdb_addr_to_str(&info->ip.addr),
1843                options.pnn);
1844
1845         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1846                ctdb_addr_to_str(&info->ip.addr),
1847                info->ip.pnn, info->num);
1848
1849         for (i=0; i<info->num; i++) {
1850                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1851
1852                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1853                        i+1, info->ifaces[i].name,
1854                        info->ifaces[i].link_state?"up":"down",
1855                        (unsigned int)info->ifaces[i].references,
1856                        (i==info->active_idx)?" (active)":"");
1857         }
1858
1859         talloc_free(tmp_ctx);
1860         return 0;
1861 }
1862
1863 /*
1864   display interfaces status
1865  */
1866 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1867 {
1868         int i, ret;
1869         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1870         struct ctdb_control_get_ifaces *ifaces;
1871
1872         /* read the public ip list from this node */
1873         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1874                                    tmp_ctx, &ifaces);
1875         if (ret != 0) {
1876                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1877                                   options.pnn));
1878                 talloc_free(tmp_ctx);
1879                 return ret;
1880         }
1881
1882         if (options.machinereadable){
1883                 printf(":Name:LinkStatus:References:\n");
1884         } else {
1885                 printf("Interfaces on node %u\n", options.pnn);
1886         }
1887
1888         for (i=0; i<ifaces->num; i++) {
1889                 if (options.machinereadable){
1890                         printf(":%s:%s:%u\n",
1891                                ifaces->ifaces[i].name,
1892                                ifaces->ifaces[i].link_state?"1":"0",
1893                                (unsigned int)ifaces->ifaces[i].references);
1894                 } else {
1895                         printf("name:%s link:%s references:%u\n",
1896                                ifaces->ifaces[i].name,
1897                                ifaces->ifaces[i].link_state?"up":"down",
1898                                (unsigned int)ifaces->ifaces[i].references);
1899                 }
1900         }
1901
1902         talloc_free(tmp_ctx);
1903         return 0;
1904 }
1905
1906
1907 /*
1908   set link status of an interface
1909  */
1910 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1911 {
1912         int ret;
1913         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1914         struct ctdb_control_iface_info info;
1915
1916         ZERO_STRUCT(info);
1917
1918         if (argc != 2) {
1919                 usage();
1920         }
1921
1922         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1923                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1924                                   argv[0]));
1925                 talloc_free(tmp_ctx);
1926                 return -1;
1927         }
1928         strcpy(info.name, argv[0]);
1929
1930         if (strcmp(argv[1], "up") == 0) {
1931                 info.link_state = 1;
1932         } else if (strcmp(argv[1], "down") == 0) {
1933                 info.link_state = 0;
1934         } else {
1935                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1936                                   argv[1]));
1937                 talloc_free(tmp_ctx);
1938                 return -1;
1939         }
1940
1941         /* read the public ip list from this node */
1942         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1943                                    tmp_ctx, &info);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1946                                   argv[0], options.pnn));
1947                 talloc_free(tmp_ctx);
1948                 return ret;
1949         }
1950
1951         talloc_free(tmp_ctx);
1952         return 0;
1953 }
1954
1955 /*
1956   display pid of a ctdb daemon
1957  */
1958 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1959 {
1960         uint32_t pid;
1961         int ret;
1962
1963         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1964         if (ret != 0) {
1965                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1966                 return ret;
1967         }
1968         printf("Pid:%d\n", pid);
1969
1970         return 0;
1971 }
1972
1973 static uint32_t ipreallocate_finished;
1974
1975 /*
1976   handler for receiving the response to ipreallocate
1977 */
1978 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1979                              TDB_DATA data, void *private_data)
1980 {
1981         ipreallocate_finished = 1;
1982 }
1983
1984 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1985 {
1986         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1987
1988         event_add_timed(ctdb->ev, ctdb, 
1989                                 timeval_current_ofs(1, 0),
1990                                 ctdb_every_second, ctdb);
1991 }
1992
1993 /*
1994   ask the recovery daemon on the recovery master to perform a ip reallocation
1995  */
1996 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1997 {
1998         int i, ret;
1999         TDB_DATA data;
2000         struct takeover_run_reply rd;
2001         uint32_t recmaster;
2002         struct ctdb_node_map *nodemap=NULL;
2003         int retries=0;
2004         struct timeval tv = timeval_current();
2005
2006         /* we need some events to trigger so we can timeout and restart
2007            the loop
2008         */
2009         event_add_timed(ctdb->ev, ctdb, 
2010                                 timeval_current_ofs(1, 0),
2011                                 ctdb_every_second, ctdb);
2012
2013         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2014         if (rd.pnn == -1) {
2015                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
2016                 return -1;
2017         }
2018         rd.srvid = getpid();
2019
2020         /* register a message port for receiveing the reply so that we
2021            can receive the reply
2022         */
2023         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2024
2025         data.dptr = (uint8_t *)&rd;
2026         data.dsize = sizeof(rd);
2027
2028 again:
2029         /* check that there are valid nodes available */
2030         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2031                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2032                 return -1;
2033         }
2034         for (i=0; i<nodemap->num;i++) {
2035                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2036                         break;
2037                 }
2038         }
2039         if (i==nodemap->num) {
2040                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2041                 return 0;
2042         }
2043
2044
2045         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2048                 return ret;
2049         }
2050
2051         /* verify the node exists */
2052         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2053                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2054                 return -1;
2055         }
2056
2057
2058         /* check tha there are nodes available that can act as a recmaster */
2059         for (i=0; i<nodemap->num; i++) {
2060                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2061                         continue;
2062                 }
2063                 break;
2064         }
2065         if (i == nodemap->num) {
2066                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2067                 return 0;
2068         }
2069
2070         /* verify the recovery master is not STOPPED, nor BANNED */
2071         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2072                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2073                 retries++;
2074                 sleep(1);
2075                 goto again;
2076         } 
2077
2078         
2079         /* verify the recovery master is not STOPPED, nor BANNED */
2080         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2081                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2082                 retries++;
2083                 sleep(1);
2084                 goto again;
2085         } 
2086
2087         ipreallocate_finished = 0;
2088         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2091                 return -1;
2092         }
2093
2094         tv = timeval_current();
2095         /* this loop will terminate when we have received the reply */
2096         while (timeval_elapsed(&tv) < 3.0) {
2097                 event_loop_once(ctdb->ev);
2098         }
2099         if (ipreallocate_finished == 1) {
2100                 return 0;
2101         }
2102
2103         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2104         retries++;
2105         sleep(1);
2106         goto again;
2107
2108         return 0;
2109 }
2110
2111
2112 /*
2113   disable a remote node
2114  */
2115 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2116 {
2117         int ret;
2118         struct ctdb_node_map *nodemap=NULL;
2119
2120         /* check if the node is already disabled */
2121         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2122                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2123                 exit(10);
2124         }
2125         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2126                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2127                 return 0;
2128         }
2129
2130         do {
2131                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2132                 if (ret != 0) {
2133                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2134                         return ret;
2135                 }
2136
2137                 sleep(1);
2138
2139                 /* read the nodemap and verify the change took effect */
2140                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2141                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2142                         exit(10);
2143                 }
2144
2145         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2146         ret = control_ipreallocate(ctdb, argc, argv);
2147         if (ret != 0) {
2148                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2149                 return ret;
2150         }
2151
2152         return 0;
2153 }
2154
2155 /*
2156   enable a disabled remote node
2157  */
2158 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2159 {
2160         int ret;
2161
2162         struct ctdb_node_map *nodemap=NULL;
2163
2164
2165         /* check if the node is already enabled */
2166         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2167                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2168                 exit(10);
2169         }
2170         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2171                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2172                 return 0;
2173         }
2174
2175         do {
2176                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2177                 if (ret != 0) {
2178                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2179                         return ret;
2180                 }
2181
2182                 sleep(1);
2183
2184                 /* read the nodemap and verify the change took effect */
2185                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2186                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2187                         exit(10);
2188                 }
2189
2190         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2191
2192         ret = control_ipreallocate(ctdb, argc, argv);
2193         if (ret != 0) {
2194                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2195                 return ret;
2196         }
2197
2198         return 0;
2199 }
2200
2201 /*
2202   stop a remote node
2203  */
2204 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2205 {
2206         int ret;
2207         struct ctdb_node_map *nodemap=NULL;
2208
2209         do {
2210                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2211                 if (ret != 0) {
2212                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2213                 }
2214         
2215                 sleep(1);
2216
2217                 /* read the nodemap and verify the change took effect */
2218                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2219                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2220                         exit(10);
2221                 }
2222
2223         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2224         ret = control_ipreallocate(ctdb, argc, argv);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2227                 return ret;
2228         }
2229
2230         return 0;
2231 }
2232
2233 /*
2234   restart a stopped remote node
2235  */
2236 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2237 {
2238         int ret;
2239
2240         struct ctdb_node_map *nodemap=NULL;
2241
2242         do {
2243                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2244                 if (ret != 0) {
2245                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2246                         return ret;
2247                 }
2248         
2249                 sleep(1);
2250
2251                 /* read the nodemap and verify the change took effect */
2252                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2253                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2254                         exit(10);
2255                 }
2256
2257         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2258         ret = control_ipreallocate(ctdb, argc, argv);
2259         if (ret != 0) {
2260                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2261                 return ret;
2262         }
2263
2264         return 0;
2265 }
2266
2267 static uint32_t get_generation(struct ctdb_context *ctdb)
2268 {
2269         struct ctdb_vnn_map *vnnmap=NULL;
2270         int ret;
2271
2272         /* wait until the recmaster is not in recovery mode */
2273         while (1) {
2274                 uint32_t recmode, recmaster;
2275                 
2276                 if (vnnmap != NULL) {
2277                         talloc_free(vnnmap);
2278                         vnnmap = NULL;
2279                 }
2280
2281                 /* get the recmaster */
2282                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2283                 if (ret != 0) {
2284                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2285                         exit(10);
2286                 }
2287
2288                 /* get recovery mode */
2289                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2290                 if (ret != 0) {
2291                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2292                         exit(10);
2293                 }
2294
2295                 /* get the current generation number */
2296                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2297                 if (ret != 0) {
2298                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2299                         exit(10);
2300                 }
2301
2302                 if ((recmode == CTDB_RECOVERY_NORMAL)
2303                 &&  (vnnmap->generation != 1)){
2304                         return vnnmap->generation;
2305                 }
2306                 sleep(1);
2307         }
2308 }
2309
2310 /*
2311   ban a node from the cluster
2312  */
2313 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2314 {
2315         int ret;
2316         struct ctdb_node_map *nodemap=NULL;
2317         struct ctdb_ban_time bantime;
2318
2319         if (argc < 1) {
2320                 usage();
2321         }
2322         
2323         /* verify the node exists */
2324         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2325         if (ret != 0) {
2326                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2327                 return ret;
2328         }
2329
2330         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2331                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2332                 return -1;
2333         }
2334
2335         bantime.pnn  = options.pnn;
2336         bantime.time = strtoul(argv[0], NULL, 0);
2337
2338         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2339         if (ret != 0) {
2340                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2341                 return -1;
2342         }       
2343
2344         ret = control_ipreallocate(ctdb, argc, argv);
2345         if (ret != 0) {
2346                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2347                 return ret;
2348         }
2349
2350         return 0;
2351 }
2352
2353
2354 /*
2355   unban a node from the cluster
2356  */
2357 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2358 {
2359         int ret;
2360         struct ctdb_node_map *nodemap=NULL;
2361         struct ctdb_ban_time bantime;
2362
2363         /* verify the node exists */
2364         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2367                 return ret;
2368         }
2369
2370         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2371                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2372                 return -1;
2373         }
2374
2375         bantime.pnn  = options.pnn;
2376         bantime.time = 0;
2377
2378         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2379         if (ret != 0) {
2380                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2381                 return -1;
2382         }       
2383
2384         ret = control_ipreallocate(ctdb, argc, argv);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2387                 return ret;
2388         }
2389
2390         return 0;
2391 }
2392
2393
2394 /*
2395   show ban information for a node
2396  */
2397 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2398 {
2399         int ret;
2400         struct ctdb_node_map *nodemap=NULL;
2401         struct ctdb_ban_time *bantime;
2402
2403         /* verify the node exists */
2404         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2405         if (ret != 0) {
2406                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2407                 return ret;
2408         }
2409
2410         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2411         if (ret != 0) {
2412                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2413                 return -1;
2414         }       
2415
2416         if (bantime->time == 0) {
2417                 printf("Node %u is not banned\n", bantime->pnn);
2418         } else {
2419                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2420         }
2421
2422         return 0;
2423 }
2424
2425 /*
2426   shutdown a daemon
2427  */
2428 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2429 {
2430         int ret;
2431
2432         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2433         if (ret != 0) {
2434                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2435                 return ret;
2436         }
2437
2438         return 0;
2439 }
2440
2441 /*
2442   trigger a recovery
2443  */
2444 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2445 {
2446         int ret;
2447         uint32_t generation, next_generation;
2448
2449         /* record the current generation number */
2450         generation = get_generation(ctdb);
2451
2452         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2453         if (ret != 0) {
2454                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2455                 return ret;
2456         }
2457
2458         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2459         if (ret != 0) {
2460                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2461                 return ret;
2462         }
2463
2464         /* wait until we are in a new generation */
2465         while (1) {
2466                 next_generation = get_generation(ctdb);
2467                 if (next_generation != generation) {
2468                         return 0;
2469                 }
2470                 sleep(1);
2471         }
2472
2473         return 0;
2474 }
2475
2476
2477 /*
2478   display monitoring mode of a remote node
2479  */
2480 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2481 {
2482         uint32_t monmode;
2483         int ret;
2484
2485         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2486         if (ret != 0) {
2487                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2488                 return ret;
2489         }
2490         if (!options.machinereadable){
2491                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2492         } else {
2493                 printf(":mode:\n");
2494                 printf(":%d:\n",monmode);
2495         }
2496         return 0;
2497 }
2498
2499
2500 /*
2501   display capabilities of a remote node
2502  */
2503 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2504 {
2505         uint32_t capabilities;
2506         int ret;
2507
2508         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2511                 return ret;
2512         }
2513         
2514         if (!options.machinereadable){
2515                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2516                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2517                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2518                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2519         } else {
2520                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2521                 printf(":%d:%d:%d:%d:\n",
2522                         !!(capabilities&CTDB_CAP_RECMASTER),
2523                         !!(capabilities&CTDB_CAP_LMASTER),
2524                         !!(capabilities&CTDB_CAP_LVS),
2525                         !!(capabilities&CTDB_CAP_NATGW));
2526         }
2527         return 0;
2528 }
2529
2530 /*
2531   display lvs configuration
2532  */
2533 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2534 {
2535         uint32_t *capabilities;
2536         struct ctdb_node_map *nodemap=NULL;
2537         int i, ret;
2538         int healthy_count = 0;
2539
2540         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2543                 return ret;
2544         }
2545
2546         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2547         CTDB_NO_MEMORY(ctdb, capabilities);
2548         
2549         /* collect capabilities for all connected nodes */
2550         for (i=0; i<nodemap->num; i++) {
2551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2552                         continue;
2553                 }
2554                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2555                         continue;
2556                 }
2557         
2558                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2559                 if (ret != 0) {
2560                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2561                         return ret;
2562                 }
2563
2564                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2565                         continue;
2566                 }
2567
2568                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2569                         healthy_count++;
2570                 }
2571         }
2572
2573         /* Print all LVS nodes */
2574         for (i=0; i<nodemap->num; i++) {
2575                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2576                         continue;
2577                 }
2578                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2579                         continue;
2580                 }
2581                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2582                         continue;
2583                 }
2584
2585                 if (healthy_count != 0) {
2586                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2587                                 continue;
2588                         }
2589                 }
2590
2591                 printf("%d:%s\n", i, 
2592                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2593         }
2594
2595         return 0;
2596 }
2597
2598 /*
2599   display who is the lvs master
2600  */
2601 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2602 {
2603         uint32_t *capabilities;
2604         struct ctdb_node_map *nodemap=NULL;
2605         int i, ret;
2606         int healthy_count = 0;
2607
2608         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2609         if (ret != 0) {
2610                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2611                 return ret;
2612         }
2613
2614         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2615         CTDB_NO_MEMORY(ctdb, capabilities);
2616         
2617         /* collect capabilities for all connected nodes */
2618         for (i=0; i<nodemap->num; i++) {
2619                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2620                         continue;
2621                 }
2622                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2623                         continue;
2624                 }
2625         
2626                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2627                 if (ret != 0) {
2628                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2629                         return ret;
2630                 }
2631
2632                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2633                         continue;
2634                 }
2635
2636                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2637                         healthy_count++;
2638                 }
2639         }
2640
2641         /* find and show the lvsmaster */
2642         for (i=0; i<nodemap->num; i++) {
2643                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2644                         continue;
2645                 }
2646                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2647                         continue;
2648                 }
2649                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2650                         continue;
2651                 }
2652
2653                 if (healthy_count != 0) {
2654                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2655                                 continue;
2656                         }
2657                 }
2658
2659                 if (options.machinereadable){
2660                         printf("%d\n", i);
2661                 } else {
2662                         printf("Node %d is LVS master\n", i);
2663                 }
2664                 return 0;
2665         }
2666
2667         printf("There is no LVS master\n");
2668         return -1;
2669 }
2670
2671 /*
2672   disable monitoring on a  node
2673  */
2674 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2675 {
2676         
2677         int ret;
2678
2679         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2680         if (ret != 0) {
2681                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2682                 return ret;
2683         }
2684         printf("Monitoring mode:%s\n","DISABLED");
2685
2686         return 0;
2687 }
2688
2689 /*
2690   enable monitoring on a  node
2691  */
2692 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2693 {
2694         
2695         int ret;
2696
2697         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2698         if (ret != 0) {
2699                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2700                 return ret;
2701         }
2702         printf("Monitoring mode:%s\n","ACTIVE");
2703
2704         return 0;
2705 }
2706
2707 /*
2708   display remote list of keys/data for a db
2709  */
2710 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2711 {
2712         const char *db_name;
2713         struct ctdb_db_context *ctdb_db;
2714         int ret;
2715
2716         if (argc < 1) {
2717                 usage();
2718         }
2719
2720         db_name = argv[0];
2721
2722
2723         if (db_exists(ctdb, db_name)) {
2724                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2725                 return -1;
2726         }
2727
2728         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2729
2730         if (ctdb_db == NULL) {
2731                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2732                 return -1;
2733         }
2734
2735         /* traverse and dump the cluster tdb */
2736         ret = ctdb_dump_db(ctdb_db, stdout);
2737         if (ret == -1) {
2738                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2739                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2740                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2741                                   db_name));
2742                 return -1;
2743         }
2744         talloc_free(ctdb_db);
2745
2746         printf("Dumped %d records\n", ret);
2747         return 0;
2748 }
2749
2750
2751 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2752                              TDB_DATA data, void *private_data)
2753 {
2754         DEBUG(DEBUG_ERR,("Log data received\n"));
2755         if (data.dsize > 0) {
2756                 printf("%s", data.dptr);
2757         }
2758
2759         exit(0);
2760 }
2761
2762 /*
2763   display a list of log messages from the in memory ringbuffer
2764  */
2765 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2766 {
2767         int ret;
2768         int32_t res;
2769         struct ctdb_get_log_addr log_addr;
2770         TDB_DATA data;
2771         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2772         char *errmsg;
2773         struct timeval tv;
2774
2775         if (argc != 1) {
2776                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2777                 talloc_free(tmp_ctx);
2778                 return -1;
2779         }
2780
2781         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2782         log_addr.srvid = getpid();
2783         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2784                 log_addr.level = get_debug_by_desc(argv[0]);
2785         } else {
2786                 log_addr.level = strtol(argv[0], NULL, 0);
2787         }
2788
2789
2790         data.dptr = (unsigned char *)&log_addr;
2791         data.dsize = sizeof(log_addr);
2792
2793         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2794
2795         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2796         sleep(1);
2797
2798         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2799
2800         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2801                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2802         if (ret != 0 || res != 0) {
2803                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2804                 talloc_free(tmp_ctx);
2805                 return -1;
2806         }
2807
2808
2809         tv = timeval_current();
2810         /* this loop will terminate when we have received the reply */
2811         while (timeval_elapsed(&tv) < 3.0) {    
2812                 event_loop_once(ctdb->ev);
2813         }
2814
2815         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2816
2817         talloc_free(tmp_ctx);
2818         return 0;
2819 }
2820
2821 /*
2822   clear the in memory log area
2823  */
2824 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2825 {
2826         int ret;
2827         int32_t res;
2828         char *errmsg;
2829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2830
2831         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2832                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2833         if (ret != 0 || res != 0) {
2834                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2835                 talloc_free(tmp_ctx);
2836                 return -1;
2837         }
2838
2839         talloc_free(tmp_ctx);
2840         return 0;
2841 }
2842
2843
2844
2845 /*
2846   display a list of the databases on a remote ctdb
2847  */
2848 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2849 {
2850         int i, ret;
2851         struct ctdb_dbid_map *dbmap=NULL;
2852
2853         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2854         if (ret != 0) {
2855                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2856                 return ret;
2857         }
2858
2859         if(options.machinereadable){
2860                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2861                 for(i=0;i<dbmap->num;i++){
2862                         const char *path;
2863                         const char *name;
2864                         const char *health;
2865                         bool persistent;
2866
2867                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2868                                             dbmap->dbs[i].dbid, ctdb, &path);
2869                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2870                                             dbmap->dbs[i].dbid, ctdb, &name);
2871                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2872                                               dbmap->dbs[i].dbid, ctdb, &health);
2873                         persistent = dbmap->dbs[i].persistent;
2874                         printf(":0x%08X:%s:%s:%d:%d:\n",
2875                                dbmap->dbs[i].dbid, name, path,
2876                                !!(persistent), !!(health));
2877                 }
2878                 return 0;
2879         }
2880
2881         printf("Number of databases:%d\n", dbmap->num);
2882         for(i=0;i<dbmap->num;i++){
2883                 const char *path;
2884                 const char *name;
2885                 const char *health;
2886                 bool persistent;
2887
2888                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2889                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2890                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2891                 persistent = dbmap->dbs[i].persistent;
2892                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2893                        dbmap->dbs[i].dbid, name, path,
2894                        persistent?" PERSISTENT":"",
2895                        health?" UNHEALTHY":"");
2896         }
2897
2898         return 0;
2899 }
2900
2901 /*
2902   display the status of a database on a remote ctdb
2903  */
2904 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2905 {
2906         int i, ret;
2907         struct ctdb_dbid_map *dbmap=NULL;
2908         const char *db_name;
2909
2910         if (argc < 1) {
2911                 usage();
2912         }
2913
2914         db_name = argv[0];
2915
2916         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2917         if (ret != 0) {
2918                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2919                 return ret;
2920         }
2921
2922         for(i=0;i<dbmap->num;i++){
2923                 const char *path;
2924                 const char *name;
2925                 const char *health;
2926                 bool persistent;
2927
2928                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2929                 if (strcmp(name, db_name) != 0) {
2930                         continue;
2931                 }
2932
2933                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2934                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2935                 persistent = dbmap->dbs[i].persistent;
2936                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2937                        dbmap->dbs[i].dbid, name, path,
2938                        persistent?"yes":"no",
2939                        health?health:"OK");
2940                 return 0;
2941         }
2942
2943         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2944         return 0;
2945 }
2946
2947 /*
2948   check if the local node is recmaster or not
2949   it will return 1 if this node is the recmaster and 0 if it is not
2950   or if the local ctdb daemon could not be contacted
2951  */
2952 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2953 {
2954         uint32_t mypnn, recmaster;
2955         int ret;
2956
2957         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2958         if (mypnn == -1) {
2959                 printf("Failed to get pnn of node\n");
2960                 return 1;
2961         }
2962
2963         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2964         if (ret != 0) {
2965                 printf("Failed to get the recmaster\n");
2966                 return 1;
2967         }
2968
2969         if (recmaster != mypnn) {
2970                 printf("this node is not the recmaster\n");
2971                 return 1;
2972         }
2973
2974         printf("this node is the recmaster\n");
2975         return 0;
2976 }
2977
2978 /*
2979   ping a node
2980  */
2981 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2982 {
2983         int ret;
2984         struct timeval tv = timeval_current();
2985         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2986         if (ret == -1) {
2987                 printf("Unable to get ping response from node %u\n", options.pnn);
2988                 return -1;
2989         } else {
2990                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2991                        options.pnn, timeval_elapsed(&tv), ret);
2992         }
2993         return 0;
2994 }
2995
2996
2997 /*
2998   get a tunable
2999  */
3000 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3001 {
3002         const char *name;
3003         uint32_t value;
3004         int ret;
3005
3006         if (argc < 1) {
3007                 usage();
3008         }
3009
3010         name = argv[0];
3011         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
3012         if (ret == -1) {
3013                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3014                 return -1;
3015         }
3016
3017         printf("%-19s = %u\n", name, value);
3018         return 0;
3019 }
3020
3021 /*
3022   set a tunable
3023  */
3024 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3025 {
3026         const char *name;
3027         uint32_t value;
3028         int ret;
3029
3030         if (argc < 2) {
3031                 usage();
3032         }
3033
3034         name = argv[0];
3035         value = strtoul(argv[1], NULL, 0);
3036
3037         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3038         if (ret == -1) {
3039                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3040                 return -1;
3041         }
3042         return 0;
3043 }
3044
3045 /*
3046   list all tunables
3047  */
3048 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3049 {
3050         uint32_t count;
3051         const char **list;
3052         int ret, i;
3053
3054         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3055         if (ret == -1) {
3056                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3057                 return -1;
3058         }
3059
3060         for (i=0;i<count;i++) {
3061                 control_getvar(ctdb, 1, &list[i]);
3062         }
3063
3064         talloc_free(list);
3065         
3066         return 0;
3067 }
3068
3069 /*
3070   display debug level on a node
3071  */
3072 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3073 {
3074         int ret;
3075         int32_t level;
3076
3077         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3078         if (ret != 0) {
3079                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3080                 return ret;
3081         } else {
3082                 if (options.machinereadable){
3083                         printf(":Name:Level:\n");
3084                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3085                 } else {
3086                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3087                 }
3088         }
3089         return 0;
3090 }
3091
3092 /*
3093   display reclock file of a node
3094  */
3095 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3096 {
3097         int ret;
3098         const char *reclock;
3099
3100         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3101         if (ret != 0) {
3102                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3103                 return ret;
3104         } else {
3105                 if (options.machinereadable){
3106                         if (reclock != NULL) {
3107                                 printf("%s", reclock);
3108                         }
3109                 } else {
3110                         if (reclock == NULL) {
3111                                 printf("No reclock file used.\n");
3112                         } else {
3113                                 printf("Reclock file:%s\n", reclock);
3114                         }
3115                 }
3116         }
3117         return 0;
3118 }
3119
3120 /*
3121   set the reclock file of a node
3122  */
3123 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3124 {
3125         int ret;
3126         const char *reclock;
3127
3128         if (argc == 0) {
3129                 reclock = NULL;
3130         } else if (argc == 1) {
3131                 reclock = argv[0];
3132         } else {
3133                 usage();
3134         }
3135
3136         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3137         if (ret != 0) {
3138                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3139                 return ret;
3140         }
3141         return 0;
3142 }
3143
3144 /*
3145   set the natgw state on/off
3146  */
3147 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3148 {
3149         int ret;
3150         uint32_t natgwstate;
3151
3152         if (argc == 0) {
3153                 usage();
3154         }
3155
3156         if (!strcmp(argv[0], "on")) {
3157                 natgwstate = 1;
3158         } else if (!strcmp(argv[0], "off")) {
3159                 natgwstate = 0;
3160         } else {
3161                 usage();
3162         }
3163
3164         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3165         if (ret != 0) {
3166                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3167                 return ret;
3168         }
3169
3170         return 0;
3171 }
3172
3173 /*
3174   set the lmaster role on/off
3175  */
3176 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3177 {
3178         int ret;
3179         uint32_t lmasterrole;
3180
3181         if (argc == 0) {
3182                 usage();
3183         }
3184
3185         if (!strcmp(argv[0], "on")) {
3186                 lmasterrole = 1;
3187         } else if (!strcmp(argv[0], "off")) {
3188                 lmasterrole = 0;
3189         } else {
3190                 usage();
3191         }
3192
3193         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3194         if (ret != 0) {
3195                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3196                 return ret;
3197         }
3198
3199         return 0;
3200 }
3201
3202 /*
3203   set the recmaster role on/off
3204  */
3205 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3206 {
3207         int ret;
3208         uint32_t recmasterrole;
3209
3210         if (argc == 0) {
3211                 usage();
3212         }
3213
3214         if (!strcmp(argv[0], "on")) {
3215                 recmasterrole = 1;
3216         } else if (!strcmp(argv[0], "off")) {
3217                 recmasterrole = 0;
3218         } else {
3219                 usage();
3220         }
3221
3222         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3223         if (ret != 0) {
3224                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3225                 return ret;
3226         }
3227
3228         return 0;
3229 }
3230
3231 /*
3232   set debug level on a node or all nodes
3233  */
3234 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3235 {
3236         int i, ret;
3237         int32_t level;
3238
3239         if (argc == 0) {
3240                 printf("You must specify the debug level. Valid levels are:\n");
3241                 for (i=0; debug_levels[i].description != NULL; i++) {
3242                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3243                 }
3244
3245                 return 0;
3246         }
3247
3248         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3249                 level = get_debug_by_desc(argv[0]);
3250         } else {
3251                 level = strtol(argv[0], NULL, 0);
3252         }
3253
3254         for (i=0; debug_levels[i].description != NULL; i++) {
3255                 if (level == debug_levels[i].level) {
3256                         break;
3257                 }
3258         }
3259         if (debug_levels[i].description == NULL) {
3260                 printf("Invalid debug level, must be one of\n");
3261                 for (i=0; debug_levels[i].description != NULL; i++) {
3262                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3263                 }
3264                 return -1;
3265         }
3266
3267         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3268         if (ret != 0) {
3269                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3270         }
3271         return 0;
3272 }
3273
3274
3275 /*
3276   thaw a node
3277  */
3278 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3279 {
3280         int ret;
3281         uint32_t priority;
3282         
3283         if (argc == 1) {
3284                 priority = strtol(argv[0], NULL, 0);
3285         } else {
3286                 priority = 0;
3287         }
3288         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3289
3290         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3291         if (ret != 0) {
3292                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3293         }               
3294         return 0;
3295 }
3296
3297
3298 /*
3299   attach to a database
3300  */
3301 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3302 {
3303         const char *db_name;
3304         struct ctdb_db_context *ctdb_db;
3305
3306         if (argc < 1) {
3307                 usage();
3308         }
3309         db_name = argv[0];
3310
3311         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3312         if (ctdb_db == NULL) {
3313                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3314                 return -1;
3315         }
3316
3317         return 0;
3318 }
3319
3320 /*
3321   set db priority
3322  */
3323 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3324 {
3325         struct ctdb_db_priority db_prio;
3326         int ret;
3327
3328         if (argc < 2) {
3329                 usage();
3330         }
3331
3332         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3333         db_prio.priority = strtoul(argv[1], NULL, 0);
3334
3335         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3336         if (ret != 0) {
3337                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3338                 return -1;
3339         }
3340
3341         return 0;
3342 }
3343
3344 /*
3345   get db priority
3346  */
3347 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3348 {
3349         uint32_t db_id, priority;
3350         int ret;
3351
3352         if (argc < 1) {
3353                 usage();
3354         }
3355
3356         db_id = strtoul(argv[0], NULL, 0);
3357
3358         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3359         if (ret != 0) {
3360                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3361                 return -1;
3362         }
3363
3364         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3365
3366         return 0;
3367 }
3368
3369 /*
3370   run an eventscript on a node
3371  */
3372 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3373 {
3374         TDB_DATA data;
3375         int ret;
3376         int32_t res;
3377         char *errmsg;
3378         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3379
3380         if (argc != 1) {
3381                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3382                 return -1;
3383         }
3384
3385         data.dptr = (unsigned char *)discard_const(argv[0]);
3386         data.dsize = strlen((char *)data.dptr) + 1;
3387
3388         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3389
3390         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3391                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3392         if (ret != 0 || res != 0) {
3393                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3394                 talloc_free(tmp_ctx);
3395                 return -1;
3396         }
3397         talloc_free(tmp_ctx);
3398         return 0;
3399 }
3400
/* NOTE(review): presumably the value stored in db_file_header.version -
   confirm against the code that fills in the header */
#define DB_VERSION 1
/* size in bytes of the name[] field of struct db_file_header */
#define MAX_DB_NAME 64
/*
  fixed-size header describing one database
  NOTE(review): presumably written in front of the record data by the
  backup command and read back on restore - confirm against those
  functions.  The field types (unsigned long, time_t) vary in size
  between platforms, so the layout must not be changed casually.
 */
struct db_file_header {
        unsigned long version;          /* presumably DB_VERSION - TODO confirm */
        time_t timestamp;               /* presumably the time of the backup - TODO confirm */
        unsigned long persistent;       /* presumably non-zero for a persistent db - TODO confirm */
        unsigned long size;             /* presumably the size of the data following the header - TODO confirm */
        const char name[MAX_DB_NAME];   /* database name, at most MAX_DB_NAME bytes */
};
3410
/*
  accumulator filled in by backup_traverse(), which is called once per
  record
 */
struct backup_data {
        struct ctdb_marshall_buffer *records;   /* marshall buffer that grows with every record */
        uint32_t len;                           /* byte offset in records where the next record is copied */
        uint32_t total;                         /* number of records appended so far */
        bool traverse_error;                    /* set to true when appending a record failed */
};
3417
3418 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3419 {
3420         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3421         struct ctdb_rec_data *rec;
3422
3423         /* add the record */
3424         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3425         if (rec == NULL) {
3426                 bd->traverse_error = true;
3427                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3428                 return -1;
3429         }
3430         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3431         if (bd->records == NULL) {
3432                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3433                 bd->traverse_error = true;
3434                 return -1;
3435         }
3436         bd->records->count++;
3437         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3438         bd->len += rec->length;
3439         talloc_free(rec);
3440
3441         bd->total++;
3442         return 0;
3443 }
3444
3445 /*
3446  * backup a database to a file 
3447  */
3448 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3449 {
3450         int i, ret;
3451         struct ctdb_dbid_map *dbmap=NULL;
3452         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3453         struct db_file_header dbhdr;
3454         struct ctdb_db_context *ctdb_db;
3455         struct backup_data *bd;
3456         int fh = -1;
3457         int status = -1;
3458         const char *reason = NULL;
3459
3460         if (argc != 2) {
3461                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3462                 return -1;
3463         }
3464
3465         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3466         if (ret != 0) {
3467                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3468                 return ret;
3469         }
3470
3471         for(i=0;i<dbmap->num;i++){
3472                 const char *name;
3473
3474                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3475                 if(!strcmp(argv[0], name)){
3476                         talloc_free(discard_const(name));
3477                         break;
3478                 }
3479                 talloc_free(discard_const(name));
3480         }
3481         if (i == dbmap->num) {
3482                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3483                 talloc_free(tmp_ctx);
3484                 return -1;
3485         }
3486
3487         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3488                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3489         if (ret != 0) {
3490                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3491                                  argv[0]));
3492                 talloc_free(tmp_ctx);
3493                 return -1;
3494         }
3495         if (reason) {
3496                 uint32_t allow_unhealthy = 0;
3497
3498                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3499                                       "AllowUnhealthyDBRead",
3500                                       &allow_unhealthy);
3501
3502                 if (allow_unhealthy != 1) {
3503                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3504                                          argv[0], reason));
3505
3506                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3507                                          allow_unhealthy));
3508                         talloc_free(tmp_ctx);
3509                         return -1;
3510                 }
3511
3512                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3513                                      argv[0], argv[0]));
3514                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3515                                      "tunnable AllowUnhealthyDBRead = %u\n",
3516                                      allow_unhealthy));
3517         }
3518
3519         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3520         if (ctdb_db == NULL) {
3521                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3522                 talloc_free(tmp_ctx);
3523                 return -1;
3524         }
3525
3526
3527         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3528         if (ret == -1) {
3529                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3530                 talloc_free(tmp_ctx);
3531                 return -1;
3532         }
3533
3534
3535         bd = talloc_zero(tmp_ctx, struct backup_data);
3536         if (bd == NULL) {
3537                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3538                 talloc_free(tmp_ctx);
3539                 return -1;
3540         }
3541
3542         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3543         if (bd->records == NULL) {
3544                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3545                 talloc_free(tmp_ctx);
3546                 return -1;
3547         }
3548
3549         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3550         bd->records->db_id = ctdb_db->db_id;
3551         /* traverse the database collecting all records */
3552         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3553             bd->traverse_error) {
3554                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3555                 talloc_free(tmp_ctx);
3556                 return -1;              
3557         }
3558
3559         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3560
3561
3562         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3563         if (fh == -1) {
3564                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3565                 talloc_free(tmp_ctx);
3566                 return -1;
3567         }
3568
3569         dbhdr.version = DB_VERSION;
3570         dbhdr.timestamp = time(NULL);
3571         dbhdr.persistent = dbmap->dbs[i].persistent;
3572         dbhdr.size = bd->len;
3573         if (strlen(argv[0]) >= MAX_DB_NAME) {
3574                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3575                 goto done;
3576         }
3577         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3578         ret = write(fh, &dbhdr, sizeof(dbhdr));
3579         if (ret == -1) {
3580                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3581                 goto done;
3582         }
3583         ret = write(fh, bd->records, bd->len);
3584         if (ret == -1) {
3585                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3586                 goto done;
3587         }
3588
3589         status = 0;
3590 done:
3591         if (fh != -1) {
3592                 ret = close(fh);
3593                 if (ret == -1) {
3594                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3595                 }
3596         }
3597         talloc_free(tmp_ctx);
3598         return status;
3599 }
3600
3601 /*
3602  * restore a database from a file 
3603  */
3604 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3605 {
3606         int ret;
3607         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3608         TDB_DATA outdata;
3609         TDB_DATA data;
3610         struct db_file_header dbhdr;
3611         struct ctdb_db_context *ctdb_db;
3612         struct ctdb_node_map *nodemap=NULL;
3613         struct ctdb_vnn_map *vnnmap=NULL;
3614         int i, fh;
3615         struct ctdb_control_wipe_database w;
3616         uint32_t *nodes;
3617         uint32_t generation;
3618         struct tm *tm;
3619         char tbuf[100];
3620         char *dbname;
3621
3622         if (argc < 1 || argc > 2) {
3623                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3624                 return -1;
3625         }
3626
3627         fh = open(argv[0], O_RDONLY);
3628         if (fh == -1) {
3629                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3630                 talloc_free(tmp_ctx);
3631                 return -1;
3632         }
3633
3634         read(fh, &dbhdr, sizeof(dbhdr));
3635         if (dbhdr.version != DB_VERSION) {
3636                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3637                 talloc_free(tmp_ctx);
3638                 return -1;
3639         }
3640
3641         dbname = dbhdr.name;
3642         if (argc == 2) {
3643                 dbname = argv[1];
3644         }
3645
3646         outdata.dsize = dbhdr.size;
3647         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3648         if (outdata.dptr == NULL) {
3649                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3650                 close(fh);
3651                 talloc_free(tmp_ctx);
3652                 return -1;
3653         }               
3654         read(fh, outdata.dptr, outdata.dsize);
3655         close(fh);
3656
3657         tm = localtime(&dbhdr.timestamp);
3658         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3659         printf("Restoring database '%s' from backup @ %s\n",
3660                 dbname, tbuf);
3661
3662
3663         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3664         if (ctdb_db == NULL) {
3665                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3666                 talloc_free(tmp_ctx);
3667                 return -1;
3668         }
3669
3670         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3671         if (ret != 0) {
3672                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3673                 talloc_free(tmp_ctx);
3674                 return ret;
3675         }
3676
3677
3678         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3679         if (ret != 0) {
3680                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3681                 talloc_free(tmp_ctx);
3682                 return ret;
3683         }
3684
3685         /* freeze all nodes */
3686         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3687         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3688                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3689                                         nodes, i,
3690                                         TIMELIMIT(),
3691                                         false, tdb_null,
3692                                         NULL, NULL,
3693                                         NULL) != 0) {
3694                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3695                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3696                         talloc_free(tmp_ctx);
3697                         return -1;
3698                 }
3699         }
3700
3701         generation = vnnmap->generation;
3702         data.dptr = (void *)&generation;
3703         data.dsize = sizeof(generation);
3704
3705         /* start a cluster wide transaction */
3706         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3707         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3708                                         nodes, 0,
3709                                         TIMELIMIT(), false, data,
3710                                         NULL, NULL,
3711                                         NULL) != 0) {
3712                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3713                 return -1;
3714         }
3715
3716
3717         w.db_id = ctdb_db->db_id;
3718         w.transaction_id = generation;
3719
3720         data.dptr = (void *)&w;
3721         data.dsize = sizeof(w);
3722
3723         /* wipe all the remote databases. */
3724         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3725         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3726                                         nodes, 0,
3727                                         TIMELIMIT(), false, data,
3728                                         NULL, NULL,
3729                                         NULL) != 0) {
3730                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3731                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3732                 talloc_free(tmp_ctx);
3733                 return -1;
3734         }
3735         
3736         /* push the database */
3737         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3738         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3739                                         nodes, 0,
3740                                         TIMELIMIT(), false, outdata,
3741                                         NULL, NULL,
3742                                         NULL) != 0) {
3743                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3744                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3745                 talloc_free(tmp_ctx);
3746                 return -1;
3747         }
3748
3749         data.dptr = (void *)&ctdb_db->db_id;
3750         data.dsize = sizeof(ctdb_db->db_id);
3751
3752         /* mark the database as healthy */
3753         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3754         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3755                                         nodes, 0,
3756                                         TIMELIMIT(), false, data,
3757                                         NULL, NULL,
3758                                         NULL) != 0) {
3759                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3760                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3761                 talloc_free(tmp_ctx);
3762                 return -1;
3763         }
3764
3765         data.dptr = (void *)&generation;
3766         data.dsize = sizeof(generation);
3767
3768         /* commit all the changes */
3769         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3770                                         nodes, 0,
3771                                         TIMELIMIT(), false, data,
3772                                         NULL, NULL,
3773                                         NULL) != 0) {
3774                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3775                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3776                 talloc_free(tmp_ctx);
3777                 return -1;
3778         }
3779
3780
3781         /* thaw all nodes */
3782         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3783         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3784                                         nodes, 0,
3785                                         TIMELIMIT(),
3786                                         false, tdb_null,
3787                                         NULL, NULL,
3788                                         NULL) != 0) {
3789                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3790                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3791                 talloc_free(tmp_ctx);
3792                 return -1;
3793         }
3794
3795
3796         talloc_free(tmp_ctx);
3797         return 0;
3798 }
3799
3800 /*
3801  * dump a database backup from a file
3802  */
3803 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3804 {
3805         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3806         TDB_DATA outdata;
3807         struct db_file_header dbhdr;
3808         int i, fh;
3809         struct tm *tm;
3810         char tbuf[100];
3811         struct ctdb_rec_data *rec = NULL;
3812         struct ctdb_marshall_buffer *m;
3813
3814         if (argc != 1) {
3815                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3816                 return -1;
3817         }
3818
3819         fh = open(argv[0], O_RDONLY);
3820         if (fh == -1) {
3821                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3822                 talloc_free(tmp_ctx);
3823                 return -1;
3824         }
3825
3826         read(fh, &dbhdr, sizeof(dbhdr));
3827         if (dbhdr.version != DB_VERSION) {
3828                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3829                 talloc_free(tmp_ctx);
3830                 return -1;
3831         }
3832
3833         outdata.dsize = dbhdr.size;
3834         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3835         if (outdata.dptr == NULL) {
3836                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3837                 close(fh);
3838                 talloc_free(tmp_ctx);
3839                 return -1;
3840         }
3841         read(fh, outdata.dptr, outdata.dsize);
3842         close(fh);
3843         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3844
3845         tm = localtime(&dbhdr.timestamp);
3846         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3847         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3848                 dbhdr.name, m->db_id, tbuf);
3849
3850         for (i=0; i < m->count; i++) {
3851                 uint32_t reqid = 0;
3852                 TDB_DATA key, data;
3853
3854                 /* we do not want the header splitted, so we pass NULL*/
3855                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3856                                               NULL, &key, &data);
3857
3858                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3859         }
3860
3861         printf("Dumped %d records\n", i);
3862         talloc_free(tmp_ctx);
3863         return 0;
3864 }
3865
3866 /*
3867  * wipe a database from a file
3868  */
3869 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3870                           const char **argv)
3871 {
3872         int ret;
3873         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3874         TDB_DATA data;
3875         struct ctdb_db_context *ctdb_db;
3876         struct ctdb_node_map *nodemap = NULL;
3877         struct ctdb_vnn_map *vnnmap = NULL;
3878         int i;
3879         struct ctdb_control_wipe_database w;
3880         uint32_t *nodes;
3881         uint32_t generation;
3882         struct ctdb_dbid_map *dbmap = NULL;
3883
3884         if (argc != 1) {
3885                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3886                 return -1;
3887         }
3888
3889         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3890                                  &dbmap);
3891         if (ret != 0) {
3892                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3893                                   options.pnn));
3894                 return ret;
3895         }
3896
3897         for(i=0;i<dbmap->num;i++){
3898                 const char *name;
3899
3900                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3901                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3902                 if(!strcmp(argv[0], name)){
3903                         talloc_free(discard_const(name));
3904                         break;
3905                 }
3906                 talloc_free(discard_const(name));
3907         }
3908         if (i == dbmap->num) {
3909                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3910                                   argv[0]));
3911                 talloc_free(tmp_ctx);
3912                 return -1;
3913         }
3914
3915         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3916         if (ctdb_db == NULL) {
3917                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3918                                   argv[0]));
3919                 talloc_free(tmp_ctx);
3920                 return -1;
3921         }
3922
3923         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3924                                    &nodemap);
3925         if (ret != 0) {
3926                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3927                                   options.pnn));
3928                 talloc_free(tmp_ctx);
3929                 return ret;
3930         }
3931
3932         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3933                                   &vnnmap);
3934         if (ret != 0) {
3935                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3936                                   options.pnn));
3937                 talloc_free(tmp_ctx);
3938                 return ret;
3939         }
3940
3941         /* freeze all nodes */
3942         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3943         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3944                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3945                                                 nodes, i,
3946                                                 TIMELIMIT(),
3947                                                 false, tdb_null,
3948                                                 NULL, NULL,
3949                                                 NULL);
3950                 if (ret != 0) {
3951                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3952                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3953                                              CTDB_RECOVERY_ACTIVE);
3954                         talloc_free(tmp_ctx);
3955                         return -1;
3956                 }
3957         }
3958
3959         generation = vnnmap->generation;
3960         data.dptr = (void *)&generation;
3961         data.dsize = sizeof(generation);
3962
3963         /* start a cluster wide transaction */
3964         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3965         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3966                                         nodes, 0,
3967                                         TIMELIMIT(), false, data,
3968                                         NULL, NULL,
3969                                         NULL);
3970         if (ret!= 0) {
3971                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3972                                   "transactions.\n"));
3973                 return -1;
3974         }
3975
3976         w.db_id = ctdb_db->db_id;
3977         w.transaction_id = generation;
3978
3979         data.dptr = (void *)&w;
3980         data.dsize = sizeof(w);
3981
3982         /* wipe all the remote databases. */
3983         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3984         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3985                                         nodes, 0,
3986                                         TIMELIMIT(), false, data,
3987                                         NULL, NULL,
3988                                         NULL) != 0) {
3989                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3990                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3991                 talloc_free(tmp_ctx);
3992                 return -1;
3993         }
3994
3995         data.dptr = (void *)&ctdb_db->db_id;
3996         data.dsize = sizeof(ctdb_db->db_id);
3997
3998         /* mark the database as healthy */
3999         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4000         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4001                                         nodes, 0,
4002                                         TIMELIMIT(), false, data,
4003                                         NULL, NULL,
4004                                         NULL) != 0) {
4005                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4006                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4007                 talloc_free(tmp_ctx);
4008                 return -1;
4009         }
4010
4011         data.dptr = (void *)&generation;
4012         data.dsize = sizeof(generation);
4013
4014         /* commit all the changes */
4015         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4016                                         nodes, 0,
4017                                         TIMELIMIT(), false, data,
4018                                         NULL, NULL,
4019                                         NULL) != 0) {
4020                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4021                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4022                 talloc_free(tmp_ctx);
4023                 return -1;
4024         }
4025
4026         /* thaw all nodes */
4027         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4028         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4029                                         nodes, 0,
4030                                         TIMELIMIT(),
4031                                         false, tdb_null,
4032                                         NULL, NULL,
4033                                         NULL) != 0) {
4034                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4035                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4036                 talloc_free(tmp_ctx);
4037                 return -1;
4038         }
4039
4040         talloc_free(tmp_ctx);
4041         return 0;
4042 }
4043
4044 /*
4045  * set flags of a node in the nodemap
4046  */
4047 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4048 {
4049         int ret;
4050         int32_t status;
4051         int node;
4052         int flags;
4053         TDB_DATA data;
4054         struct ctdb_node_flag_change c;
4055
4056         if (argc != 2) {
4057                 usage();
4058                 return -1;
4059         }
4060
4061         if (sscanf(argv[0], "%d", &node) != 1) {
4062                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4063                 usage();
4064                 return -1;
4065         }
4066         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4067                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4068                 usage();
4069                 return -1;
4070         }
4071
4072         c.pnn       = node;
4073         c.old_flags = 0;
4074         c.new_flags = flags;
4075
4076         data.dsize = sizeof(c);
4077         data.dptr = (unsigned char *)&c;
4078
4079         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4080                            data, NULL, NULL, &status, NULL, NULL);
4081         if (ret != 0 || status != 0) {
4082                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4083                 return -1;
4084         }
4085         return 0;
4086 }
4087
4088 /*
4089   dump memory usage
4090  */
4091 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4092 {
4093         TDB_DATA data;
4094         int ret;
4095         int32_t res;
4096         char *errmsg;
4097         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4098         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4099                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4100         if (ret != 0 || res != 0) {
4101                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4102                 talloc_free(tmp_ctx);
4103                 return -1;
4104         }
4105         write(1, data.dptr, data.dsize);
4106         talloc_free(tmp_ctx);
4107         return 0;
4108 }
4109
4110 /*
4111   handler for memory dumps
4112 */
4113 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4114                              TDB_DATA data, void *private_data)
4115 {
4116         write(1, data.dptr, data.dsize);
4117         exit(0);
4118 }
4119
4120 /*
4121   dump memory usage on the recovery daemon
4122  */
4123 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4124 {
4125         int ret;
4126         TDB_DATA data;
4127         struct rd_memdump_reply rd;
4128
4129         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4130         if (rd.pnn == -1) {
4131                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4132                 return -1;
4133         }
4134         rd.srvid = getpid();
4135
4136         /* register a message port for receiveing the reply so that we
4137            can receive the reply
4138         */
4139         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4140
4141
4142         data.dptr = (uint8_t *)&rd;
4143         data.dsize = sizeof(rd);
4144
4145         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4146         if (ret != 0) {
4147                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4148                 return -1;
4149         }
4150
4151         /* this loop will terminate when we have received the reply */
4152         while (1) {     
4153                 event_loop_once(ctdb->ev);
4154         }
4155
4156         return 0;
4157 }
4158
4159 /*
4160   send a message to a srvid
4161  */
4162 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4163 {
4164         unsigned long srvid;
4165         int ret;
4166         TDB_DATA data;
4167
4168         if (argc < 2) {
4169                 usage();
4170         }
4171
4172         srvid      = strtoul(argv[0], NULL, 0);
4173
4174         data.dptr = (uint8_t *)discard_const(argv[1]);
4175         data.dsize= strlen(argv[1]);
4176
4177         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4178         if (ret != 0) {
4179                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4180                 return -1;
4181         }
4182
4183         return 0;
4184 }
4185
4186 /*
4187   handler for msglisten
4188 */
4189 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4190                              TDB_DATA data, void *private_data)
4191 {
4192         int i;
4193
4194         printf("Message received: ");
4195         for (i=0;i<data.dsize;i++) {
4196                 printf("%c", data.dptr[i]);
4197         }
4198         printf("\n");
4199 }
4200
4201 /*
4202   listen for messages on a messageport
4203  */
4204 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4205 {
4206         uint64_t srvid;
4207
4208         srvid = getpid();
4209
4210         /* register a message port and listen for messages
4211         */
4212         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4213         printf("Listening for messages on srvid:%d\n", (int)srvid);
4214
4215         while (1) {     
4216                 event_loop_once(ctdb->ev);
4217         }
4218
4219         return 0;
4220 }
4221
4222 /*
4223   list all nodes in the cluster
4224   if the daemon is running, we read the data from the daemon.
4225   if the daemon is not running we parse the nodes file directly
4226  */
4227 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4228 {
4229         int i, ret;
4230         struct ctdb_node_map *nodemap=NULL;
4231
4232         if (ctdb != NULL) {
4233                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4234                 if (ret != 0) {
4235                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4236                         return ret;
4237                 }
4238
4239                 for(i=0;i<nodemap->num;i++){
4240                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4241                                 continue;
4242                         }
4243                         if (options.machinereadable){
4244                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4245                         } else {
4246                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4247                         }
4248                 }
4249         } else {
4250                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4251                 struct pnn_node *pnn_nodes;
4252                 struct pnn_node *pnn_node;
4253         
4254                 pnn_nodes = read_nodes_file(mem_ctx);
4255                 if (pnn_nodes == NULL) {
4256                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4257                         talloc_free(mem_ctx);
4258                         return -1;
4259                 }
4260
4261                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4262                         ctdb_sock_addr addr;
4263
4264                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4265                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4266                                 talloc_free(mem_ctx);
4267                                 return -1;
4268                         }
4269
4270                         if (options.machinereadable){
4271                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4272                         } else {
4273                                 printf("%s\n", pnn_node->addr);
4274                         }
4275                 }
4276                 talloc_free(mem_ctx);
4277         }
4278
4279         return 0;
4280 }
4281
4282 /*
4283   reload the nodes file on the local node
4284  */
4285 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4286 {
4287         int i, ret;
4288         int mypnn;
4289         struct ctdb_node_map *nodemap=NULL;
4290
4291         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4292         if (mypnn == -1) {
4293                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4294                 return -1;
4295         }
4296
4297         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4298         if (ret != 0) {
4299                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4300                 return ret;
4301         }
4302
4303         /* reload the nodes file on all remote nodes */
4304         for (i=0;i<nodemap->num;i++) {
4305                 if (nodemap->nodes[i].pnn == mypnn) {
4306                         continue;
4307                 }
4308                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4309                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4310                         nodemap->nodes[i].pnn);
4311                 if (ret != 0) {
4312                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4313                 }
4314         }
4315
4316         /* reload the nodes file on the local node */
4317         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4318         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4319         if (ret != 0) {
4320                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4321         }
4322
4323         /* initiate a recovery */
4324         control_recover(ctdb, argc, argv);
4325
4326         return 0;
4327 }
4328
4329
4330 static const struct {
4331         const char *name;
4332         int (*fn)(struct ctdb_context *, int, const char **);
4333         bool auto_all;
4334         bool without_daemon; /* can be run without daemon running ? */
4335         const char *msg;
4336         const char *args;
4337 } ctdb_commands[] = {
4338 #ifdef CTDB_VERS
4339         { "version",         control_version,           true,   false,  "show version of ctdb" },
4340 #endif
4341         { "status",          control_status,            true,   false,  "show node status" },
4342         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4343         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4344         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4345         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4346         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4347         { "statistics",      control_statistics,        false,  false, "show statistics" },
4348         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4349         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4350         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4351         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4352         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4353         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4354         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4355         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4356         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4357         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4358         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4359         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4360         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4361         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4362         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4363         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4364         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4365         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4366         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4367         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4368         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4369         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4370         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4371         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4372         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4373         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4374         { "stop",            control_stop,              true,   false,  "stop a node" },
4375         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4376         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4377         { "unban",           control_unban,             true,   false,  "unban a node" },
4378         { "showban",         control_showban,           true,   false,  "show ban information"},
4379         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4380         { "recover",         control_recover,           true,   false,  "force recovery" },
4381         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4382         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4383         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4384         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4385         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4386         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4387         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4388
4389         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4390         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4391         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4392         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4393         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4394         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4395         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4396         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4397         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4398         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4399         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4400         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4401         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4402         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4403         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4404         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4405         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4406         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4407         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4408         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4409         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4410         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4411         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4412         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4413         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4414         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4415         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4416         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4417         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4418         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4419         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4420         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4421 };
4422
4423 /*
4424   show usage message
4425  */
4426 static void usage(void)
4427 {
4428         int i;
4429         printf(
4430 "Usage: ctdb [options] <control>\n" \
4431 "Options:\n" \
4432 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4433 "   -Y                 generate machinereadable output\n"
4434 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4435         printf("Controls:\n");
4436         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4437                 printf("  %-15s %-27s  %s\n", 
4438                        ctdb_commands[i].name, 
4439                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4440                        ctdb_commands[i].msg);
4441         }
4442         exit(1);
4443 }
4444
4445
4446 static void ctdb_alarm(int sig)
4447 {
4448         printf("Maximum runtime exceeded - exiting\n");
4449         _exit(ERR_TIMEOUT);
4450 }
4451
4452 /*
4453   main program
4454 */
4455 int main(int argc, const char *argv[])
4456 {
4457         struct ctdb_context *ctdb;
4458         char *nodestring = NULL;
4459         struct poptOption popt_options[] = {
4460                 POPT_AUTOHELP
4461                 POPT_CTDB_CMDLINE
4462                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4463                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4464                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4465                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4466                 POPT_TABLEEND
4467         };
4468         int opt;
4469         const char **extra_argv;
4470         int extra_argc = 0;
4471         int ret=-1, i;
4472         poptContext pc;
4473         struct event_context *ev;
4474         const char *control;
4475
4476         setlinebuf(stdout);
4477         
4478         /* set some defaults */
4479         options.maxruntime = 0;
4480         options.timelimit = 3;
4481         options.pnn = CTDB_CURRENT_NODE;
4482
4483         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4484
4485         while ((opt = poptGetNextOpt(pc)) != -1) {
4486                 switch (opt) {
4487                 default:
4488                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4489                                 poptBadOption(pc, 0), poptStrerror(opt)));
4490                         exit(1);
4491                 }
4492         }
4493
4494         /* setup the remaining options for the main program to use */
4495         extra_argv = poptGetArgs(pc);
4496         if (extra_argv) {
4497                 extra_argv++;
4498                 while (extra_argv[extra_argc]) extra_argc++;
4499         }
4500
4501         if (extra_argc < 1) {
4502                 usage();
4503         }
4504
4505         if (options.maxruntime == 0) {
4506                 const char *ctdb_timeout;
4507                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4508                 if (ctdb_timeout != NULL) {
4509                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4510                 } else {
4511                         /* default timeout is 120 seconds */
4512                         options.maxruntime = 120;
4513                 }
4514         }
4515
4516         signal(SIGALRM, ctdb_alarm);
4517         alarm(options.maxruntime);
4518
4519         /* setup the node number to contact */
4520         if (nodestring != NULL) {
4521                 if (strcmp(nodestring, "all") == 0) {
4522                         options.pnn = CTDB_BROADCAST_ALL;
4523                 } else {
4524                         options.pnn = strtoul(nodestring, NULL, 0);
4525                 }
4526         }
4527
4528         control = extra_argv[0];
4529
4530         ev = event_context_init(NULL);
4531         if (!ev) {
4532                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
4533                 exit(1);
4534         }
4535
4536         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4537                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4538                         int j;
4539
4540                         if (ctdb_commands[i].without_daemon == true) {
4541                                 close(2);
4542                         }
4543
4544                         /* initialise ctdb */
4545                         ctdb = ctdb_cmdline_client(ev);
4546
4547                         if (ctdb_commands[i].without_daemon == false) {
4548                                 const char *socket_name;
4549
4550                                 if (ctdb == NULL) {
4551                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4552                                         exit(1);
4553                                 }
4554
4555                                 /* initialize a libctdb connection as well */
4556                                 socket_name = ctdb_get_socketname(ctdb);
4557                                 ctdb_connection = ctdb_connect(socket_name,
4558                                                        ctdb_log_file, stderr);
4559                                 if (ctdb_connection == NULL) {
4560                                         fprintf(stderr, "Failed to connect to daemon from libctdb\n");
4561                                         exit(1);
4562                                 }                               
4563                         
4564                                 /* verify the node exists */
4565                                 verify_node(ctdb);
4566
4567                                 if (options.pnn == CTDB_CURRENT_NODE) {
4568                                         int pnn;
4569                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4570                                         if (pnn == -1) {
4571                                                 return -1;
4572                                         }
4573                                         options.pnn = pnn;
4574                                 }
4575                         }
4576
4577                         if (ctdb_commands[i].auto_all && 
4578                             options.pnn == CTDB_BROADCAST_ALL) {
4579                                 uint32_t *nodes;
4580                                 uint32_t num_nodes;
4581                                 ret = 0;
4582
4583                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4584                                 CTDB_NO_MEMORY(ctdb, nodes);
4585         
4586                                 for (j=0;j<num_nodes;j++) {
4587                                         options.pnn = nodes[j];
4588                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4589                                 }
4590                                 talloc_free(nodes);
4591                         } else {
4592                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4593                         }
4594                         break;
4595                 }
4596         }
4597
4598         if (i == ARRAY_SIZE(ctdb_commands)) {
4599                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4600                 exit(1);
4601         }
4602
4603         return ret;
4604 }