The recent change to the recovery daemon to keep track of and
[rusty/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
/* process exit codes; verify_node() exits with ERR_NONODE and ERR_DISNODE */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

/* forward declaration, the commands below call it on bad arguments */
static void usage(void);

/* settings shared by all the control_* commands */
static struct {
	int timelimit;		/* base timeout, see TIMELIMIT() */
	uint32_t pnn;		/* node the command is directed at */
	int machinereadable;	/* non-zero selects the colon separated output */
	int maxruntime;
} options;

/* timeout for a single control, and a ten times longer variant */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
49
#ifdef CTDB_VERS
/* two step expansion so that the value of a macro gets stringified */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
  print the version the tool was built with (the CTDB_VERS macro)
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
	const char *version = XSTR(CTDB_VERS);

	printf("CTDB version: %s\n", version);
	return 0;
}
#endif
59
60
61 /*
62   verify that a node exists and is reachable
63  */
64 static void verify_node(struct ctdb_context *ctdb)
65 {
66         int ret;
67         struct ctdb_node_map *nodemap=NULL;
68
69         if (options.pnn == CTDB_CURRENT_NODE) {
70                 return;
71         }
72         if (options.pnn == CTDB_BROADCAST_ALL) {
73                 return;
74         }
75
76         /* verify the node exists */
77         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
78                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
79                 exit(10);
80         }
81         if (options.pnn >= nodemap->num) {
82                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
83                 exit(ERR_NONODE);
84         }
85         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
86                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
87                 exit(ERR_DISNODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93
94         /* verify we can access the node */
95         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
96         if (ret == -1) {
97                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
98                 exit(10);
99         }
100 }
101
102 /*
103  check if a database exists
104 */
105 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
106 {
107         int i, ret;
108         struct ctdb_dbid_map *dbmap=NULL;
109
110         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
111         if (ret != 0) {
112                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
113                 return -1;
114         }
115
116         for(i=0;i<dbmap->num;i++){
117                 const char *name;
118
119                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
120                 if (!strcmp(name, db_name)) {
121                         return 0;
122                 }
123         }
124
125         return -1;
126 }
127
128 /*
129   see if a process exists
130  */
131 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
132 {
133         uint32_t pnn, pid;
134         int ret;
135         if (argc < 1) {
136                 usage();
137         }
138
139         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
140                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
141                 return -1;
142         }
143
144         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
145         if (ret == 0) {
146                 printf("%u:%u exists\n", pnn, pid);
147         } else {
148                 printf("%u:%u does not exist\n", pnn, pid);
149         }
150         return ret;
151 }
152
153 /*
154   display statistics structure
155  */
156 static void show_statistics(struct ctdb_statistics *s)
157 {
158         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
159         int i;
160         const char *prefix=NULL;
161         int preflen=0;
162         const struct {
163                 const char *name;
164                 uint32_t offset;
165         } fields[] = {
166 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
167                 STATISTICS_FIELD(num_clients),
168                 STATISTICS_FIELD(frozen),
169                 STATISTICS_FIELD(recovering),
170                 STATISTICS_FIELD(client_packets_sent),
171                 STATISTICS_FIELD(client_packets_recv),
172                 STATISTICS_FIELD(node_packets_sent),
173                 STATISTICS_FIELD(node_packets_recv),
174                 STATISTICS_FIELD(keepalive_packets_sent),
175                 STATISTICS_FIELD(keepalive_packets_recv),
176                 STATISTICS_FIELD(node.req_call),
177                 STATISTICS_FIELD(node.reply_call),
178                 STATISTICS_FIELD(node.req_dmaster),
179                 STATISTICS_FIELD(node.reply_dmaster),
180                 STATISTICS_FIELD(node.reply_error),
181                 STATISTICS_FIELD(node.req_message),
182                 STATISTICS_FIELD(node.req_control),
183                 STATISTICS_FIELD(node.reply_control),
184                 STATISTICS_FIELD(client.req_call),
185                 STATISTICS_FIELD(client.req_message),
186                 STATISTICS_FIELD(client.req_control),
187                 STATISTICS_FIELD(timeouts.call),
188                 STATISTICS_FIELD(timeouts.control),
189                 STATISTICS_FIELD(timeouts.traverse),
190                 STATISTICS_FIELD(total_calls),
191                 STATISTICS_FIELD(pending_calls),
192                 STATISTICS_FIELD(lockwait_calls),
193                 STATISTICS_FIELD(pending_lockwait_calls),
194                 STATISTICS_FIELD(childwrite_calls),
195                 STATISTICS_FIELD(pending_childwrite_calls),
196                 STATISTICS_FIELD(memory_used),
197                 STATISTICS_FIELD(max_hop_count),
198         };
199         printf("CTDB version %u\n", CTDB_VERSION);
200         for (i=0;i<ARRAY_SIZE(fields);i++) {
201                 if (strchr(fields[i].name, '.')) {
202                         preflen = strcspn(fields[i].name, ".")+1;
203                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
204                                 prefix = fields[i].name;
205                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
206                         }
207                 } else {
208                         preflen = 0;
209                 }
210                 printf(" %*s%-22s%*s%10u\n", 
211                        preflen?4:0, "",
212                        fields[i].name+preflen, 
213                        preflen?0:4, "",
214                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
215         }
216         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
217         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
218
219         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
220         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
221         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
222         talloc_free(tmp_ctx);
223 }
224
225 /*
226   display remote ctdb statistics combined from all nodes
227  */
228 static int control_statistics_all(struct ctdb_context *ctdb)
229 {
230         int ret, i;
231         struct ctdb_statistics statistics;
232         uint32_t *nodes;
233         uint32_t num_nodes;
234
235         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
236         CTDB_NO_MEMORY(ctdb, nodes);
237         
238         ZERO_STRUCT(statistics);
239
240         for (i=0;i<num_nodes;i++) {
241                 struct ctdb_statistics s1;
242                 int j;
243                 uint32_t *v1 = (uint32_t *)&s1;
244                 uint32_t *v2 = (uint32_t *)&statistics;
245                 uint32_t num_ints = 
246                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
247                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
248                 if (ret != 0) {
249                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
250                         return ret;
251                 }
252                 for (j=0;j<num_ints;j++) {
253                         v2[j] += v1[j];
254                 }
255                 statistics.max_hop_count = 
256                         MAX(statistics.max_hop_count, s1.max_hop_count);
257                 statistics.max_call_latency = 
258                         MAX(statistics.max_call_latency, s1.max_call_latency);
259                 statistics.max_lockwait_latency = 
260                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
261         }
262         talloc_free(nodes);
263         printf("Gathered statistics for %u nodes\n", num_nodes);
264         show_statistics(&statistics);
265         return 0;
266 }
267
268 /*
269   display remote ctdb statistics
270  */
271 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
272 {
273         int ret;
274         struct ctdb_statistics statistics;
275
276         if (options.pnn == CTDB_BROADCAST_ALL) {
277                 return control_statistics_all(ctdb);
278         }
279
280         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
281         if (ret != 0) {
282                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
283                 return ret;
284         }
285         show_statistics(&statistics);
286         return 0;
287 }
288
289
290 /*
291   reset remote ctdb statistics
292  */
293 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
294 {
295         int ret;
296
297         ret = ctdb_statistics_reset(ctdb, options.pnn);
298         if (ret != 0) {
299                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
300                 return ret;
301         }
302         return 0;
303 }
304
305
306 /*
307   display uptime of remote node
308  */
309 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
310 {
311         int ret;
312         struct ctdb_uptime *uptime = NULL;
313         int tmp, days, hours, minutes, seconds;
314
315         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
316         if (ret != 0) {
317                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
318                 return ret;
319         }
320
321         if (options.machinereadable){
322                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
323                 printf(":%u:%u:%u:%lf\n",
324                         (unsigned int)uptime->current_time.tv_sec,
325                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
326                         (unsigned int)uptime->last_recovery_finished.tv_sec,
327                         timeval_delta(&uptime->last_recovery_finished,
328                                       &uptime->last_recovery_started)
329                 );
330                 return 0;
331         }
332
333         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
334
335         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
336         seconds = tmp%60;
337         tmp    /= 60;
338         minutes = tmp%60;
339         tmp    /= 60;
340         hours   = tmp%24;
341         tmp    /= 24;
342         days    = tmp;
343         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
344
345         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
346         seconds = tmp%60;
347         tmp    /= 60;
348         minutes = tmp%60;
349         tmp    /= 60;
350         hours   = tmp%24;
351         tmp    /= 24;
352         days    = tmp;
353         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
354         
355         printf("Duration of last recovery/failover: %lf seconds\n",
356                 timeval_delta(&uptime->last_recovery_finished,
357                               &uptime->last_recovery_started));
358
359         return 0;
360 }
361
362 /*
363   show the PNN of the current node
364  */
365 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
366 {
367         int mypnn;
368
369         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
370         if (mypnn == -1) {
371                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
372                 return -1;
373         }
374
375         printf("PNN:%d\n", mypnn);
376         return 0;
377 }
378
379
/* one node line of the nodes file, kept in a singly linked list */
struct pnn_node {
	struct pnn_node *next;	/* following entry, NULL for the last one */
	const char *addr;	/* text of the line, leading blanks removed */
	int pnn;		/* node number given by the position in the file */
};
385
386 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
387 {
388         const char *nodes_list;
389         int nlines;
390         char **lines;
391         int i, pnn;
392         struct pnn_node *pnn_nodes = NULL;
393         struct pnn_node *pnn_node;
394         struct pnn_node *tmp_node;
395
396         /* read the nodes file */
397         nodes_list = getenv("CTDB_NODES");
398         if (nodes_list == NULL) {
399                 nodes_list = "/etc/ctdb/nodes";
400         }
401         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
402         if (lines == NULL) {
403                 return NULL;
404         }
405         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
406                 nlines--;
407         }
408         for (i=0, pnn=0; i<nlines; i++) {
409                 char *node;
410
411                 node = lines[i];
412                 /* strip leading spaces */
413                 while((*node == ' ') || (*node == '\t')) {
414                         node++;
415                 }
416                 if (*node == '#') {
417                         pnn++;
418                         continue;
419                 }
420                 if (strcmp(node, "") == 0) {
421                         continue;
422                 }
423                 pnn_node = talloc(mem_ctx, struct pnn_node);
424                 pnn_node->pnn = pnn++;
425                 pnn_node->addr = talloc_strdup(pnn_node, node);
426                 pnn_node->next = pnn_nodes;
427                 pnn_nodes = pnn_node;
428         }
429
430         /* swap them around so we return them in incrementing order */
431         pnn_node = pnn_nodes;
432         pnn_nodes = NULL;
433         while (pnn_node) {
434                 tmp_node = pnn_node;
435                 pnn_node = pnn_node->next;
436
437                 tmp_node->next = pnn_nodes;
438                 pnn_nodes = tmp_node;
439         }
440
441         return pnn_nodes;
442 }
443
444 /*
445   show the PNN of the current node
446   discover the pnn by loading the nodes file and try to bind to all
447   addresses one at a time until the ip address is found.
448  */
449 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
450 {
451         TALLOC_CTX *mem_ctx = talloc_new(NULL);
452         struct pnn_node *pnn_nodes;
453         struct pnn_node *pnn_node;
454
455         pnn_nodes = read_nodes_file(mem_ctx);
456         if (pnn_nodes == NULL) {
457                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
458                 talloc_free(mem_ctx);
459                 return -1;
460         }
461
462         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
463                 ctdb_sock_addr addr;
464
465                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
466                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
467                         talloc_free(mem_ctx);
468                         return -1;
469                 }
470
471                 if (ctdb_sys_have_ip(&addr)) {
472                         printf("PNN:%d\n", pnn_node->pnn);
473                         talloc_free(mem_ctx);
474                         return 0;
475                 }
476         }
477
478         printf("Failed to detect which PNN this node is\n");
479         talloc_free(mem_ctx);
480         return -1;
481 }
482
483 /*
484   display remote ctdb status
485  */
486 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
487 {
488         int i, ret;
489         struct ctdb_vnn_map *vnnmap=NULL;
490         struct ctdb_node_map *nodemap=NULL;
491         uint32_t recmode, recmaster;
492         int mypnn;
493
494         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
495         if (mypnn == -1) {
496                 return -1;
497         }
498
499         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
500         if (ret != 0) {
501                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
502                 return ret;
503         }
504
505         if(options.machinereadable){
506                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
507                 for(i=0;i<nodemap->num;i++){
508                         int partially_online = 0;
509                         int j;
510
511                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
512                                 continue;
513                         }
514                         if (nodemap->nodes[i].flags == 0) {
515                                 struct ctdb_control_get_ifaces *ifaces;
516
517                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
518                                                            nodemap->nodes[i].pnn,
519                                                            ctdb, &ifaces);
520                                 if (ret == 0) {
521                                         for (j=0; j < ifaces->num; j++) {
522                                                 if (ifaces->ifaces[j].link_state != 0) {
523                                                         continue;
524                                                 }
525                                                 partially_online = 1;
526                                                 break;
527                                         }
528                                         talloc_free(ifaces);
529                                 }
530                         }
531                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
532                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
533                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
538                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
539                                partially_online);
540                 }
541                 return 0;
542         }
543
544         printf("Number of nodes:%d\n", nodemap->num);
545         for(i=0;i<nodemap->num;i++){
546                 static const struct {
547                         uint32_t flag;
548                         const char *name;
549                 } flag_names[] = {
550                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
551                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
552                         { NODE_FLAGS_BANNED,                "BANNED" },
553                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
554                         { NODE_FLAGS_DELETED,               "DELETED" },
555                         { NODE_FLAGS_STOPPED,               "STOPPED" },
556                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
557                 };
558                 char *flags_str = NULL;
559                 int j;
560
561                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
562                         continue;
563                 }
564                 if (nodemap->nodes[i].flags == 0) {
565                         struct ctdb_control_get_ifaces *ifaces;
566
567                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
568                                                    nodemap->nodes[i].pnn,
569                                                    ctdb, &ifaces);
570                         if (ret == 0) {
571                                 for (j=0; j < ifaces->num; j++) {
572                                         if (ifaces->ifaces[j].link_state != 0) {
573                                                 continue;
574                                         }
575                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
576                                         break;
577                                 }
578                                 talloc_free(ifaces);
579                         }
580                 }
581                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
582                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
583                                 if (flags_str == NULL) {
584                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
585                                 } else {
586                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
587                                                                            flag_names[j].name);
588                                 }
589                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
590                         }
591                 }
592                 if (flags_str == NULL) {
593                         flags_str = talloc_strdup(ctdb, "OK");
594                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
595                 }
596                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
597                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
598                        flags_str,
599                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
600                 talloc_free(flags_str);
601         }
602
603         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
604         if (ret != 0) {
605                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
606                 return ret;
607         }
608         if (vnnmap->generation == INVALID_GENERATION) {
609                 printf("Generation:INVALID\n");
610         } else {
611                 printf("Generation:%d\n",vnnmap->generation);
612         }
613         printf("Size:%d\n",vnnmap->size);
614         for(i=0;i<vnnmap->size;i++){
615                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
616         }
617
618         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
619         if (ret != 0) {
620                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
621                 return ret;
622         }
623         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
624
625         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
626         if (ret != 0) {
627                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
628                 return ret;
629         }
630         printf("Recovery master:%d\n",recmaster);
631
632         return 0;
633 }
634
635
/* one address line of the natgw nodes file, kept in a singly linked list */
struct natgw_node {
	struct natgw_node *next;	/* following entry, NULL for the last one */
	const char *addr;		/* text of the line, leading blanks removed */
};
640
641 /*
642   display the list of nodes belonging to this natgw configuration
643  */
644 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
645 {
646         int i, ret;
647         const char *natgw_list;
648         int nlines;
649         char **lines;
650         struct natgw_node *natgw_nodes = NULL;
651         struct natgw_node *natgw_node;
652         struct ctdb_node_map *nodemap=NULL;
653
654
655         /* read the natgw nodes file into a linked list */
656         natgw_list = getenv("NATGW_NODES");
657         if (natgw_list == NULL) {
658                 natgw_list = "/etc/ctdb/natgw_nodes";
659         }
660         lines = file_lines_load(natgw_list, &nlines, ctdb);
661         if (lines == NULL) {
662                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
663                 return -1;
664         }
665         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
666                 nlines--;
667         }
668         for (i=0;i<nlines;i++) {
669                 char *node;
670
671                 node = lines[i];
672                 /* strip leading spaces */
673                 while((*node == ' ') || (*node == '\t')) {
674                         node++;
675                 }
676                 if (*node == '#') {
677                         continue;
678                 }
679                 if (strcmp(node, "") == 0) {
680                         continue;
681                 }
682                 natgw_node = talloc(ctdb, struct natgw_node);
683                 natgw_node->addr = talloc_strdup(natgw_node, node);
684                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
685                 natgw_node->next = natgw_nodes;
686                 natgw_nodes = natgw_node;
687         }
688
689         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
690         if (ret != 0) {
691                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
692                 return ret;
693         }
694
695         i=0;
696         while(i<nodemap->num) {
697                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
698                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
699                                 break;
700                         }
701                 }
702
703                 /* this node was not in the natgw so we just remove it from
704                  * the list
705                  */
706                 if ((natgw_node == NULL) 
707                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
708                         int j;
709
710                         for (j=i+1; j<nodemap->num; j++) {
711                                 nodemap->nodes[j-1] = nodemap->nodes[j];
712                         }
713                         nodemap->num--;
714                         continue;
715                 }
716
717                 i++;
718         }               
719
720         /* pick a node to be natgwmaster
721          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
722          */
723         for(i=0;i<nodemap->num;i++){
724                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
725                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
726                         break;
727                 }
728         }
729         /* we couldnt find any healthy node, try unhealthy ones */
730         if (i == nodemap->num) {
731                 for(i=0;i<nodemap->num;i++){
732                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
733                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
734                                 break;
735                         }
736                 }
737         }
738         /* unless all nodes are STOPPED, when we pick one anyway */
739         if (i == nodemap->num) {
740                 for(i=0;i<nodemap->num;i++){
741                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
742                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
743                                 break;
744                         }
745                 }
746                 /* or if we still can not find any */
747                 if (i == nodemap->num) {
748                         printf("-1 0.0.0.0\n");
749                 }
750         }
751
752         /* print the pruned list of nodes belonging to this natgw list */
753         for(i=0;i<nodemap->num;i++){
754                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
758                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
759                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
760                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
761                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
762                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
763                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
764         }
765
766         return 0;
767 }
768
769 /*
770   display the status of the scripts for monitoring (or other events)
771  */
772 static int control_one_scriptstatus(struct ctdb_context *ctdb,
773                                     enum ctdb_eventscript_call type)
774 {
775         struct ctdb_scripts_wire *script_status;
776         int ret, i;
777
778         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
779         if (ret != 0) {
780                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
781                 return ret;
782         }
783
784         if (script_status == NULL) {
785                 if (!options.machinereadable) {
786                         printf("%s cycle never run\n",
787                                ctdb_eventscript_call_names[type]);
788                 }
789                 return 0;
790         }
791
792         if (!options.machinereadable) {
793                 printf("%d scripts were executed last %s cycle\n",
794                        script_status->num_scripts,
795                        ctdb_eventscript_call_names[type]);
796         }
797         for (i=0; i<script_status->num_scripts; i++) {
798                 const char *status = NULL;
799
800                 switch (script_status->scripts[i].status) {
801                 case -ETIME:
802                         status = "TIMEDOUT";
803                         break;
804                 case -ENOEXEC:
805                         status = "DISABLED";
806                         break;
807                 case 0:
808                         status = "OK";
809                         break;
810                 default:
811                         if (script_status->scripts[i].status > 0)
812                                 status = "ERROR";
813                         break;
814                 }
815                 if (options.machinereadable) {
816                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
817                                ctdb_eventscript_call_names[type],
818                                script_status->scripts[i].name,
819                                script_status->scripts[i].status,
820                                status,
821                                (long)script_status->scripts[i].start.tv_sec,
822                                (long)script_status->scripts[i].start.tv_usec,
823                                (long)script_status->scripts[i].finished.tv_sec,
824                                (long)script_status->scripts[i].finished.tv_usec,
825                                script_status->scripts[i].output);
826                         continue;
827                 }
828                 if (status)
829                         printf("%-20s Status:%s    ",
830                                script_status->scripts[i].name, status);
831                 else
832                         /* Some other error, eg from stat. */
833                         printf("%-20s Status:CANNOT RUN (%s)",
834                                script_status->scripts[i].name,
835                                strerror(-script_status->scripts[i].status));
836
837                 if (script_status->scripts[i].status >= 0) {
838                         printf("Duration:%.3lf ",
839                         timeval_delta(&script_status->scripts[i].finished,
840                               &script_status->scripts[i].start));
841                 }
842                 if (script_status->scripts[i].status != -ENOEXEC) {
843                         printf("%s",
844                                ctime(&script_status->scripts[i].start.tv_sec));
845                         if (script_status->scripts[i].status != 0) {
846                                 printf("   OUTPUT:%s\n",
847                                        script_status->scripts[i].output);
848                         }
849                 } else {
850                         printf("\n");
851                 }
852         }
853         return 0;
854 }
855
856
857 static int control_scriptstatus(struct ctdb_context *ctdb,
858                                 int argc, const char **argv)
859 {
860         int ret;
861         enum ctdb_eventscript_call type, min, max;
862         const char *arg;
863
864         if (argc > 1) {
865                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
866                 return -1;
867         }
868
869         if (argc == 0)
870                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
871         else
872                 arg = argv[0];
873
874         for (type = 0; type < CTDB_EVENT_MAX; type++) {
875                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
876                         min = type;
877                         max = type+1;
878                         break;
879                 }
880         }
881         if (type == CTDB_EVENT_MAX) {
882                 if (strcmp(arg, "all") == 0) {
883                         min = 0;
884                         max = CTDB_EVENT_MAX;
885                 } else {
886                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
887                         return -1;
888                 }
889         }
890
891         if (options.machinereadable) {
892                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
893         }
894
895         for (type = min; type < max; type++) {
896                 ret = control_one_scriptstatus(ctdb, type);
897                 if (ret != 0) {
898                         return ret;
899                 }
900         }
901
902         return 0;
903 }
904
905 /*
906   enable an eventscript
907  */
908 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
909 {
910         int ret;
911
912         if (argc < 1) {
913                 usage();
914         }
915
916         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
917         if (ret != 0) {
918           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
919                 return ret;
920         }
921
922         return 0;
923 }
924
925 /*
926   disable an eventscript
927  */
928 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
929 {
930         int ret;
931
932         if (argc < 1) {
933                 usage();
934         }
935
936         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
937         if (ret != 0) {
938           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
939                 return ret;
940         }
941
942         return 0;
943 }
944
945 /*
946   display the pnn of the recovery master
947  */
948 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
949 {
950         int ret;
951         uint32_t recmaster;
952
953         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
954         if (ret != 0) {
955                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
956                 return ret;
957         }
958         printf("%d\n",recmaster);
959
960         return 0;
961 }
962
963 /*
964   get a list of all tickles for this pnn
965  */
966 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
967 {
968         struct ctdb_control_tcp_tickle_list *list;
969         ctdb_sock_addr addr;
970         int i, ret;
971
972         if (argc < 1) {
973                 usage();
974         }
975
976         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
977                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
978                 return -1;
979         }
980
981         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
982         if (ret == -1) {
983                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
984                 return -1;
985         }
986
987         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
988         printf("Num tickles:%u\n", list->tickles.num);
989         for (i=0;i<list->tickles.num;i++) {
990                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
991                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
992         }
993
994         talloc_free(list);
995         
996         return 0;
997 }
998
999
1000 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1001 {
1002         struct ctdb_all_public_ips *ips;
1003         struct ctdb_public_ip ip;
1004         int i, ret;
1005         uint32_t *nodes;
1006         uint32_t disable_time;
1007         TDB_DATA data;
1008         struct ctdb_node_map *nodemap=NULL;
1009         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1010
1011         disable_time = 30;
1012         data.dptr  = (uint8_t*)&disable_time;
1013         data.dsize = sizeof(disable_time);
1014         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1017                 return -1;
1018         }
1019
1020
1021
1022         /* read the public ip list from the node */
1023         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         for (i=0;i<ips->num;i++) {
1031                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1032                         break;
1033                 }
1034         }
1035         if (i==ips->num) {
1036                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1037                         pnn, ctdb_addr_to_str(addr)));
1038                 talloc_free(tmp_ctx);
1039                 return -1;
1040         }
1041
1042         ip.pnn  = pnn;
1043         ip.addr = *addr;
1044
1045         data.dptr  = (uint8_t *)&ip;
1046         data.dsize = sizeof(ip);
1047
1048         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1049         if (ret != 0) {
1050                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1051                 talloc_free(tmp_ctx);
1052                 return ret;
1053         }
1054
1055         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1056         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1057                                         nodes, 0,
1058                                         LONGTIMELIMIT(),
1059                                         false, data,
1060                                         NULL, NULL,
1061                                         NULL);
1062         if (ret != 0) {
1063                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1064                 talloc_free(tmp_ctx);
1065                 return -1;
1066         }
1067
1068         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1071                 talloc_free(tmp_ctx);
1072                 return -1;
1073         }
1074
1075         /* update the recovery daemon so it now knows to expect the new
1076            node assignment for this ip.
1077         */
1078         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1079         if (ret != 0) {
1080                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1081                 return -1;
1082         }
1083
1084         talloc_free(tmp_ctx);
1085         return 0;
1086 }
1087
1088 /*
1089   move/failover an ip address to a specific node
1090  */
1091 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1092 {
1093         uint32_t pnn;
1094         ctdb_sock_addr addr;
1095
1096         if (argc < 2) {
1097                 usage();
1098                 return -1;
1099         }
1100
1101         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1102                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1103                 return -1;
1104         }
1105
1106
1107         if (sscanf(argv[1], "%u", &pnn) != 1) {
1108                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1109                 return -1;
1110         }
1111
1112         if (move_ip(ctdb, &addr, pnn) != 0) {
1113                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1114                 return -1;
1115         }
1116
1117         return 0;
1118 }
1119
1120 void getips_store_callback(void *param, void *data)
1121 {
1122         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1123         struct ctdb_all_public_ips *ips = param;
1124         int i;
1125
1126         i = ips->num++;
1127         ips->ips[i].pnn  = node_ip->pnn;
1128         ips->ips[i].addr = node_ip->addr;
1129 }
1130
/*
  tree traversal callback: count one entry

  param points at a uint32_t counter which is incremented by one; data
  (the stored entry) is not looked at.
 */
void getips_count_callback(void *param, void *data)
{
        *(uint32_t *)param += 1;
}
1137
1138 #define IP_KEYLEN       4
1139 static uint32_t *ip_key(ctdb_sock_addr *ip)
1140 {
1141         static uint32_t key[IP_KEYLEN];
1142
1143         bzero(key, sizeof(key));
1144
1145         switch (ip->sa.sa_family) {
1146         case AF_INET:
1147                 key[0]  = ip->ip.sin_addr.s_addr;
1148                 break;
1149         case AF_INET6:
1150                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1151                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1152                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1153                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1154                 break;
1155         default:
1156                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1157                 return key;
1158         }
1159
1160         return key;
1161 }
1162
/*
  insert callback for the ip tree: always hand back the new value (parm),
  ignoring the second argument
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *new_value = parm;

        return new_value;
}
1167
1168 static int
1169 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1170 {
1171         struct ctdb_all_public_ips *tmp_ips;
1172         struct ctdb_node_map *nodemap=NULL;
1173         trbt_tree_t *ip_tree;
1174         int i, j, len, ret;
1175         uint32_t count;
1176
1177         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1178         if (ret != 0) {
1179                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1180                 return ret;
1181         }
1182
1183         ip_tree = trbt_create(tmp_ctx, 0);
1184
1185         for(i=0;i<nodemap->num;i++){
1186                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1187                         continue;
1188                 }
1189                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1190                         continue;
1191                 }
1192
1193                 /* read the public ip list from this node */
1194                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1195                 if (ret != 0) {
1196                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1197                         return -1;
1198                 }
1199         
1200                 for (j=0; j<tmp_ips->num;j++) {
1201                         struct ctdb_public_ip *node_ip;
1202
1203                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1204                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1205                         node_ip->addr = tmp_ips->ips[j].addr;
1206
1207                         trbt_insertarray32_callback(ip_tree,
1208                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1209                                 add_ip_callback,
1210                                 node_ip);
1211                 }
1212                 talloc_free(tmp_ips);
1213         }
1214
1215         /* traverse */
1216         count = 0;
1217         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1218
1219         len = offsetof(struct ctdb_all_public_ips, ips) + 
1220                 count*sizeof(struct ctdb_public_ip);
1221         tmp_ips = talloc_zero_size(tmp_ctx, len);
1222         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1223
1224         *ips = tmp_ips;
1225
1226         return 0;
1227 }
1228
1229
1230 /* 
1231  * scans all other nodes and returns a pnn for another node that can host this 
1232  * ip address or -1
1233  */
1234 static int
1235 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1236 {
1237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1238         struct ctdb_all_public_ips *ips;
1239         struct ctdb_node_map *nodemap=NULL;
1240         int i, j, ret;
1241
1242         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1243         if (ret != 0) {
1244                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1245                 talloc_free(tmp_ctx);
1246                 return ret;
1247         }
1248
1249         for(i=0;i<nodemap->num;i++){
1250                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1251                         continue;
1252                 }
1253                 if (nodemap->nodes[i].pnn == options.pnn) {
1254                         continue;
1255                 }
1256
1257                 /* read the public ip list from this node */
1258                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1259                 if (ret != 0) {
1260                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1261                         return -1;
1262                 }
1263
1264                 for (j=0;j<ips->num;j++) {
1265                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1266                                 talloc_free(tmp_ctx);
1267                                 return nodemap->nodes[i].pnn;
1268                         }
1269                 }
1270                 talloc_free(ips);
1271         }
1272
1273         talloc_free(tmp_ctx);
1274         return -1;
1275 }
1276
1277 /*
1278   add a public ip address to a node
1279  */
1280 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1281 {
1282         int i, ret;
1283         int len;
1284         uint32_t pnn;
1285         unsigned mask;
1286         ctdb_sock_addr addr;
1287         struct ctdb_control_ip_iface *pub;
1288         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1289         struct ctdb_all_public_ips *ips;
1290
1291
1292         if (argc != 2) {
1293                 talloc_free(tmp_ctx);
1294                 usage();
1295         }
1296
1297         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1298                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1299                 talloc_free(tmp_ctx);
1300                 return -1;
1301         }
1302
1303         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1304         if (ret != 0) {
1305                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1306                 talloc_free(tmp_ctx);
1307                 return ret;
1308         }
1309
1310
1311         /* check if some other node is already serving this ip, if not,
1312          * we will claim it
1313          */
1314         for (i=0;i<ips->num;i++) {
1315                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1316                         break;
1317                 }
1318         }
1319
1320         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1321         pub = talloc_size(tmp_ctx, len); 
1322         CTDB_NO_MEMORY(ctdb, pub);
1323
1324         pub->addr  = addr;
1325         pub->mask  = mask;
1326         pub->len   = strlen(argv[1])+1;
1327         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1328
1329         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1330         if (ret != 0) {
1331                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1332                 talloc_free(tmp_ctx);
1333                 return ret;
1334         }
1335
1336         if (i == ips->num) {
1337                 /* no one has this ip so we claim it */
1338                 pnn  = options.pnn;
1339         } else {
1340                 pnn  = ips->ips[i].pnn;
1341         }
1342
1343         if (move_ip(ctdb, &addr, pnn) != 0) {
1344                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1345                 return -1;
1346         }
1347
1348         talloc_free(tmp_ctx);
1349         return 0;
1350 }
1351
1352 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1353
1354 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1355 {
1356         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1357         struct ctdb_node_map *nodemap=NULL;
1358         struct ctdb_all_public_ips *ips;
1359         int ret, i, j;
1360
1361         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1362         if (ret != 0) {
1363                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1364                 return ret;
1365         }
1366
1367         /* remove it from the nodes that are not hosting the ip currently */
1368         for(i=0;i<nodemap->num;i++){
1369                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1370                         continue;
1371                 }
1372                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1373                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1374                         continue;
1375                 }
1376
1377                 for (j=0;j<ips->num;j++) {
1378                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1379                                 break;
1380                         }
1381                 }
1382                 if (j==ips->num) {
1383                         continue;
1384                 }
1385
1386                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1387                         continue;
1388                 }
1389
1390                 options.pnn = nodemap->nodes[i].pnn;
1391                 control_delip(ctdb, argc, argv);
1392         }
1393
1394
1395         /* remove it from every node (also the one hosting it) */
1396         for(i=0;i<nodemap->num;i++){
1397                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1398                         continue;
1399                 }
1400                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1401                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1402                         continue;
1403                 }
1404
1405                 for (j=0;j<ips->num;j++) {
1406                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1407                                 break;
1408                         }
1409                 }
1410                 if (j==ips->num) {
1411                         continue;
1412                 }
1413
1414                 options.pnn = nodemap->nodes[i].pnn;
1415                 control_delip(ctdb, argc, argv);
1416         }
1417
1418         talloc_free(tmp_ctx);
1419         return 0;
1420 }
1421         
1422 /*
1423   delete a public ip address from a node
1424  */
1425 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1426 {
1427         int i, ret;
1428         ctdb_sock_addr addr;
1429         struct ctdb_control_ip_iface pub;
1430         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1431         struct ctdb_all_public_ips *ips;
1432
1433         if (argc != 1) {
1434                 talloc_free(tmp_ctx);
1435                 usage();
1436         }
1437
1438         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1439                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1440                 return -1;
1441         }
1442
1443         if (options.pnn == CTDB_BROADCAST_ALL) {
1444                 return control_delip_all(ctdb, argc, argv, &addr);
1445         }
1446
1447         pub.addr  = addr;
1448         pub.mask  = 0;
1449         pub.len   = 0;
1450
1451         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1452         if (ret != 0) {
1453                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1454                 talloc_free(tmp_ctx);
1455                 return ret;
1456         }
1457         
1458         for (i=0;i<ips->num;i++) {
1459                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1460                         break;
1461                 }
1462         }
1463
1464         if (i==ips->num) {
1465                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1466                         ctdb_addr_to_str(&addr)));
1467                 talloc_free(tmp_ctx);
1468                 return -1;
1469         }
1470
1471         if (ips->ips[i].pnn == options.pnn) {
1472                 ret = find_other_host_for_public_ip(ctdb, &addr);
1473                 if (ret != -1) {
1474                         if (move_ip(ctdb, &addr, ret) != 0) {
1475                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1476                                 return -1;
1477                         }
1478                 }
1479         }
1480
1481         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1482         if (ret != 0) {
1483                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1484                 talloc_free(tmp_ctx);
1485                 return ret;
1486         }
1487
1488         talloc_free(tmp_ctx);
1489         return 0;
1490 }
1491
1492 /*
1493   kill a tcp connection
1494  */
1495 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1496 {
1497         int ret;
1498         struct ctdb_control_killtcp killtcp;
1499
1500         if (argc < 2) {
1501                 usage();
1502         }
1503
1504         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1505                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1506                 return -1;
1507         }
1508
1509         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1510                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1511                 return -1;
1512         }
1513
1514         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1515         if (ret != 0) {
1516                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1517                 return ret;
1518         }
1519
1520         return 0;
1521 }
1522
1523
1524 /*
1525   send a gratious arp
1526  */
1527 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1528 {
1529         int ret;
1530         ctdb_sock_addr addr;
1531
1532         if (argc < 2) {
1533                 usage();
1534         }
1535
1536         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1537                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1538                 return -1;
1539         }
1540
1541         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1542         if (ret != 0) {
1543                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1544                 return ret;
1545         }
1546
1547         return 0;
1548 }
1549
1550 /*
1551   register a server id
1552  */
1553 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1554 {
1555         int ret;
1556         struct ctdb_server_id server_id;
1557
1558         if (argc < 3) {
1559                 usage();
1560         }
1561
1562         server_id.pnn       = strtoul(argv[0], NULL, 0);
1563         server_id.type      = strtoul(argv[1], NULL, 0);
1564         server_id.server_id = strtoul(argv[2], NULL, 0);
1565
1566         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1569                 return ret;
1570         }
1571         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1572         sleep(999);
1573         return -1;
1574 }
1575
1576 /*
1577   unregister a server id
1578  */
1579 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1580 {
1581         int ret;
1582         struct ctdb_server_id server_id;
1583
1584         if (argc < 3) {
1585                 usage();
1586         }
1587
1588         server_id.pnn       = strtoul(argv[0], NULL, 0);
1589         server_id.type      = strtoul(argv[1], NULL, 0);
1590         server_id.server_id = strtoul(argv[2], NULL, 0);
1591
1592         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1593         if (ret != 0) {
1594                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1595                 return ret;
1596         }
1597         return -1;
1598 }
1599
1600 /*
1601   check if a server id exists
1602  */
1603 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1604 {
1605         uint32_t status;
1606         int ret;
1607         struct ctdb_server_id server_id;
1608
1609         if (argc < 3) {
1610                 usage();
1611         }
1612
1613         server_id.pnn       = strtoul(argv[0], NULL, 0);
1614         server_id.type      = strtoul(argv[1], NULL, 0);
1615         server_id.server_id = strtoul(argv[2], NULL, 0);
1616
1617         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1618         if (ret != 0) {
1619                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1620                 return ret;
1621         }
1622
1623         if (status) {
1624                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1625         } else {
1626                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1627         }
1628         return 0;
1629 }
1630
1631 /*
1632   get a list of all server ids that are registered on a node
1633  */
1634 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1635 {
1636         int i, ret;
1637         struct ctdb_server_id_list *server_ids;
1638
1639         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1640         if (ret != 0) {
1641                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1642                 return ret;
1643         }
1644
1645         for (i=0; i<server_ids->num; i++) {
1646                 printf("Server id %d:%d:%d\n", 
1647                         server_ids->server_ids[i].pnn, 
1648                         server_ids->server_ids[i].type, 
1649                         server_ids->server_ids[i].server_id); 
1650         }
1651
1652         return -1;
1653 }
1654
1655 /*
1656   send a tcp tickle ack
1657  */
1658 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1659 {
1660         int ret;
1661         ctdb_sock_addr  src, dst;
1662
1663         if (argc < 2) {
1664                 usage();
1665         }
1666
1667         if (!parse_ip_port(argv[0], &src)) {
1668                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1669                 return -1;
1670         }
1671
1672         if (!parse_ip_port(argv[1], &dst)) {
1673                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1674                 return -1;
1675         }
1676
1677         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1678         if (ret==0) {
1679                 return 0;
1680         }
1681         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1682
1683         return -1;
1684 }
1685
1686
1687 /*
1688   display public ip status
1689  */
1690 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1691 {
1692         int i, ret;
1693         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1694         struct ctdb_all_public_ips *ips;
1695
1696         if (options.pnn == CTDB_BROADCAST_ALL) {
1697                 /* read the list of public ips from all nodes */
1698                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1699         } else {
1700                 /* read the public ip list from this node */
1701                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1702         }
1703         if (ret != 0) {
1704                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1705                 talloc_free(tmp_ctx);
1706                 return ret;
1707         }
1708
1709         if (options.machinereadable){
1710                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1711         } else {
1712                 if (options.pnn == CTDB_BROADCAST_ALL) {
1713                         printf("Public IPs on ALL nodes\n");
1714                 } else {
1715                         printf("Public IPs on node %u\n", options.pnn);
1716                 }
1717         }
1718
1719         for (i=1;i<=ips->num;i++) {
1720                 struct ctdb_control_public_ip_info *info = NULL;
1721                 int32_t pnn;
1722                 char *aciface = NULL;
1723                 char *avifaces = NULL;
1724                 char *cifaces = NULL;
1725
1726                 if (options.pnn == CTDB_BROADCAST_ALL) {
1727                         pnn = ips->ips[ips->num-i].pnn;
1728                 } else {
1729                         pnn = options.pnn;
1730                 }
1731
1732                 if (pnn != -1) {
1733                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1734                                                    &ips->ips[ips->num-i].addr, &info);
1735                 } else {
1736                         ret = -1;
1737                 }
1738
1739                 if (ret == 0) {
1740                         int j;
1741                         for (j=0; j < info->num; j++) {
1742                                 if (cifaces == NULL) {
1743                                         cifaces = talloc_strdup(info,
1744                                                                 info->ifaces[j].name);
1745                                 } else {
1746                                         cifaces = talloc_asprintf_append(cifaces,
1747                                                                          ",%s",
1748                                                                          info->ifaces[j].name);
1749                                 }
1750
1751                                 if (info->active_idx == j) {
1752                                         aciface = info->ifaces[j].name;
1753                                 }
1754
1755                                 if (info->ifaces[j].link_state == 0) {
1756                                         continue;
1757                                 }
1758
1759                                 if (avifaces == NULL) {
1760                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1761                                 } else {
1762                                         avifaces = talloc_asprintf_append(avifaces,
1763                                                                           ",%s",
1764                                                                           info->ifaces[j].name);
1765                                 }
1766                         }
1767                 }
1768
1769                 if (options.machinereadable){
1770                         printf(":%s:%d:%s:%s:%s:\n",
1771                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1772                                ips->ips[ips->num-i].pnn,
1773                                aciface?aciface:"",
1774                                avifaces?avifaces:"",
1775                                cifaces?cifaces:"");
1776                 } else {
1777                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1778                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1779                                ips->ips[ips->num-i].pnn,
1780                                aciface?aciface:"",
1781                                avifaces?avifaces:"",
1782                                cifaces?cifaces:"");
1783                 }
1784                 talloc_free(info);
1785         }
1786
1787         talloc_free(tmp_ctx);
1788         return 0;
1789 }
1790
1791 /*
1792   public ip info
1793  */
1794 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1795 {
1796         int i, ret;
1797         ctdb_sock_addr addr;
1798         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1799         struct ctdb_control_public_ip_info *info;
1800
1801         if (argc != 1) {
1802                 talloc_free(tmp_ctx);
1803                 usage();
1804         }
1805
1806         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1807                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1808                 return -1;
1809         }
1810
1811         /* read the public ip info from this node */
1812         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1813                                            tmp_ctx, &addr, &info);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1816                                   argv[0], options.pnn));
1817                 talloc_free(tmp_ctx);
1818                 return ret;
1819         }
1820
1821         printf("Public IP[%s] info on node %u\n",
1822                ctdb_addr_to_str(&info->ip.addr),
1823                options.pnn);
1824
1825         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1826                ctdb_addr_to_str(&info->ip.addr),
1827                info->ip.pnn, info->num);
1828
1829         for (i=0; i<info->num; i++) {
1830                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1831
1832                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1833                        i+1, info->ifaces[i].name,
1834                        info->ifaces[i].link_state?"up":"down",
1835                        (unsigned int)info->ifaces[i].references,
1836                        (i==info->active_idx)?" (active)":"");
1837         }
1838
1839         talloc_free(tmp_ctx);
1840         return 0;
1841 }
1842
1843 /*
1844   display interfaces status
1845  */
1846 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1847 {
1848         int i, ret;
1849         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1850         struct ctdb_control_get_ifaces *ifaces;
1851
1852         /* read the public ip list from this node */
1853         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1854                                    tmp_ctx, &ifaces);
1855         if (ret != 0) {
1856                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1857                                   options.pnn));
1858                 talloc_free(tmp_ctx);
1859                 return ret;
1860         }
1861
1862         if (options.machinereadable){
1863                 printf(":Name:LinkStatus:References:\n");
1864         } else {
1865                 printf("Interfaces on node %u\n", options.pnn);
1866         }
1867
1868         for (i=0; i<ifaces->num; i++) {
1869                 if (options.machinereadable){
1870                         printf(":%s:%s:%u\n",
1871                                ifaces->ifaces[i].name,
1872                                ifaces->ifaces[i].link_state?"1":"0",
1873                                (unsigned int)ifaces->ifaces[i].references);
1874                 } else {
1875                         printf("name:%s link:%s references:%u\n",
1876                                ifaces->ifaces[i].name,
1877                                ifaces->ifaces[i].link_state?"up":"down",
1878                                (unsigned int)ifaces->ifaces[i].references);
1879                 }
1880         }
1881
1882         talloc_free(tmp_ctx);
1883         return 0;
1884 }
1885
1886
1887 /*
1888   set link status of an interface
1889  */
1890 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1891 {
1892         int ret;
1893         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1894         struct ctdb_control_iface_info info;
1895
1896         ZERO_STRUCT(info);
1897
1898         if (argc != 2) {
1899                 usage();
1900         }
1901
1902         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1903                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1904                                   argv[0]));
1905                 talloc_free(tmp_ctx);
1906                 return -1;
1907         }
1908         strcpy(info.name, argv[0]);
1909
1910         if (strcmp(argv[1], "up") == 0) {
1911                 info.link_state = 1;
1912         } else if (strcmp(argv[1], "down") == 0) {
1913                 info.link_state = 0;
1914         } else {
1915                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1916                                   argv[1]));
1917                 talloc_free(tmp_ctx);
1918                 return -1;
1919         }
1920
1921         /* read the public ip list from this node */
1922         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1923                                    tmp_ctx, &info);
1924         if (ret != 0) {
1925                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1926                                   argv[0], options.pnn));
1927                 talloc_free(tmp_ctx);
1928                 return ret;
1929         }
1930
1931         talloc_free(tmp_ctx);
1932         return 0;
1933 }
1934
1935 /*
1936   display pid of a ctdb daemon
1937  */
1938 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1939 {
1940         uint32_t pid;
1941         int ret;
1942
1943         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1946                 return ret;
1947         }
1948         printf("Pid:%d\n", pid);
1949
1950         return 0;
1951 }
1952
1953 static uint32_t ipreallocate_finished;
1954
1955 /*
1956   handler for receiving the response to ipreallocate
1957 */
1958 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1959                              TDB_DATA data, void *private_data)
1960 {
1961         ipreallocate_finished = 1;
1962 }
1963
1964 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1965 {
1966         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1967
1968         event_add_timed(ctdb->ev, ctdb, 
1969                                 timeval_current_ofs(1, 0),
1970                                 ctdb_every_second, ctdb);
1971 }
1972
1973 /*
1974   ask the recovery daemon on the recovery master to perform a ip reallocation
1975  */
1976 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1977 {
1978         int i, ret;
1979         TDB_DATA data;
1980         struct takeover_run_reply rd;
1981         uint32_t recmaster;
1982         struct ctdb_node_map *nodemap=NULL;
1983         int retries=0;
1984         struct timeval tv = timeval_current();
1985
1986         /* we need some events to trigger so we can timeout and restart
1987            the loop
1988         */
1989         event_add_timed(ctdb->ev, ctdb, 
1990                                 timeval_current_ofs(1, 0),
1991                                 ctdb_every_second, ctdb);
1992
1993         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1994         if (rd.pnn == -1) {
1995                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1996                 return -1;
1997         }
1998         rd.srvid = getpid();
1999
2000         /* register a message port for receiveing the reply so that we
2001            can receive the reply
2002         */
2003         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2004
2005         data.dptr = (uint8_t *)&rd;
2006         data.dsize = sizeof(rd);
2007
2008 again:
2009         if (retries>5) {
2010                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
2011                 exit(10);
2012         }
2013
2014         /* check that there are valid nodes available */
2015         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2016                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2017                 exit(10);
2018         }
2019         for (i=0; i<nodemap->num;i++) {
2020                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2021                         break;
2022                 }
2023         }
2024         if (i==nodemap->num) {
2025                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2026                 return 0;
2027         }
2028
2029
2030         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2031         if (ret != 0) {
2032                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2033                 return ret;
2034         }
2035
2036         /* verify the node exists */
2037         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2038                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2039                 exit(10);
2040         }
2041
2042
2043         /* check tha there are nodes available that can act as a recmaster */
2044         for (i=0; i<nodemap->num; i++) {
2045                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2046                         continue;
2047                 }
2048                 break;
2049         }
2050         if (i == nodemap->num) {
2051                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2052                 return 0;
2053         }
2054
2055         /* verify the recovery master is not STOPPED, nor BANNED */
2056         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2057                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2058                 retries++;
2059                 sleep(1);
2060                 goto again;
2061         } 
2062
2063         
2064         /* verify the recovery master is not STOPPED, nor BANNED */
2065         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2066                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2067                 retries++;
2068                 sleep(1);
2069                 goto again;
2070         } 
2071
2072         ipreallocate_finished = 0;
2073         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2074         if (ret != 0) {
2075                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2076                 return -1;
2077         }
2078
2079         tv = timeval_current();
2080         /* this loop will terminate when we have received the reply */
2081         while (timeval_elapsed(&tv) < 3.0) {    
2082                 event_loop_once(ctdb->ev);
2083         }
2084         if (ipreallocate_finished == 1) {
2085                 return 0;
2086         }
2087
2088         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2089         retries++;
2090         sleep(1);
2091         goto again;
2092
2093         return 0;
2094 }
2095
2096
2097 /*
2098   disable a remote node
2099  */
2100 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2101 {
2102         int ret;
2103         struct ctdb_node_map *nodemap=NULL;
2104
2105         /* check if the node is already disabled */
2106         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2107                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2108                 exit(10);
2109         }
2110         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2111                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2112                 return 0;
2113         }
2114
2115         do {
2116                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2117                 if (ret != 0) {
2118                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2119                         return ret;
2120                 }
2121
2122                 sleep(1);
2123
2124                 /* read the nodemap and verify the change took effect */
2125                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2126                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2127                         exit(10);
2128                 }
2129
2130         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2131         ret = control_ipreallocate(ctdb, argc, argv);
2132         if (ret != 0) {
2133                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2134                 return ret;
2135         }
2136
2137         return 0;
2138 }
2139
2140 /*
2141   enable a disabled remote node
2142  */
2143 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2144 {
2145         int ret;
2146
2147         struct ctdb_node_map *nodemap=NULL;
2148
2149
2150         /* check if the node is already enabled */
2151         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2152                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2153                 exit(10);
2154         }
2155         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2156                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2157                 return 0;
2158         }
2159
2160         do {
2161                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2162                 if (ret != 0) {
2163                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2164                         return ret;
2165                 }
2166
2167                 sleep(1);
2168
2169                 /* read the nodemap and verify the change took effect */
2170                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2171                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2172                         exit(10);
2173                 }
2174
2175         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2176
2177         ret = control_ipreallocate(ctdb, argc, argv);
2178         if (ret != 0) {
2179                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2180                 return ret;
2181         }
2182
2183         return 0;
2184 }
2185
2186 /*
2187   stop a remote node
2188  */
2189 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2190 {
2191         int ret;
2192         struct ctdb_node_map *nodemap=NULL;
2193
2194         do {
2195                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2196                 if (ret != 0) {
2197                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2198                 }
2199         
2200                 sleep(1);
2201
2202                 /* read the nodemap and verify the change took effect */
2203                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2204                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2205                         exit(10);
2206                 }
2207
2208         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2209         ret = control_ipreallocate(ctdb, argc, argv);
2210         if (ret != 0) {
2211                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2212                 return ret;
2213         }
2214
2215         return 0;
2216 }
2217
2218 /*
2219   restart a stopped remote node
2220  */
2221 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2222 {
2223         int ret;
2224
2225         struct ctdb_node_map *nodemap=NULL;
2226
2227         do {
2228                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2229                 if (ret != 0) {
2230                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2231                         return ret;
2232                 }
2233         
2234                 sleep(1);
2235
2236                 /* read the nodemap and verify the change took effect */
2237                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2238                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2239                         exit(10);
2240                 }
2241
2242         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2243         ret = control_ipreallocate(ctdb, argc, argv);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2246                 return ret;
2247         }
2248
2249         return 0;
2250 }
2251
2252 static uint32_t get_generation(struct ctdb_context *ctdb)
2253 {
2254         struct ctdb_vnn_map *vnnmap=NULL;
2255         int ret;
2256
2257         /* wait until the recmaster is not in recovery mode */
2258         while (1) {
2259                 uint32_t recmode, recmaster;
2260                 
2261                 if (vnnmap != NULL) {
2262                         talloc_free(vnnmap);
2263                         vnnmap = NULL;
2264                 }
2265
2266                 /* get the recmaster */
2267                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2268                 if (ret != 0) {
2269                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2270                         exit(10);
2271                 }
2272
2273                 /* get recovery mode */
2274                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2275                 if (ret != 0) {
2276                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2277                         exit(10);
2278                 }
2279
2280                 /* get the current generation number */
2281                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2282                 if (ret != 0) {
2283                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2284                         exit(10);
2285                 }
2286
2287                 if ((recmode == CTDB_RECOVERY_NORMAL)
2288                 &&  (vnnmap->generation != 1)){
2289                         return vnnmap->generation;
2290                 }
2291                 sleep(1);
2292         }
2293 }
2294
2295 /*
2296   ban a node from the cluster
2297  */
2298 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2299 {
2300         int ret;
2301         struct ctdb_node_map *nodemap=NULL;
2302         struct ctdb_ban_time bantime;
2303
2304         if (argc < 1) {
2305                 usage();
2306         }
2307         
2308         /* verify the node exists */
2309         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2310         if (ret != 0) {
2311                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2312                 return ret;
2313         }
2314
2315         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2316                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2317                 return -1;
2318         }
2319
2320         bantime.pnn  = options.pnn;
2321         bantime.time = strtoul(argv[0], NULL, 0);
2322
2323         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2324         if (ret != 0) {
2325                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2326                 return -1;
2327         }       
2328
2329         ret = control_ipreallocate(ctdb, argc, argv);
2330         if (ret != 0) {
2331                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2332                 return ret;
2333         }
2334
2335         return 0;
2336 }
2337
2338
2339 /*
2340   unban a node from the cluster
2341  */
2342 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2343 {
2344         int ret;
2345         struct ctdb_node_map *nodemap=NULL;
2346         struct ctdb_ban_time bantime;
2347
2348         /* verify the node exists */
2349         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2350         if (ret != 0) {
2351                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2352                 return ret;
2353         }
2354
2355         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2356                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2357                 return -1;
2358         }
2359
2360         bantime.pnn  = options.pnn;
2361         bantime.time = 0;
2362
2363         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2364         if (ret != 0) {
2365                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2366                 return -1;
2367         }       
2368
2369         ret = control_ipreallocate(ctdb, argc, argv);
2370         if (ret != 0) {
2371                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2372                 return ret;
2373         }
2374
2375         return 0;
2376 }
2377
2378
2379 /*
2380   show ban information for a node
2381  */
2382 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2383 {
2384         int ret;
2385         struct ctdb_node_map *nodemap=NULL;
2386         struct ctdb_ban_time *bantime;
2387
2388         /* verify the node exists */
2389         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2390         if (ret != 0) {
2391                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2392                 return ret;
2393         }
2394
2395         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2396         if (ret != 0) {
2397                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2398                 return -1;
2399         }       
2400
2401         if (bantime->time == 0) {
2402                 printf("Node %u is not banned\n", bantime->pnn);
2403         } else {
2404                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2405         }
2406
2407         return 0;
2408 }
2409
2410 /*
2411   shutdown a daemon
2412  */
2413 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2414 {
2415         int ret;
2416
2417         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2418         if (ret != 0) {
2419                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2420                 return ret;
2421         }
2422
2423         return 0;
2424 }
2425
2426 /*
2427   trigger a recovery
2428  */
2429 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2430 {
2431         int ret;
2432         uint32_t generation, next_generation;
2433
2434         /* record the current generation number */
2435         generation = get_generation(ctdb);
2436
2437         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2438         if (ret != 0) {
2439                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2440                 return ret;
2441         }
2442
2443         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2444         if (ret != 0) {
2445                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2446                 return ret;
2447         }
2448
2449         /* wait until we are in a new generation */
2450         while (1) {
2451                 next_generation = get_generation(ctdb);
2452                 if (next_generation != generation) {
2453                         return 0;
2454                 }
2455                 sleep(1);
2456         }
2457
2458         return 0;
2459 }
2460
2461
2462 /*
2463   display monitoring mode of a remote node
2464  */
2465 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2466 {
2467         uint32_t monmode;
2468         int ret;
2469
2470         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2471         if (ret != 0) {
2472                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2473                 return ret;
2474         }
2475         if (!options.machinereadable){
2476                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2477         } else {
2478                 printf(":mode:\n");
2479                 printf(":%d:\n",monmode);
2480         }
2481         return 0;
2482 }
2483
2484
2485 /*
2486   display capabilities of a remote node
2487  */
2488 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2489 {
2490         uint32_t capabilities;
2491         int ret;
2492
2493         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2494         if (ret != 0) {
2495                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2496                 return ret;
2497         }
2498         
2499         if (!options.machinereadable){
2500                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2501                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2502                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2503                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2504         } else {
2505                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2506                 printf(":%d:%d:%d:%d:\n",
2507                         !!(capabilities&CTDB_CAP_RECMASTER),
2508                         !!(capabilities&CTDB_CAP_LMASTER),
2509                         !!(capabilities&CTDB_CAP_LVS),
2510                         !!(capabilities&CTDB_CAP_NATGW));
2511         }
2512         return 0;
2513 }
2514
2515 /*
2516   display lvs configuration
2517  */
2518 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2519 {
2520         uint32_t *capabilities;
2521         struct ctdb_node_map *nodemap=NULL;
2522         int i, ret;
2523         int healthy_count = 0;
2524
2525         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2526         if (ret != 0) {
2527                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2528                 return ret;
2529         }
2530
2531         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2532         CTDB_NO_MEMORY(ctdb, capabilities);
2533         
2534         /* collect capabilities for all connected nodes */
2535         for (i=0; i<nodemap->num; i++) {
2536                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2537                         continue;
2538                 }
2539                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2540                         continue;
2541                 }
2542         
2543                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2544                 if (ret != 0) {
2545                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2546                         return ret;
2547                 }
2548
2549                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2550                         continue;
2551                 }
2552
2553                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2554                         healthy_count++;
2555                 }
2556         }
2557
2558         /* Print all LVS nodes */
2559         for (i=0; i<nodemap->num; i++) {
2560                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2561                         continue;
2562                 }
2563                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2564                         continue;
2565                 }
2566                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2567                         continue;
2568                 }
2569
2570                 if (healthy_count != 0) {
2571                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2572                                 continue;
2573                         }
2574                 }
2575
2576                 printf("%d:%s\n", i, 
2577                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2578         }
2579
2580         return 0;
2581 }
2582
2583 /*
2584   display who is the lvs master
2585  */
2586 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2587 {
2588         uint32_t *capabilities;
2589         struct ctdb_node_map *nodemap=NULL;
2590         int i, ret;
2591         int healthy_count = 0;
2592
2593         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2594         if (ret != 0) {
2595                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2596                 return ret;
2597         }
2598
2599         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2600         CTDB_NO_MEMORY(ctdb, capabilities);
2601         
2602         /* collect capabilities for all connected nodes */
2603         for (i=0; i<nodemap->num; i++) {
2604                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2605                         continue;
2606                 }
2607                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2608                         continue;
2609                 }
2610         
2611                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2612                 if (ret != 0) {
2613                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2614                         return ret;
2615                 }
2616
2617                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2618                         continue;
2619                 }
2620
2621                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2622                         healthy_count++;
2623                 }
2624         }
2625
2626         /* find and show the lvsmaster */
2627         for (i=0; i<nodemap->num; i++) {
2628                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2629                         continue;
2630                 }
2631                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2632                         continue;
2633                 }
2634                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2635                         continue;
2636                 }
2637
2638                 if (healthy_count != 0) {
2639                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2640                                 continue;
2641                         }
2642                 }
2643
2644                 if (options.machinereadable){
2645                         printf("%d\n", i);
2646                 } else {
2647                         printf("Node %d is LVS master\n", i);
2648                 }
2649                 return 0;
2650         }
2651
2652         printf("There is no LVS master\n");
2653         return -1;
2654 }
2655
2656 /*
2657   disable monitoring on a  node
2658  */
2659 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2660 {
2661         
2662         int ret;
2663
2664         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2665         if (ret != 0) {
2666                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2667                 return ret;
2668         }
2669         printf("Monitoring mode:%s\n","DISABLED");
2670
2671         return 0;
2672 }
2673
2674 /*
2675   enable monitoring on a  node
2676  */
2677 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2678 {
2679         
2680         int ret;
2681
2682         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2683         if (ret != 0) {
2684                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2685                 return ret;
2686         }
2687         printf("Monitoring mode:%s\n","ACTIVE");
2688
2689         return 0;
2690 }
2691
2692 /*
2693   display remote list of keys/data for a db
2694  */
2695 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2696 {
2697         const char *db_name;
2698         struct ctdb_db_context *ctdb_db;
2699         int ret;
2700
2701         if (argc < 1) {
2702                 usage();
2703         }
2704
2705         db_name = argv[0];
2706
2707
2708         if (db_exists(ctdb, db_name)) {
2709                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2710                 return -1;
2711         }
2712
2713         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2714
2715         if (ctdb_db == NULL) {
2716                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2717                 return -1;
2718         }
2719
2720         /* traverse and dump the cluster tdb */
2721         ret = ctdb_dump_db(ctdb_db, stdout);
2722         if (ret == -1) {
2723                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2724                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2725                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2726                                   db_name));
2727                 return -1;
2728         }
2729         talloc_free(ctdb_db);
2730
2731         printf("Dumped %d records\n", ret);
2732         return 0;
2733 }
2734
2735
2736 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2737                              TDB_DATA data, void *private_data)
2738 {
2739         DEBUG(DEBUG_ERR,("Log data received\n"));
2740         if (data.dsize > 0) {
2741                 printf("%s", data.dptr);
2742         }
2743
2744         exit(0);
2745 }
2746
2747 /*
2748   display a list of log messages from the in memory ringbuffer
2749  */
2750 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2751 {
2752         int ret;
2753         int32_t res;
2754         struct ctdb_get_log_addr log_addr;
2755         TDB_DATA data;
2756         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2757         char *errmsg;
2758         struct timeval tv;
2759
2760         if (argc != 1) {
2761                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2762                 talloc_free(tmp_ctx);
2763                 return -1;
2764         }
2765
2766         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2767         log_addr.srvid = getpid();
2768         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2769                 log_addr.level = get_debug_by_desc(argv[0]);
2770         } else {
2771                 log_addr.level = strtol(argv[0], NULL, 0);
2772         }
2773
2774
2775         data.dptr = (unsigned char *)&log_addr;
2776         data.dsize = sizeof(log_addr);
2777
2778         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2779
2780         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2781         sleep(1);
2782
2783         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2784
2785         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2786                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2787         if (ret != 0 || res != 0) {
2788                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2789                 talloc_free(tmp_ctx);
2790                 return -1;
2791         }
2792
2793
2794         tv = timeval_current();
2795         /* this loop will terminate when we have received the reply */
2796         while (timeval_elapsed(&tv) < 3.0) {    
2797                 event_loop_once(ctdb->ev);
2798         }
2799
2800         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2801
2802         talloc_free(tmp_ctx);
2803         return 0;
2804 }
2805
2806 /*
2807   clear the in memory log area
2808  */
2809 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2810 {
2811         int ret;
2812         int32_t res;
2813         char *errmsg;
2814         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2815
2816         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2817                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2818         if (ret != 0 || res != 0) {
2819                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2820                 talloc_free(tmp_ctx);
2821                 return -1;
2822         }
2823
2824         talloc_free(tmp_ctx);
2825         return 0;
2826 }
2827
2828
2829
2830 /*
2831   display a list of the databases on a remote ctdb
2832  */
2833 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2834 {
2835         int i, ret;
2836         struct ctdb_dbid_map *dbmap=NULL;
2837
2838         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2839         if (ret != 0) {
2840                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2841                 return ret;
2842         }
2843
2844         if(options.machinereadable){
2845                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2846                 for(i=0;i<dbmap->num;i++){
2847                         const char *path;
2848                         const char *name;
2849                         const char *health;
2850                         bool persistent;
2851
2852                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2853                                             dbmap->dbs[i].dbid, ctdb, &path);
2854                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2855                                             dbmap->dbs[i].dbid, ctdb, &name);
2856                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2857                                               dbmap->dbs[i].dbid, ctdb, &health);
2858                         persistent = dbmap->dbs[i].persistent;
2859                         printf(":0x%08X:%s:%s:%d:%d:\n",
2860                                dbmap->dbs[i].dbid, name, path,
2861                                !!(persistent), !!(health));
2862                 }
2863                 return 0;
2864         }
2865
2866         printf("Number of databases:%d\n", dbmap->num);
2867         for(i=0;i<dbmap->num;i++){
2868                 const char *path;
2869                 const char *name;
2870                 const char *health;
2871                 bool persistent;
2872
2873                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2874                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2875                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2876                 persistent = dbmap->dbs[i].persistent;
2877                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2878                        dbmap->dbs[i].dbid, name, path,
2879                        persistent?" PERSISTENT":"",
2880                        health?" UNHEALTHY":"");
2881         }
2882
2883         return 0;
2884 }
2885
2886 /*
2887   display the status of a database on a remote ctdb
2888  */
2889 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2890 {
2891         int i, ret;
2892         struct ctdb_dbid_map *dbmap=NULL;
2893         const char *db_name;
2894
2895         if (argc < 1) {
2896                 usage();
2897         }
2898
2899         db_name = argv[0];
2900
2901         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2902         if (ret != 0) {
2903                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2904                 return ret;
2905         }
2906
2907         for(i=0;i<dbmap->num;i++){
2908                 const char *path;
2909                 const char *name;
2910                 const char *health;
2911                 bool persistent;
2912
2913                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2914                 if (strcmp(name, db_name) != 0) {
2915                         continue;
2916                 }
2917
2918                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2919                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2920                 persistent = dbmap->dbs[i].persistent;
2921                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2922                        dbmap->dbs[i].dbid, name, path,
2923                        persistent?"yes":"no",
2924                        health?health:"OK");
2925                 return 0;
2926         }
2927
2928         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2929         return 0;
2930 }
2931
2932 /*
2933   check if the local node is recmaster or not
2934   it will return 1 if this node is the recmaster and 0 if it is not
2935   or if the local ctdb daemon could not be contacted
2936  */
2937 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2938 {
2939         uint32_t mypnn, recmaster;
2940         int ret;
2941
2942         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2943         if (mypnn == -1) {
2944                 printf("Failed to get pnn of node\n");
2945                 return 1;
2946         }
2947
2948         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2949         if (ret != 0) {
2950                 printf("Failed to get the recmaster\n");
2951                 return 1;
2952         }
2953
2954         if (recmaster != mypnn) {
2955                 printf("this node is not the recmaster\n");
2956                 return 1;
2957         }
2958
2959         printf("this node is the recmaster\n");
2960         return 0;
2961 }
2962
2963 /*
2964   ping a node
2965  */
2966 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2967 {
2968         int ret;
2969         struct timeval tv = timeval_current();
2970         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2971         if (ret == -1) {
2972                 printf("Unable to get ping response from node %u\n", options.pnn);
2973                 return -1;
2974         } else {
2975                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2976                        options.pnn, timeval_elapsed(&tv), ret);
2977         }
2978         return 0;
2979 }
2980
2981
2982 /*
2983   get a tunable
2984  */
2985 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2986 {
2987         const char *name;
2988         uint32_t value;
2989         int ret;
2990
2991         if (argc < 1) {
2992                 usage();
2993         }
2994
2995         name = argv[0];
2996         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2997         if (ret == -1) {
2998                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2999                 return -1;
3000         }
3001
3002         printf("%-19s = %u\n", name, value);
3003         return 0;
3004 }
3005
3006 /*
3007   set a tunable
3008  */
3009 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3010 {
3011         const char *name;
3012         uint32_t value;
3013         int ret;
3014
3015         if (argc < 2) {
3016                 usage();
3017         }
3018
3019         name = argv[0];
3020         value = strtoul(argv[1], NULL, 0);
3021
3022         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3023         if (ret == -1) {
3024                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3025                 return -1;
3026         }
3027         return 0;
3028 }
3029
3030 /*
3031   list all tunables
3032  */
3033 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3034 {
3035         uint32_t count;
3036         const char **list;
3037         int ret, i;
3038
3039         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3040         if (ret == -1) {
3041                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3042                 return -1;
3043         }
3044
3045         for (i=0;i<count;i++) {
3046                 control_getvar(ctdb, 1, &list[i]);
3047         }
3048
3049         talloc_free(list);
3050         
3051         return 0;
3052 }
3053
3054 /*
3055   display debug level on a node
3056  */
3057 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3058 {
3059         int ret;
3060         int32_t level;
3061
3062         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3063         if (ret != 0) {
3064                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3065                 return ret;
3066         } else {
3067                 if (options.machinereadable){
3068                         printf(":Name:Level:\n");
3069                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3070                 } else {
3071                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3072                 }
3073         }
3074         return 0;
3075 }
3076
3077 /*
3078   display reclock file of a node
3079  */
3080 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3081 {
3082         int ret;
3083         const char *reclock;
3084
3085         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3086         if (ret != 0) {
3087                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3088                 return ret;
3089         } else {
3090                 if (options.machinereadable){
3091                         if (reclock != NULL) {
3092                                 printf("%s", reclock);
3093                         }
3094                 } else {
3095                         if (reclock == NULL) {
3096                                 printf("No reclock file used.\n");
3097                         } else {
3098                                 printf("Reclock file:%s\n", reclock);
3099                         }
3100                 }
3101         }
3102         return 0;
3103 }
3104
3105 /*
3106   set the reclock file of a node
3107  */
3108 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3109 {
3110         int ret;
3111         const char *reclock;
3112
3113         if (argc == 0) {
3114                 reclock = NULL;
3115         } else if (argc == 1) {
3116                 reclock = argv[0];
3117         } else {
3118                 usage();
3119         }
3120
3121         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3122         if (ret != 0) {
3123                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3124                 return ret;
3125         }
3126         return 0;
3127 }
3128
3129 /*
3130   set the natgw state on/off
3131  */
3132 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3133 {
3134         int ret;
3135         uint32_t natgwstate;
3136
3137         if (argc == 0) {
3138                 usage();
3139         }
3140
3141         if (!strcmp(argv[0], "on")) {
3142                 natgwstate = 1;
3143         } else if (!strcmp(argv[0], "off")) {
3144                 natgwstate = 0;
3145         } else {
3146                 usage();
3147         }
3148
3149         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3150         if (ret != 0) {
3151                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3152                 return ret;
3153         }
3154
3155         return 0;
3156 }
3157
3158 /*
3159   set the lmaster role on/off
3160  */
3161 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3162 {
3163         int ret;
3164         uint32_t lmasterrole;
3165
3166         if (argc == 0) {
3167                 usage();
3168         }
3169
3170         if (!strcmp(argv[0], "on")) {
3171                 lmasterrole = 1;
3172         } else if (!strcmp(argv[0], "off")) {
3173                 lmasterrole = 0;
3174         } else {
3175                 usage();
3176         }
3177
3178         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3179         if (ret != 0) {
3180                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3181                 return ret;
3182         }
3183
3184         return 0;
3185 }
3186
3187 /*
3188   set the recmaster role on/off
3189  */
3190 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3191 {
3192         int ret;
3193         uint32_t recmasterrole;
3194
3195         if (argc == 0) {
3196                 usage();
3197         }
3198
3199         if (!strcmp(argv[0], "on")) {
3200                 recmasterrole = 1;
3201         } else if (!strcmp(argv[0], "off")) {
3202                 recmasterrole = 0;
3203         } else {
3204                 usage();
3205         }
3206
3207         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3208         if (ret != 0) {
3209                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3210                 return ret;
3211         }
3212
3213         return 0;
3214 }
3215
3216 /*
3217   set debug level on a node or all nodes
3218  */
3219 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3220 {
3221         int i, ret;
3222         int32_t level;
3223
3224         if (argc == 0) {
3225                 printf("You must specify the debug level. Valid levels are:\n");
3226                 for (i=0; debug_levels[i].description != NULL; i++) {
3227                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3228                 }
3229
3230                 return 0;
3231         }
3232
3233         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3234                 level = get_debug_by_desc(argv[0]);
3235         } else {
3236                 level = strtol(argv[0], NULL, 0);
3237         }
3238
3239         for (i=0; debug_levels[i].description != NULL; i++) {
3240                 if (level == debug_levels[i].level) {
3241                         break;
3242                 }
3243         }
3244         if (debug_levels[i].description == NULL) {
3245                 printf("Invalid debug level, must be one of\n");
3246                 for (i=0; debug_levels[i].description != NULL; i++) {
3247                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3248                 }
3249                 return -1;
3250         }
3251
3252         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3253         if (ret != 0) {
3254                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3255         }
3256         return 0;
3257 }
3258
3259
3260 /*
3261   freeze a node
3262  */
3263 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3264 {
3265         int ret;
3266         uint32_t priority;
3267         
3268         if (argc == 1) {
3269                 priority = strtol(argv[0], NULL, 0);
3270         } else {
3271                 priority = 0;
3272         }
3273         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3274
3275         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3276         if (ret != 0) {
3277                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3278         }               
3279         return 0;
3280 }
3281
3282 /*
3283   thaw a node
3284  */
3285 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3286 {
3287         int ret;
3288         uint32_t priority;
3289         
3290         if (argc == 1) {
3291                 priority = strtol(argv[0], NULL, 0);
3292         } else {
3293                 priority = 0;
3294         }
3295         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3296
3297         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3298         if (ret != 0) {
3299                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3300         }               
3301         return 0;
3302 }
3303
3304
3305 /*
3306   attach to a database
3307  */
3308 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3309 {
3310         const char *db_name;
3311         struct ctdb_db_context *ctdb_db;
3312
3313         if (argc < 1) {
3314                 usage();
3315         }
3316         db_name = argv[0];
3317
3318         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3319         if (ctdb_db == NULL) {
3320                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3321                 return -1;
3322         }
3323
3324         return 0;
3325 }
3326
3327 /*
3328   set db priority
3329  */
3330 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3331 {
3332         struct ctdb_db_priority db_prio;
3333         int ret;
3334
3335         if (argc < 2) {
3336                 usage();
3337         }
3338
3339         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3340         db_prio.priority = strtoul(argv[1], NULL, 0);
3341
3342         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3343         if (ret != 0) {
3344                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3345                 return -1;
3346         }
3347
3348         return 0;
3349 }
3350
3351 /*
3352   get db priority
3353  */
3354 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3355 {
3356         uint32_t db_id, priority;
3357         int ret;
3358
3359         if (argc < 1) {
3360                 usage();
3361         }
3362
3363         db_id = strtoul(argv[0], NULL, 0);
3364
3365         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3366         if (ret != 0) {
3367                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3368                 return -1;
3369         }
3370
3371         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3372
3373         return 0;
3374 }
3375
3376 /*
3377   run an eventscript on a node
3378  */
3379 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3380 {
3381         TDB_DATA data;
3382         int ret;
3383         int32_t res;
3384         char *errmsg;
3385         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3386
3387         if (argc != 1) {
3388                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3389                 return -1;
3390         }
3391
3392         data.dptr = (unsigned char *)discard_const(argv[0]);
3393         data.dsize = strlen((char *)data.dptr) + 1;
3394
3395         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3396
3397         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3398                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3399         if (ret != 0 || res != 0) {
3400                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3401                 talloc_free(tmp_ctx);
3402                 return -1;
3403         }
3404         talloc_free(tmp_ctx);
3405         return 0;
3406 }
3407
3408 #define DB_VERSION 1 /* NOTE(review): presumably the value stored in db_file_header.version - confirm */
3409 #define MAX_DB_NAME 64 /* size in bytes of the db_file_header.name array */
3410 struct db_file_header { /* header record declared by control_backupdb; NOTE(review): presumably written at the start of a backup file - confirm */
3411         unsigned long version; /* NOTE(review): presumably DB_VERSION - confirm */
3412         time_t timestamp; /* NOTE(review): presumably the time the backup was taken - confirm */
3413         unsigned long persistent; /* NOTE(review): presumably non-zero for a persistent database - confirm */
3414         unsigned long size; /* NOTE(review): presumably the size of the data following the header - confirm */
3415         const char name[MAX_DB_NAME]; /* NOTE(review): presumably the database name - confirm */
3416 };
3417
3418 struct backup_data { /* accumulator state handed to backup_traverse through its private pointer */
3419         struct ctdb_marshall_buffer *records; /* buffer that backup_traverse grows and appends marshalled records to */
3420         uint32_t len; /* byte offset in records at which the next record is appended */
3421         uint32_t total; /* number of records appended so far */
3422         bool traverse_error; /* set to true by backup_traverse when marshalling or growing the buffer fails */
3423 };
3424
3425 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3426 {
3427         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3428         struct ctdb_rec_data *rec;
3429
3430         /* add the record */
3431         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3432         if (rec == NULL) {
3433                 bd->traverse_error = true;
3434                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3435                 return -1;
3436         }
3437         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3438         if (bd->records == NULL) {
3439                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3440                 bd->traverse_error = true;
3441                 return -1;
3442         }
3443         bd->records->count++;
3444         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3445         bd->len += rec->length;
3446         talloc_free(rec);
3447
3448         bd->total++;
3449         return 0;
3450 }
3451
3452 /*
3453  * backup a database to a file 
3454  */
3455 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3456 {
3457         int i, ret;
3458         struct ctdb_dbid_map *dbmap=NULL;
3459         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3460         struct db_file_header dbhdr;
3461         struct ctdb_db_context *ctdb_db;
3462         struct backup_data *bd;
3463         int fh = -1;
3464         int status = -1;
3465         const char *reason = NULL;
3466
3467         if (argc != 2) {
3468                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3469                 return -1;
3470         }
3471
3472         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3473         if (ret != 0) {
3474                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3475                 return ret;
3476         }
3477
3478         for(i=0;i<dbmap->num;i++){
3479                 const char *name;
3480
3481                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3482                 if(!strcmp(argv[0], name)){
3483                         talloc_free(discard_const(name));
3484                         break;
3485                 }
3486                 talloc_free(discard_const(name));
3487         }
3488         if (i == dbmap->num) {
3489                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3490                 talloc_free(tmp_ctx);
3491                 return -1;
3492         }
3493
3494         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3495                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3496         if (ret != 0) {
3497                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3498                                  argv[0]));
3499                 talloc_free(tmp_ctx);
3500                 return -1;
3501         }
3502         if (reason) {
3503                 uint32_t allow_unhealthy = 0;
3504
3505                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3506                                       "AllowUnhealthyDBRead",
3507                                       &allow_unhealthy);
3508
3509                 if (allow_unhealthy != 1) {
3510                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3511                                          argv[0], reason));
3512
3513                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3514                                          allow_unhealthy));
3515                         talloc_free(tmp_ctx);
3516                         return -1;
3517                 }
3518
3519                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3520                                      argv[0], argv[0]));
3521                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3522                                      "tunnable AllowUnhealthyDBRead = %u\n",
3523                                      allow_unhealthy));
3524         }
3525
3526         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3527         if (ctdb_db == NULL) {
3528                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3529                 talloc_free(tmp_ctx);
3530                 return -1;
3531         }
3532
3533
3534         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3535         if (ret == -1) {
3536                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3537                 talloc_free(tmp_ctx);
3538                 return -1;
3539         }
3540
3541
3542         bd = talloc_zero(tmp_ctx, struct backup_data);
3543         if (bd == NULL) {
3544                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3545                 talloc_free(tmp_ctx);
3546                 return -1;
3547         }
3548
3549         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3550         if (bd->records == NULL) {
3551                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3552                 talloc_free(tmp_ctx);
3553                 return -1;
3554         }
3555
3556         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3557         bd->records->db_id = ctdb_db->db_id;
3558         /* traverse the database collecting all records */
3559         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3560             bd->traverse_error) {
3561                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3562                 talloc_free(tmp_ctx);
3563                 return -1;              
3564         }
3565
3566         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3567
3568
3569         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3570         if (fh == -1) {
3571                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3572                 talloc_free(tmp_ctx);
3573                 return -1;
3574         }
3575
3576         dbhdr.version = DB_VERSION;
3577         dbhdr.timestamp = time(NULL);
3578         dbhdr.persistent = dbmap->dbs[i].persistent;
3579         dbhdr.size = bd->len;
3580         if (strlen(argv[0]) >= MAX_DB_NAME) {
3581                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3582                 goto done;
3583         }
3584         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3585         ret = write(fh, &dbhdr, sizeof(dbhdr));
3586         if (ret == -1) {
3587                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3588                 goto done;
3589         }
3590         ret = write(fh, bd->records, bd->len);
3591         if (ret == -1) {
3592                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3593                 goto done;
3594         }
3595
3596         status = 0;
3597 done:
3598         if (fh != -1) {
3599                 ret = close(fh);
3600                 if (ret == -1) {
3601                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3602                 }
3603         }
3604         talloc_free(tmp_ctx);
3605         return status;
3606 }
3607
3608 /*
3609  * restore a database from a file 
3610  */
3611 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3612 {
3613         int ret;
3614         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3615         TDB_DATA outdata;
3616         TDB_DATA data;
3617         struct db_file_header dbhdr;
3618         struct ctdb_db_context *ctdb_db;
3619         struct ctdb_node_map *nodemap=NULL;
3620         struct ctdb_vnn_map *vnnmap=NULL;
3621         int i, fh;
3622         struct ctdb_control_wipe_database w;
3623         uint32_t *nodes;
3624         uint32_t generation;
3625         struct tm *tm;
3626         char tbuf[100];
3627
3628         if (argc != 1) {
3629                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3630                 return -1;
3631         }
3632
3633         fh = open(argv[0], O_RDONLY);
3634         if (fh == -1) {
3635                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3636                 talloc_free(tmp_ctx);
3637                 return -1;
3638         }
3639
3640         read(fh, &dbhdr, sizeof(dbhdr));
3641         if (dbhdr.version != DB_VERSION) {
3642                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3643                 talloc_free(tmp_ctx);
3644                 return -1;
3645         }
3646
3647         outdata.dsize = dbhdr.size;
3648         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3649         if (outdata.dptr == NULL) {
3650                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3651                 close(fh);
3652                 talloc_free(tmp_ctx);
3653                 return -1;
3654         }               
3655         read(fh, outdata.dptr, outdata.dsize);
3656         close(fh);
3657
3658         tm = localtime(&dbhdr.timestamp);
3659         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3660         printf("Restoring database '%s' from backup @ %s\n",
3661                 dbhdr.name, tbuf);
3662
3663
3664         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3665         if (ctdb_db == NULL) {
3666                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3667                 talloc_free(tmp_ctx);
3668                 return -1;
3669         }
3670
3671         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3672         if (ret != 0) {
3673                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3674                 talloc_free(tmp_ctx);
3675                 return ret;
3676         }
3677
3678
3679         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3680         if (ret != 0) {
3681                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3682                 talloc_free(tmp_ctx);
3683                 return ret;
3684         }
3685
3686         /* freeze all nodes */
3687         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3688         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3689                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3690                                         nodes, i,
3691                                         TIMELIMIT(),
3692                                         false, tdb_null,
3693                                         NULL, NULL,
3694                                         NULL) != 0) {
3695                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3696                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3697                         talloc_free(tmp_ctx);
3698                         return -1;
3699                 }
3700         }
3701
3702         generation = vnnmap->generation;
3703         data.dptr = (void *)&generation;
3704         data.dsize = sizeof(generation);
3705
3706         /* start a cluster wide transaction */
3707         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3708         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3709                                         nodes, 0,
3710                                         TIMELIMIT(), false, data,
3711                                         NULL, NULL,
3712                                         NULL) != 0) {
3713                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3714                 return -1;
3715         }
3716
3717
3718         w.db_id = ctdb_db->db_id;
3719         w.transaction_id = generation;
3720
3721         data.dptr = (void *)&w;
3722         data.dsize = sizeof(w);
3723
3724         /* wipe all the remote databases. */
3725         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3726         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3727                                         nodes, 0,
3728                                         TIMELIMIT(), false, data,
3729                                         NULL, NULL,
3730                                         NULL) != 0) {
3731                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3732                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3733                 talloc_free(tmp_ctx);
3734                 return -1;
3735         }
3736         
3737         /* push the database */
3738         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3739         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3740                                         nodes, 0,
3741                                         TIMELIMIT(), false, outdata,
3742                                         NULL, NULL,
3743                                         NULL) != 0) {
3744                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3745                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3746                 talloc_free(tmp_ctx);
3747                 return -1;
3748         }
3749
3750         data.dptr = (void *)&ctdb_db->db_id;
3751         data.dsize = sizeof(ctdb_db->db_id);
3752
3753         /* mark the database as healthy */
3754         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3755         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3756                                         nodes, 0,
3757                                         TIMELIMIT(), false, data,
3758                                         NULL, NULL,
3759                                         NULL) != 0) {
3760                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3761                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3762                 talloc_free(tmp_ctx);
3763                 return -1;
3764         }
3765
3766         data.dptr = (void *)&generation;
3767         data.dsize = sizeof(generation);
3768
3769         /* commit all the changes */
3770         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3771                                         nodes, 0,
3772                                         TIMELIMIT(), false, data,
3773                                         NULL, NULL,
3774                                         NULL) != 0) {
3775                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3776                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3777                 talloc_free(tmp_ctx);
3778                 return -1;
3779         }
3780
3781
3782         /* thaw all nodes */
3783         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3784         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3785                                         nodes, 0,
3786                                         TIMELIMIT(),
3787                                         false, tdb_null,
3788                                         NULL, NULL,
3789                                         NULL) != 0) {
3790                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3791                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3792                 talloc_free(tmp_ctx);
3793                 return -1;
3794         }
3795
3796
3797         talloc_free(tmp_ctx);
3798         return 0;
3799 }
3800
3801 /*
3802  * dump a database backup from a file
3803  */
3804 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3805 {
3806         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3807         TDB_DATA outdata;
3808         struct db_file_header dbhdr;
3809         int i, fh;
3810         struct tm *tm;
3811         char tbuf[100];
3812         struct ctdb_rec_data *rec = NULL;
3813         struct ctdb_marshall_buffer *m;
3814
3815         if (argc != 1) {
3816                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3817                 return -1;
3818         }
3819
3820         fh = open(argv[0], O_RDONLY);
3821         if (fh == -1) {
3822                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3823                 talloc_free(tmp_ctx);
3824                 return -1;
3825         }
3826
3827         read(fh, &dbhdr, sizeof(dbhdr));
3828         if (dbhdr.version != DB_VERSION) {
3829                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3830                 talloc_free(tmp_ctx);
3831                 return -1;
3832         }
3833
3834         outdata.dsize = dbhdr.size;
3835         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3836         if (outdata.dptr == NULL) {
3837                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3838                 close(fh);
3839                 talloc_free(tmp_ctx);
3840                 return -1;
3841         }
3842         read(fh, outdata.dptr, outdata.dsize);
3843         close(fh);
3844         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3845
3846         tm = localtime(&dbhdr.timestamp);
3847         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3848         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3849                 dbhdr.name, m->db_id, tbuf);
3850
3851         for (i=0; i < m->count; i++) {
3852                 uint32_t reqid = 0;
3853                 TDB_DATA key, data;
3854
3855                 /* we do not want the header splitted, so we pass NULL*/
3856                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3857                                               NULL, &key, &data);
3858
3859                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3860         }
3861
3862         printf("Dumped %d records\n", i);
3863         talloc_free(tmp_ctx);
3864         return 0;
3865 }
3866
3867 /*
3868  * wipe a database from a file
3869  */
3870 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3871                           const char **argv)
3872 {
3873         int ret;
3874         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3875         TDB_DATA data;
3876         struct ctdb_db_context *ctdb_db;
3877         struct ctdb_node_map *nodemap = NULL;
3878         struct ctdb_vnn_map *vnnmap = NULL;
3879         int i;
3880         struct ctdb_control_wipe_database w;
3881         uint32_t *nodes;
3882         uint32_t generation;
3883         struct ctdb_dbid_map *dbmap = NULL;
3884
3885         if (argc != 1) {
3886                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3887                 return -1;
3888         }
3889
3890         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3891                                  &dbmap);
3892         if (ret != 0) {
3893                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3894                                   options.pnn));
3895                 return ret;
3896         }
3897
3898         for(i=0;i<dbmap->num;i++){
3899                 const char *name;
3900
3901                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3902                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3903                 if(!strcmp(argv[0], name)){
3904                         talloc_free(discard_const(name));
3905                         break;
3906                 }
3907                 talloc_free(discard_const(name));
3908         }
3909         if (i == dbmap->num) {
3910                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3911                                   argv[0]));
3912                 talloc_free(tmp_ctx);
3913                 return -1;
3914         }
3915
3916         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3917         if (ctdb_db == NULL) {
3918                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3919                                   argv[0]));
3920                 talloc_free(tmp_ctx);
3921                 return -1;
3922         }
3923
3924         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3925                                    &nodemap);
3926         if (ret != 0) {
3927                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3928                                   options.pnn));
3929                 talloc_free(tmp_ctx);
3930                 return ret;
3931         }
3932
3933         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3934                                   &vnnmap);
3935         if (ret != 0) {
3936                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3937                                   options.pnn));
3938                 talloc_free(tmp_ctx);
3939                 return ret;
3940         }
3941
3942         /* freeze all nodes */
3943         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3944         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3945                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3946                                                 nodes, i,
3947                                                 TIMELIMIT(),
3948                                                 false, tdb_null,
3949                                                 NULL, NULL,
3950                                                 NULL);
3951                 if (ret != 0) {
3952                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3953                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3954                                              CTDB_RECOVERY_ACTIVE);
3955                         talloc_free(tmp_ctx);
3956                         return -1;
3957                 }
3958         }
3959
3960         generation = vnnmap->generation;
3961         data.dptr = (void *)&generation;
3962         data.dsize = sizeof(generation);
3963
3964         /* start a cluster wide transaction */
3965         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3966         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3967                                         nodes, 0,
3968                                         TIMELIMIT(), false, data,
3969                                         NULL, NULL,
3970                                         NULL);
3971         if (ret!= 0) {
3972                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3973                                   "transactions.\n"));
3974                 return -1;
3975         }
3976
3977         w.db_id = ctdb_db->db_id;
3978         w.transaction_id = generation;
3979
3980         data.dptr = (void *)&w;
3981         data.dsize = sizeof(w);
3982
3983         /* wipe all the remote databases. */
3984         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3985         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3986                                         nodes, 0,
3987                                         TIMELIMIT(), false, data,
3988                                         NULL, NULL,
3989                                         NULL) != 0) {
3990                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3991                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3992                 talloc_free(tmp_ctx);
3993                 return -1;
3994         }
3995
3996         data.dptr = (void *)&ctdb_db->db_id;
3997         data.dsize = sizeof(ctdb_db->db_id);
3998
3999         /* mark the database as healthy */
4000         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4001         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4002                                         nodes, 0,
4003                                         TIMELIMIT(), false, data,
4004                                         NULL, NULL,
4005                                         NULL) != 0) {
4006                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4007                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4008                 talloc_free(tmp_ctx);
4009                 return -1;
4010         }
4011
4012         data.dptr = (void *)&generation;
4013         data.dsize = sizeof(generation);
4014
4015         /* commit all the changes */
4016         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4017                                         nodes, 0,
4018                                         TIMELIMIT(), false, data,
4019                                         NULL, NULL,
4020                                         NULL) != 0) {
4021                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4022                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4023                 talloc_free(tmp_ctx);
4024                 return -1;
4025         }
4026
4027         /* thaw all nodes */
4028         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4029         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4030                                         nodes, 0,
4031                                         TIMELIMIT(),
4032                                         false, tdb_null,
4033                                         NULL, NULL,
4034                                         NULL) != 0) {
4035                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4036                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4037                 talloc_free(tmp_ctx);
4038                 return -1;
4039         }
4040
4041         talloc_free(tmp_ctx);
4042         return 0;
4043 }
4044
4045 /*
4046  * set flags of a node in the nodemap
4047  */
4048 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4049 {
4050         int ret;
4051         int32_t status;
4052         int node;
4053         int flags;
4054         TDB_DATA data;
4055         struct ctdb_node_flag_change c;
4056
4057         if (argc != 2) {
4058                 usage();
4059                 return -1;
4060         }
4061
4062         if (sscanf(argv[0], "%d", &node) != 1) {
4063                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4064                 usage();
4065                 return -1;
4066         }
4067         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4068                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4069                 usage();
4070                 return -1;
4071         }
4072
4073         c.pnn       = node;
4074         c.old_flags = 0;
4075         c.new_flags = flags;
4076
4077         data.dsize = sizeof(c);
4078         data.dptr = (unsigned char *)&c;
4079
4080         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4081                            data, NULL, NULL, &status, NULL, NULL);
4082         if (ret != 0 || status != 0) {
4083                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4084                 return -1;
4085         }
4086         return 0;
4087 }
4088
4089 /*
4090   dump memory usage
4091  */
4092 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4093 {
4094         TDB_DATA data;
4095         int ret;
4096         int32_t res;
4097         char *errmsg;
4098         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4099         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4100                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4101         if (ret != 0 || res != 0) {
4102                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4103                 talloc_free(tmp_ctx);
4104                 return -1;
4105         }
4106         write(1, data.dptr, data.dsize);
4107         talloc_free(tmp_ctx);
4108         return 0;
4109 }
4110
4111 /*
4112   handler for memory dumps
4113 */
4114 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4115                              TDB_DATA data, void *private_data)
4116 {
4117         write(1, data.dptr, data.dsize);
4118         exit(0);
4119 }
4120
4121 /*
4122   dump memory usage on the recovery daemon
4123  */
4124 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4125 {
4126         int ret;
4127         TDB_DATA data;
4128         struct rd_memdump_reply rd;
4129
4130         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4131         if (rd.pnn == -1) {
4132                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4133                 return -1;
4134         }
4135         rd.srvid = getpid();
4136
4137         /* register a message port for receiveing the reply so that we
4138            can receive the reply
4139         */
4140         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4141
4142
4143         data.dptr = (uint8_t *)&rd;
4144         data.dsize = sizeof(rd);
4145
4146         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4147         if (ret != 0) {
4148                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4149                 return -1;
4150         }
4151
4152         /* this loop will terminate when we have received the reply */
4153         while (1) {     
4154                 event_loop_once(ctdb->ev);
4155         }
4156
4157         return 0;
4158 }
4159
4160 /*
4161   send a message to a srvid
4162  */
4163 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4164 {
4165         unsigned long srvid;
4166         int ret;
4167         TDB_DATA data;
4168
4169         if (argc < 2) {
4170                 usage();
4171         }
4172
4173         srvid      = strtoul(argv[0], NULL, 0);
4174
4175         data.dptr = (uint8_t *)discard_const(argv[1]);
4176         data.dsize= strlen(argv[1]);
4177
4178         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4179         if (ret != 0) {
4180                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4181                 return -1;
4182         }
4183
4184         return 0;
4185 }
4186
4187 /*
4188   handler for msglisten
4189 */
4190 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4191                              TDB_DATA data, void *private_data)
4192 {
4193         int i;
4194
4195         printf("Message received: ");
4196         for (i=0;i<data.dsize;i++) {
4197                 printf("%c", data.dptr[i]);
4198         }
4199         printf("\n");
4200 }
4201
4202 /*
4203   listen for messages on a messageport
4204  */
4205 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4206 {
4207         uint64_t srvid;
4208
4209         srvid = getpid();
4210
4211         /* register a message port and listen for messages
4212         */
4213         ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4214         printf("Listening for messages on srvid:%d\n", (int)srvid);
4215
4216         while (1) {     
4217                 event_loop_once(ctdb->ev);
4218         }
4219
4220         return 0;
4221 }
4222
4223 /*
4224   list all nodes in the cluster
4225   if the daemon is running, we read the data from the daemon.
4226   if the daemon is not running we parse the nodes file directly
4227  */
4228 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4229 {
4230         int i, ret;
4231         struct ctdb_node_map *nodemap=NULL;
4232
4233         if (ctdb != NULL) {
4234                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4235                 if (ret != 0) {
4236                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4237                         return ret;
4238                 }
4239
4240                 for(i=0;i<nodemap->num;i++){
4241                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4242                                 continue;
4243                         }
4244                         if (options.machinereadable){
4245                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4246                         } else {
4247                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4248                         }
4249                 }
4250         } else {
4251                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4252                 struct pnn_node *pnn_nodes;
4253                 struct pnn_node *pnn_node;
4254         
4255                 pnn_nodes = read_nodes_file(mem_ctx);
4256                 if (pnn_nodes == NULL) {
4257                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4258                         talloc_free(mem_ctx);
4259                         return -1;
4260                 }
4261
4262                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4263                         ctdb_sock_addr addr;
4264
4265                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4266                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4267                                 talloc_free(mem_ctx);
4268                                 return -1;
4269                         }
4270
4271                         if (options.machinereadable){
4272                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4273                         } else {
4274                                 printf("%s\n", pnn_node->addr);
4275                         }
4276                 }
4277                 talloc_free(mem_ctx);
4278         }
4279
4280         return 0;
4281 }
4282
4283 /*
4284   reload the nodes file on the local node
4285  */
4286 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4287 {
4288         int i, ret;
4289         int mypnn;
4290         struct ctdb_node_map *nodemap=NULL;
4291
4292         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4293         if (mypnn == -1) {
4294                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4295                 return -1;
4296         }
4297
4298         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4299         if (ret != 0) {
4300                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4301                 return ret;
4302         }
4303
4304         /* reload the nodes file on all remote nodes */
4305         for (i=0;i<nodemap->num;i++) {
4306                 if (nodemap->nodes[i].pnn == mypnn) {
4307                         continue;
4308                 }
4309                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4310                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4311                         nodemap->nodes[i].pnn);
4312                 if (ret != 0) {
4313                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4314                 }
4315         }
4316
4317         /* reload the nodes file on the local node */
4318         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4319         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4320         if (ret != 0) {
4321                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4322         }
4323
4324         /* initiate a recovery */
4325         control_recover(ctdb, argc, argv);
4326
4327         return 0;
4328 }
4329
4330
4331 static const struct {
4332         const char *name;
4333         int (*fn)(struct ctdb_context *, int, const char **);
4334         bool auto_all;
4335         bool without_daemon; /* can be run without daemon running ? */
4336         const char *msg;
4337         const char *args;
4338 } ctdb_commands[] = {
4339 #ifdef CTDB_VERS
4340         { "version",         control_version,           true,   false,  "show version of ctdb" },
4341 #endif
4342         { "status",          control_status,            true,   false,  "show node status" },
4343         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4344         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4345         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4346         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4347         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4348         { "statistics",      control_statistics,        false,  false, "show statistics" },
4349         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4350         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4351         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4352         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4353         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4354         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4355         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4356         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4357         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4358         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4359         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4360         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4361         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4362         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4363         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4364         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4365         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4366         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4367         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4368         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4369         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4370         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4371         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4372         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4373         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4374         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4375         { "stop",            control_stop,              true,   false,  "stop a node" },
4376         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4377         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4378         { "unban",           control_unban,             true,   false,  "unban a node" },
4379         { "showban",         control_showban,           true,   false,  "show ban information"},
4380         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4381         { "recover",         control_recover,           true,   false,  "force recovery" },
4382         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4383         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4384         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4385         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4386         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4387         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4388         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4389         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4390
4391         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4392         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4393         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4394         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4395         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4396         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4397         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4398         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4399         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4400         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4401         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4402         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4403         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4404         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4405         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4406         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4407         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4408         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4409         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4410         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4411         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4412         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4413         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4414         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4415         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4416         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4417         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4418         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4419         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4420         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4421         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4422         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4423 };
4424
4425 /*
4426   show usage message
4427  */
4428 static void usage(void)
4429 {
4430         int i;
4431         printf(
4432 "Usage: ctdb [options] <control>\n" \
4433 "Options:\n" \
4434 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4435 "   -Y                 generate machinereadable output\n"
4436 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4437         printf("Controls:\n");
4438         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4439                 printf("  %-15s %-27s  %s\n", 
4440                        ctdb_commands[i].name, 
4441                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4442                        ctdb_commands[i].msg);
4443         }
4444         exit(1);
4445 }
4446
4447
4448 static void ctdb_alarm(int sig)
4449 {
4450         printf("Maximum runtime exceeded - exiting\n");
4451         _exit(ERR_TIMEOUT);
4452 }
4453
4454 /*
4455   main program
4456 */
4457 int main(int argc, const char *argv[])
4458 {
4459         struct ctdb_context *ctdb;
4460         char *nodestring = NULL;
4461         struct poptOption popt_options[] = {
4462                 POPT_AUTOHELP
4463                 POPT_CTDB_CMDLINE
4464                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4465                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4466                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4467                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4468                 POPT_TABLEEND
4469         };
4470         int opt;
4471         const char **extra_argv;
4472         int extra_argc = 0;
4473         int ret=-1, i;
4474         poptContext pc;
4475         struct event_context *ev;
4476         const char *control;
4477
4478         setlinebuf(stdout);
4479         
4480         /* set some defaults */
4481         options.maxruntime = 0;
4482         options.timelimit = 3;
4483         options.pnn = CTDB_CURRENT_NODE;
4484
4485         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4486
4487         while ((opt = poptGetNextOpt(pc)) != -1) {
4488                 switch (opt) {
4489                 default:
4490                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4491                                 poptBadOption(pc, 0), poptStrerror(opt)));
4492                         exit(1);
4493                 }
4494         }
4495
4496         /* setup the remaining options for the main program to use */
4497         extra_argv = poptGetArgs(pc);
4498         if (extra_argv) {
4499                 extra_argv++;
4500                 while (extra_argv[extra_argc]) extra_argc++;
4501         }
4502
4503         if (extra_argc < 1) {
4504                 usage();
4505         }
4506
4507         if (options.maxruntime == 0) {
4508                 const char *ctdb_timeout;
4509                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4510                 if (ctdb_timeout != NULL) {
4511                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4512                 } else {
4513                         /* default timeout is 120 seconds */
4514                         options.maxruntime = 120;
4515                 }
4516         }
4517
4518         signal(SIGALRM, ctdb_alarm);
4519         alarm(options.maxruntime);
4520
4521         /* setup the node number to contact */
4522         if (nodestring != NULL) {
4523                 if (strcmp(nodestring, "all") == 0) {
4524                         options.pnn = CTDB_BROADCAST_ALL;
4525                 } else {
4526                         options.pnn = strtoul(nodestring, NULL, 0);
4527                 }
4528         }
4529
4530         control = extra_argv[0];
4531
4532         ev = event_context_init(NULL);
4533
4534         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4535                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4536                         int j;
4537
4538                         if (ctdb_commands[i].without_daemon == true) {
4539                                 close(2);
4540                         }
4541
4542                         /* initialise ctdb */
4543                         ctdb = ctdb_cmdline_client(ev);
4544
4545                         if (ctdb_commands[i].without_daemon == false) {
4546                                 if (ctdb == NULL) {
4547                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4548                                         exit(1);
4549                                 }
4550
4551                                 /* verify the node exists */
4552                                 verify_node(ctdb);
4553
4554                                 if (options.pnn == CTDB_CURRENT_NODE) {
4555                                         int pnn;
4556                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4557                                         if (pnn == -1) {
4558                                                 return -1;
4559                                         }
4560                                         options.pnn = pnn;
4561                                 }
4562                         }
4563
4564                         if (ctdb_commands[i].auto_all && 
4565                             options.pnn == CTDB_BROADCAST_ALL) {
4566                                 uint32_t *nodes;
4567                                 uint32_t num_nodes;
4568                                 ret = 0;
4569
4570                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4571                                 CTDB_NO_MEMORY(ctdb, nodes);
4572         
4573                                 for (j=0;j<num_nodes;j++) {
4574                                         options.pnn = nodes[j];
4575                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4576                                 }
4577                                 talloc_free(nodes);
4578                         } else {
4579                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4580                         }
4581                         break;
4582                 }
4583         }
4584
4585         if (i == ARRAY_SIZE(ctdb_commands)) {
4586                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4587                 exit(1);
4588         }
4589
4590         return ret;
4591 }