09c849cdd58d25612b5593fd37a26f556ab37c04
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
49
50 #ifdef CTDB_VERS
51 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
52 {
53 #define STR(x) #x
54 #define XSTR(x) STR(x)
55         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
56         return 0;
57 }
58 #endif
59
60
61 /*
62   verify that a node exists and is reachable
63  */
64 static void verify_node(struct ctdb_context *ctdb)
65 {
66         int ret;
67         struct ctdb_node_map *nodemap=NULL;
68
69         if (options.pnn == CTDB_CURRENT_NODE) {
70                 return;
71         }
72         if (options.pnn == CTDB_BROADCAST_ALL) {
73                 return;
74         }
75
76         /* verify the node exists */
77         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
78                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
79                 exit(10);
80         }
81         if (options.pnn >= nodemap->num) {
82                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
83                 exit(ERR_NONODE);
84         }
85         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
86                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
87                 exit(ERR_DISNODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93
94         /* verify we can access the node */
95         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
96         if (ret == -1) {
97                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
98                 exit(10);
99         }
100 }
101
102 /*
103  check if a database exists
104 */
105 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
106 {
107         int i, ret;
108         struct ctdb_dbid_map *dbmap=NULL;
109
110         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
111         if (ret != 0) {
112                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
113                 return -1;
114         }
115
116         for(i=0;i<dbmap->num;i++){
117                 const char *name;
118
119                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
120                 if (!strcmp(name, db_name)) {
121                         return 0;
122                 }
123         }
124
125         return -1;
126 }
127
128 /*
129   see if a process exists
130  */
131 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
132 {
133         uint32_t pnn, pid;
134         int ret;
135         if (argc < 1) {
136                 usage();
137         }
138
139         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
140                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
141                 return -1;
142         }
143
144         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
145         if (ret == 0) {
146                 printf("%u:%u exists\n", pnn, pid);
147         } else {
148                 printf("%u:%u does not exist\n", pnn, pid);
149         }
150         return ret;
151 }
152
153 /*
154   display statistics structure
155  */
156 static void show_statistics(struct ctdb_statistics *s)
157 {
158         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
159         int i;
160         const char *prefix=NULL;
161         int preflen=0;
162         const struct {
163                 const char *name;
164                 uint32_t offset;
165         } fields[] = {
166 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
167                 STATISTICS_FIELD(num_clients),
168                 STATISTICS_FIELD(frozen),
169                 STATISTICS_FIELD(recovering),
170                 STATISTICS_FIELD(client_packets_sent),
171                 STATISTICS_FIELD(client_packets_recv),
172                 STATISTICS_FIELD(node_packets_sent),
173                 STATISTICS_FIELD(node_packets_recv),
174                 STATISTICS_FIELD(keepalive_packets_sent),
175                 STATISTICS_FIELD(keepalive_packets_recv),
176                 STATISTICS_FIELD(node.req_call),
177                 STATISTICS_FIELD(node.reply_call),
178                 STATISTICS_FIELD(node.req_dmaster),
179                 STATISTICS_FIELD(node.reply_dmaster),
180                 STATISTICS_FIELD(node.reply_error),
181                 STATISTICS_FIELD(node.req_message),
182                 STATISTICS_FIELD(node.req_control),
183                 STATISTICS_FIELD(node.reply_control),
184                 STATISTICS_FIELD(client.req_call),
185                 STATISTICS_FIELD(client.req_message),
186                 STATISTICS_FIELD(client.req_control),
187                 STATISTICS_FIELD(timeouts.call),
188                 STATISTICS_FIELD(timeouts.control),
189                 STATISTICS_FIELD(timeouts.traverse),
190                 STATISTICS_FIELD(total_calls),
191                 STATISTICS_FIELD(pending_calls),
192                 STATISTICS_FIELD(lockwait_calls),
193                 STATISTICS_FIELD(pending_lockwait_calls),
194                 STATISTICS_FIELD(childwrite_calls),
195                 STATISTICS_FIELD(pending_childwrite_calls),
196                 STATISTICS_FIELD(memory_used),
197                 STATISTICS_FIELD(max_hop_count),
198         };
199         printf("CTDB version %u\n", CTDB_VERSION);
200         for (i=0;i<ARRAY_SIZE(fields);i++) {
201                 if (strchr(fields[i].name, '.')) {
202                         preflen = strcspn(fields[i].name, ".")+1;
203                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
204                                 prefix = fields[i].name;
205                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
206                         }
207                 } else {
208                         preflen = 0;
209                 }
210                 printf(" %*s%-22s%*s%10u\n", 
211                        preflen?4:0, "",
212                        fields[i].name+preflen, 
213                        preflen?0:4, "",
214                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
215         }
216         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
217         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
218
219         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
220         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
221         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
222         talloc_free(tmp_ctx);
223 }
224
225 /*
226   display remote ctdb statistics combined from all nodes
227  */
228 static int control_statistics_all(struct ctdb_context *ctdb)
229 {
230         int ret, i;
231         struct ctdb_statistics statistics;
232         uint32_t *nodes;
233         uint32_t num_nodes;
234
235         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
236         CTDB_NO_MEMORY(ctdb, nodes);
237         
238         ZERO_STRUCT(statistics);
239
240         for (i=0;i<num_nodes;i++) {
241                 struct ctdb_statistics s1;
242                 int j;
243                 uint32_t *v1 = (uint32_t *)&s1;
244                 uint32_t *v2 = (uint32_t *)&statistics;
245                 uint32_t num_ints = 
246                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
247                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
248                 if (ret != 0) {
249                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
250                         return ret;
251                 }
252                 for (j=0;j<num_ints;j++) {
253                         v2[j] += v1[j];
254                 }
255                 statistics.max_hop_count = 
256                         MAX(statistics.max_hop_count, s1.max_hop_count);
257                 statistics.max_call_latency = 
258                         MAX(statistics.max_call_latency, s1.max_call_latency);
259                 statistics.max_lockwait_latency = 
260                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
261         }
262         talloc_free(nodes);
263         printf("Gathered statistics for %u nodes\n", num_nodes);
264         show_statistics(&statistics);
265         return 0;
266 }
267
268 /*
269   display remote ctdb statistics
270  */
271 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
272 {
273         int ret;
274         struct ctdb_statistics statistics;
275
276         if (options.pnn == CTDB_BROADCAST_ALL) {
277                 return control_statistics_all(ctdb);
278         }
279
280         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
281         if (ret != 0) {
282                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
283                 return ret;
284         }
285         show_statistics(&statistics);
286         return 0;
287 }
288
289
290 /*
291   reset remote ctdb statistics
292  */
293 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
294 {
295         int ret;
296
297         ret = ctdb_statistics_reset(ctdb, options.pnn);
298         if (ret != 0) {
299                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
300                 return ret;
301         }
302         return 0;
303 }
304
305
306 /*
307   display uptime of remote node
308  */
309 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
310 {
311         int ret;
312         struct ctdb_uptime *uptime = NULL;
313         int tmp, days, hours, minutes, seconds;
314
315         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
316         if (ret != 0) {
317                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
318                 return ret;
319         }
320
321         if (options.machinereadable){
322                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
323                 printf(":%u:%u:%u:%lf\n",
324                         (unsigned int)uptime->current_time.tv_sec,
325                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
326                         (unsigned int)uptime->last_recovery_finished.tv_sec,
327                         timeval_delta(&uptime->last_recovery_finished,
328                                       &uptime->last_recovery_started)
329                 );
330                 return 0;
331         }
332
333         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
334
335         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
336         seconds = tmp%60;
337         tmp    /= 60;
338         minutes = tmp%60;
339         tmp    /= 60;
340         hours   = tmp%24;
341         tmp    /= 24;
342         days    = tmp;
343         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
344
345         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
346         seconds = tmp%60;
347         tmp    /= 60;
348         minutes = tmp%60;
349         tmp    /= 60;
350         hours   = tmp%24;
351         tmp    /= 24;
352         days    = tmp;
353         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
354         
355         printf("Duration of last recovery/failover: %lf seconds\n",
356                 timeval_delta(&uptime->last_recovery_finished,
357                               &uptime->last_recovery_started));
358
359         return 0;
360 }
361
362 /*
363   show the PNN of the current node
364  */
365 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
366 {
367         int mypnn;
368
369         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
370         if (mypnn == -1) {
371                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
372                 return -1;
373         }
374
375         printf("PNN:%d\n", mypnn);
376         return 0;
377 }
378
379
380 struct pnn_node {
381         struct pnn_node *next;
382         const char *addr;
383         int pnn;
384 };
385
386 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
387 {
388         const char *nodes_list;
389         int nlines;
390         char **lines;
391         int i, pnn;
392         struct pnn_node *pnn_nodes = NULL;
393         struct pnn_node *pnn_node;
394         struct pnn_node *tmp_node;
395
396         /* read the nodes file */
397         nodes_list = getenv("CTDB_NODES");
398         if (nodes_list == NULL) {
399                 nodes_list = "/etc/ctdb/nodes";
400         }
401         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
402         if (lines == NULL) {
403                 return NULL;
404         }
405         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
406                 nlines--;
407         }
408         for (i=0, pnn=0; i<nlines; i++) {
409                 char *node;
410
411                 node = lines[i];
412                 /* strip leading spaces */
413                 while((*node == ' ') || (*node == '\t')) {
414                         node++;
415                 }
416                 if (*node == '#') {
417                         pnn++;
418                         continue;
419                 }
420                 if (strcmp(node, "") == 0) {
421                         continue;
422                 }
423                 pnn_node = talloc(mem_ctx, struct pnn_node);
424                 pnn_node->pnn = pnn++;
425                 pnn_node->addr = talloc_strdup(pnn_node, node);
426                 pnn_node->next = pnn_nodes;
427                 pnn_nodes = pnn_node;
428         }
429
430         /* swap them around so we return them in incrementing order */
431         pnn_node = pnn_nodes;
432         pnn_nodes = NULL;
433         while (pnn_node) {
434                 tmp_node = pnn_node;
435                 pnn_node = pnn_node->next;
436
437                 tmp_node->next = pnn_nodes;
438                 pnn_nodes = tmp_node;
439         }
440
441         return pnn_nodes;
442 }
443
444 /*
445   show the PNN of the current node
446   discover the pnn by loading the nodes file and try to bind to all
447   addresses one at a time until the ip address is found.
448  */
449 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
450 {
451         TALLOC_CTX *mem_ctx = talloc_new(NULL);
452         struct pnn_node *pnn_nodes;
453         struct pnn_node *pnn_node;
454
455         pnn_nodes = read_nodes_file(mem_ctx);
456         if (pnn_nodes == NULL) {
457                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
458                 talloc_free(mem_ctx);
459                 return -1;
460         }
461
462         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
463                 ctdb_sock_addr addr;
464
465                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
466                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
467                         talloc_free(mem_ctx);
468                         return -1;
469                 }
470
471                 if (ctdb_sys_have_ip(&addr)) {
472                         printf("PNN:%d\n", pnn_node->pnn);
473                         talloc_free(mem_ctx);
474                         return 0;
475                 }
476         }
477
478         printf("Failed to detect which PNN this node is\n");
479         talloc_free(mem_ctx);
480         return -1;
481 }
482
483 /*
484   display remote ctdb status
485  */
486 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
487 {
488         int i, ret;
489         struct ctdb_vnn_map *vnnmap=NULL;
490         struct ctdb_node_map *nodemap=NULL;
491         uint32_t recmode, recmaster;
492         int mypnn;
493
494         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
495         if (mypnn == -1) {
496                 return -1;
497         }
498
499         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
500         if (ret != 0) {
501                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
502                 return ret;
503         }
504
505         if(options.machinereadable){
506                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
507                 for(i=0;i<nodemap->num;i++){
508                         int partially_online = 0;
509                         int j;
510
511                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
512                                 continue;
513                         }
514                         if (nodemap->nodes[i].flags == 0) {
515                                 struct ctdb_control_get_ifaces *ifaces;
516
517                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
518                                                            nodemap->nodes[i].pnn,
519                                                            ctdb, &ifaces);
520                                 if (ret == 0) {
521                                         for (j=0; j < ifaces->num; j++) {
522                                                 if (ifaces->ifaces[j].link_state != 0) {
523                                                         continue;
524                                                 }
525                                                 partially_online = 1;
526                                                 break;
527                                         }
528                                         talloc_free(ifaces);
529                                 }
530                         }
531                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
532                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
533                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
538                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
539                                partially_online);
540                 }
541                 return 0;
542         }
543
544         printf("Number of nodes:%d\n", nodemap->num);
545         for(i=0;i<nodemap->num;i++){
546                 static const struct {
547                         uint32_t flag;
548                         const char *name;
549                 } flag_names[] = {
550                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
551                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
552                         { NODE_FLAGS_BANNED,                "BANNED" },
553                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
554                         { NODE_FLAGS_DELETED,               "DELETED" },
555                         { NODE_FLAGS_STOPPED,               "STOPPED" },
556                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
557                 };
558                 char *flags_str = NULL;
559                 int j;
560
561                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
562                         continue;
563                 }
564                 if (nodemap->nodes[i].flags == 0) {
565                         struct ctdb_control_get_ifaces *ifaces;
566
567                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
568                                                    nodemap->nodes[i].pnn,
569                                                    ctdb, &ifaces);
570                         if (ret == 0) {
571                                 for (j=0; j < ifaces->num; j++) {
572                                         if (ifaces->ifaces[j].link_state != 0) {
573                                                 continue;
574                                         }
575                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
576                                         break;
577                                 }
578                                 talloc_free(ifaces);
579                         }
580                 }
581                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
582                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
583                                 if (flags_str == NULL) {
584                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
585                                 } else {
586                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
587                                                                            flag_names[j].name);
588                                 }
589                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
590                         }
591                 }
592                 if (flags_str == NULL) {
593                         flags_str = talloc_strdup(ctdb, "OK");
594                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
595                 }
596                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
597                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
598                        flags_str,
599                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
600                 talloc_free(flags_str);
601         }
602
603         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
604         if (ret != 0) {
605                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
606                 return ret;
607         }
608         if (vnnmap->generation == INVALID_GENERATION) {
609                 printf("Generation:INVALID\n");
610         } else {
611                 printf("Generation:%d\n",vnnmap->generation);
612         }
613         printf("Size:%d\n",vnnmap->size);
614         for(i=0;i<vnnmap->size;i++){
615                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
616         }
617
618         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
619         if (ret != 0) {
620                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
621                 return ret;
622         }
623         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
624
625         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
626         if (ret != 0) {
627                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
628                 return ret;
629         }
630         printf("Recovery master:%d\n",recmaster);
631
632         return 0;
633 }
634
635
636 struct natgw_node {
637         struct natgw_node *next;
638         const char *addr;
639 };
640
641 /*
642   display the list of nodes belonging to this natgw configuration
643  */
644 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
645 {
646         int i, ret;
647         const char *natgw_list;
648         int nlines;
649         char **lines;
650         struct natgw_node *natgw_nodes = NULL;
651         struct natgw_node *natgw_node;
652         struct ctdb_node_map *nodemap=NULL;
653
654
655         /* read the natgw nodes file into a linked list */
656         natgw_list = getenv("NATGW_NODES");
657         if (natgw_list == NULL) {
658                 natgw_list = "/etc/ctdb/natgw_nodes";
659         }
660         lines = file_lines_load(natgw_list, &nlines, ctdb);
661         if (lines == NULL) {
662                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
663                 return -1;
664         }
665         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
666                 nlines--;
667         }
668         for (i=0;i<nlines;i++) {
669                 char *node;
670
671                 node = lines[i];
672                 /* strip leading spaces */
673                 while((*node == ' ') || (*node == '\t')) {
674                         node++;
675                 }
676                 if (*node == '#') {
677                         continue;
678                 }
679                 if (strcmp(node, "") == 0) {
680                         continue;
681                 }
682                 natgw_node = talloc(ctdb, struct natgw_node);
683                 natgw_node->addr = talloc_strdup(natgw_node, node);
684                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
685                 natgw_node->next = natgw_nodes;
686                 natgw_nodes = natgw_node;
687         }
688
689         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
690         if (ret != 0) {
691                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
692                 return ret;
693         }
694
695         i=0;
696         while(i<nodemap->num) {
697                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
698                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
699                                 break;
700                         }
701                 }
702
703                 /* this node was not in the natgw so we just remove it from
704                  * the list
705                  */
706                 if ((natgw_node == NULL) 
707                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
708                         int j;
709
710                         for (j=i+1; j<nodemap->num; j++) {
711                                 nodemap->nodes[j-1] = nodemap->nodes[j];
712                         }
713                         nodemap->num--;
714                         continue;
715                 }
716
717                 i++;
718         }               
719
720         /* pick a node to be natgwmaster
721          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
722          */
723         for(i=0;i<nodemap->num;i++){
724                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
725                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
726                         break;
727                 }
728         }
729         /* we couldnt find any healthy node, try unhealthy ones */
730         if (i == nodemap->num) {
731                 for(i=0;i<nodemap->num;i++){
732                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
733                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
734                                 break;
735                         }
736                 }
737         }
738         /* unless all nodes are STOPPED, when we pick one anyway */
739         if (i == nodemap->num) {
740                 for(i=0;i<nodemap->num;i++){
741                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
742                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
743                                 break;
744                         }
745                 }
746                 /* or if we still can not find any */
747                 if (i == nodemap->num) {
748                         printf("-1 0.0.0.0\n");
749                 }
750         }
751
752         /* print the pruned list of nodes belonging to this natgw list */
753         for(i=0;i<nodemap->num;i++){
754                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
758                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
759                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
760                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
761                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
762                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
763                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
764         }
765
766         return 0;
767 }
768
769 /*
770   display the status of the scripts for monitoring (or other events)
771  */
772 static int control_one_scriptstatus(struct ctdb_context *ctdb,
773                                     enum ctdb_eventscript_call type)
774 {
775         struct ctdb_scripts_wire *script_status;
776         int ret, i;
777
778         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
779         if (ret != 0) {
780                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
781                 return ret;
782         }
783
784         if (script_status == NULL) {
785                 if (!options.machinereadable) {
786                         printf("%s cycle never run\n",
787                                ctdb_eventscript_call_names[type]);
788                 }
789                 return 0;
790         }
791
792         if (!options.machinereadable) {
793                 printf("%d scripts were executed last %s cycle\n",
794                        script_status->num_scripts,
795                        ctdb_eventscript_call_names[type]);
796         }
797         for (i=0; i<script_status->num_scripts; i++) {
798                 const char *status = NULL;
799
800                 switch (script_status->scripts[i].status) {
801                 case -ETIME:
802                         status = "TIMEDOUT";
803                         break;
804                 case -ENOEXEC:
805                         status = "DISABLED";
806                         break;
807                 case 0:
808                         status = "OK";
809                         break;
810                 default:
811                         if (script_status->scripts[i].status > 0)
812                                 status = "ERROR";
813                         break;
814                 }
815                 if (options.machinereadable) {
816                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
817                                ctdb_eventscript_call_names[type],
818                                script_status->scripts[i].name,
819                                script_status->scripts[i].status,
820                                status,
821                                (long)script_status->scripts[i].start.tv_sec,
822                                (long)script_status->scripts[i].start.tv_usec,
823                                (long)script_status->scripts[i].finished.tv_sec,
824                                (long)script_status->scripts[i].finished.tv_usec,
825                                script_status->scripts[i].output);
826                         continue;
827                 }
828                 if (status)
829                         printf("%-20s Status:%s    ",
830                                script_status->scripts[i].name, status);
831                 else
832                         /* Some other error, eg from stat. */
833                         printf("%-20s Status:CANNOT RUN (%s)",
834                                script_status->scripts[i].name,
835                                strerror(-script_status->scripts[i].status));
836
837                 if (script_status->scripts[i].status >= 0) {
838                         printf("Duration:%.3lf ",
839                         timeval_delta(&script_status->scripts[i].finished,
840                               &script_status->scripts[i].start));
841                 }
842                 if (script_status->scripts[i].status != -ENOEXEC) {
843                         printf("%s",
844                                ctime(&script_status->scripts[i].start.tv_sec));
845                         if (script_status->scripts[i].status != 0) {
846                                 printf("   OUTPUT:%s\n",
847                                        script_status->scripts[i].output);
848                         }
849                 } else {
850                         printf("\n");
851                 }
852         }
853         return 0;
854 }
855
856
857 static int control_scriptstatus(struct ctdb_context *ctdb,
858                                 int argc, const char **argv)
859 {
860         int ret;
861         enum ctdb_eventscript_call type, min, max;
862         const char *arg;
863
864         if (argc > 1) {
865                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
866                 return -1;
867         }
868
869         if (argc == 0)
870                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
871         else
872                 arg = argv[0];
873
874         for (type = 0; type < CTDB_EVENT_MAX; type++) {
875                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
876                         min = type;
877                         max = type+1;
878                         break;
879                 }
880         }
881         if (type == CTDB_EVENT_MAX) {
882                 if (strcmp(arg, "all") == 0) {
883                         min = 0;
884                         max = CTDB_EVENT_MAX;
885                 } else {
886                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
887                         return -1;
888                 }
889         }
890
891         if (options.machinereadable) {
892                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
893         }
894
895         for (type = min; type < max; type++) {
896                 ret = control_one_scriptstatus(ctdb, type);
897                 if (ret != 0) {
898                         return ret;
899                 }
900         }
901
902         return 0;
903 }
904
905 /*
906   enable an eventscript
907  */
908 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
909 {
910         int ret;
911
912         if (argc < 1) {
913                 usage();
914         }
915
916         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
917         if (ret != 0) {
918           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
919                 return ret;
920         }
921
922         return 0;
923 }
924
925 /*
926   disable an eventscript
927  */
928 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
929 {
930         int ret;
931
932         if (argc < 1) {
933                 usage();
934         }
935
936         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
937         if (ret != 0) {
938           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
939                 return ret;
940         }
941
942         return 0;
943 }
944
945 /*
946   display the pnn of the recovery master
947  */
948 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
949 {
950         int ret;
951         uint32_t recmaster;
952
953         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
954         if (ret != 0) {
955                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
956                 return ret;
957         }
958         printf("%d\n",recmaster);
959
960         return 0;
961 }
962
963 /*
964   get a list of all tickles for this pnn
965  */
966 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
967 {
968         struct ctdb_control_tcp_tickle_list *list;
969         ctdb_sock_addr addr;
970         int i, ret;
971
972         if (argc < 1) {
973                 usage();
974         }
975
976         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
977                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
978                 return -1;
979         }
980
981         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
982         if (ret == -1) {
983                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
984                 return -1;
985         }
986
987         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
988         printf("Num tickles:%u\n", list->tickles.num);
989         for (i=0;i<list->tickles.num;i++) {
990                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
991                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
992         }
993
994         talloc_free(list);
995         
996         return 0;
997 }
998
999
1000 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1001 {
1002         struct ctdb_all_public_ips *ips;
1003         struct ctdb_public_ip ip;
1004         int i, ret;
1005         uint32_t *nodes;
1006         uint32_t disable_time;
1007         TDB_DATA data;
1008         struct ctdb_node_map *nodemap=NULL;
1009         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1010
1011         disable_time = 30;
1012         data.dptr  = (uint8_t*)&disable_time;
1013         data.dsize = sizeof(disable_time);
1014         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1017                 return -1;
1018         }
1019
1020
1021
1022         /* read the public ip list from the node */
1023         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         for (i=0;i<ips->num;i++) {
1031                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1032                         break;
1033                 }
1034         }
1035         if (i==ips->num) {
1036                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1037                         pnn, ctdb_addr_to_str(addr)));
1038                 talloc_free(tmp_ctx);
1039                 return -1;
1040         }
1041
1042         ip.pnn  = pnn;
1043         ip.addr = *addr;
1044
1045         data.dptr  = (uint8_t *)&ip;
1046         data.dsize = sizeof(ip);
1047
1048         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1049         if (ret != 0) {
1050                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1051                 talloc_free(tmp_ctx);
1052                 return ret;
1053         }
1054
1055         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1056         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1057                                         nodes, 0,
1058                                         LONGTIMELIMIT(),
1059                                         false, data,
1060                                         NULL, NULL,
1061                                         NULL);
1062         if (ret != 0) {
1063                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1064                 talloc_free(tmp_ctx);
1065                 return -1;
1066         }
1067
1068         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1071                 talloc_free(tmp_ctx);
1072                 return -1;
1073         }
1074
1075         talloc_free(tmp_ctx);
1076         return 0;
1077 }
1078
1079 /*
1080   move/failover an ip address to a specific node
1081  */
1082 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1083 {
1084         uint32_t pnn;
1085         ctdb_sock_addr addr;
1086
1087         if (argc < 2) {
1088                 usage();
1089                 return -1;
1090         }
1091
1092         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1093                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1094                 return -1;
1095         }
1096
1097
1098         if (sscanf(argv[1], "%u", &pnn) != 1) {
1099                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1100                 return -1;
1101         }
1102
1103         if (move_ip(ctdb, &addr, pnn) != 0) {
1104                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1105                 return -1;
1106         }
1107
1108         return 0;
1109 }
1110
1111 void getips_store_callback(void *param, void *data)
1112 {
1113         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1114         struct ctdb_all_public_ips *ips = param;
1115         int i;
1116
1117         i = ips->num++;
1118         ips->ips[i].pnn  = node_ip->pnn;
1119         ips->ips[i].addr = node_ip->addr;
1120 }
1121
1122 void getips_count_callback(void *param, void *data)
1123 {
1124         uint32_t *count = param;
1125
1126         (*count)++;
1127 }
1128
1129 #define IP_KEYLEN       4
1130 static uint32_t *ip_key(ctdb_sock_addr *ip)
1131 {
1132         static uint32_t key[IP_KEYLEN];
1133
1134         bzero(key, sizeof(key));
1135
1136         switch (ip->sa.sa_family) {
1137         case AF_INET:
1138                 key[0]  = ip->ip.sin_addr.s_addr;
1139                 break;
1140         case AF_INET6:
1141                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1142                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1143                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1144                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1145                 break;
1146         default:
1147                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1148                 return key;
1149         }
1150
1151         return key;
1152 }
1153
1154 static void *add_ip_callback(void *parm, void *data)
1155 {
1156         return parm;
1157 }
1158
1159 static int
1160 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1161 {
1162         struct ctdb_all_public_ips *tmp_ips;
1163         struct ctdb_node_map *nodemap=NULL;
1164         trbt_tree_t *ip_tree;
1165         int i, j, len, ret;
1166         uint32_t count;
1167
1168         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1169         if (ret != 0) {
1170                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1171                 return ret;
1172         }
1173
1174         ip_tree = trbt_create(tmp_ctx, 0);
1175
1176         for(i=0;i<nodemap->num;i++){
1177                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1178                         continue;
1179                 }
1180                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1181                         continue;
1182                 }
1183
1184                 /* read the public ip list from this node */
1185                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1186                 if (ret != 0) {
1187                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1188                         return -1;
1189                 }
1190         
1191                 for (j=0; j<tmp_ips->num;j++) {
1192                         struct ctdb_public_ip *node_ip;
1193
1194                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1195                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1196                         node_ip->addr = tmp_ips->ips[j].addr;
1197
1198                         trbt_insertarray32_callback(ip_tree,
1199                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1200                                 add_ip_callback,
1201                                 node_ip);
1202                 }
1203                 talloc_free(tmp_ips);
1204         }
1205
1206         /* traverse */
1207         count = 0;
1208         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1209
1210         len = offsetof(struct ctdb_all_public_ips, ips) + 
1211                 count*sizeof(struct ctdb_public_ip);
1212         tmp_ips = talloc_zero_size(tmp_ctx, len);
1213         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1214
1215         *ips = tmp_ips;
1216
1217         return 0;
1218 }
1219
1220
1221 /* 
1222  * scans all other nodes and returns a pnn for another node that can host this 
1223  * ip address or -1
1224  */
1225 static int
1226 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1227 {
1228         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1229         struct ctdb_all_public_ips *ips;
1230         struct ctdb_node_map *nodemap=NULL;
1231         int i, j, ret;
1232
1233         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1234         if (ret != 0) {
1235                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1236                 talloc_free(tmp_ctx);
1237                 return ret;
1238         }
1239
1240         for(i=0;i<nodemap->num;i++){
1241                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1242                         continue;
1243                 }
1244                 if (nodemap->nodes[i].pnn == options.pnn) {
1245                         continue;
1246                 }
1247
1248                 /* read the public ip list from this node */
1249                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1250                 if (ret != 0) {
1251                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1252                         return -1;
1253                 }
1254
1255                 for (j=0;j<ips->num;j++) {
1256                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1257                                 talloc_free(tmp_ctx);
1258                                 return nodemap->nodes[i].pnn;
1259                         }
1260                 }
1261                 talloc_free(ips);
1262         }
1263
1264         talloc_free(tmp_ctx);
1265         return -1;
1266 }
1267
1268 /*
1269   add a public ip address to a node
1270  */
1271 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1272 {
1273         int i, ret;
1274         int len;
1275         uint32_t pnn;
1276         unsigned mask;
1277         ctdb_sock_addr addr;
1278         struct ctdb_control_ip_iface *pub;
1279         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1280         struct ctdb_all_public_ips *ips;
1281
1282
1283         if (argc != 2) {
1284                 talloc_free(tmp_ctx);
1285                 usage();
1286         }
1287
1288         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1289                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1290                 talloc_free(tmp_ctx);
1291                 return -1;
1292         }
1293
1294         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1297                 talloc_free(tmp_ctx);
1298                 return ret;
1299         }
1300
1301
1302         /* check if some other node is already serving this ip, if not,
1303          * we will claim it
1304          */
1305         for (i=0;i<ips->num;i++) {
1306                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1307                         break;
1308                 }
1309         }
1310
1311         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1312         pub = talloc_size(tmp_ctx, len); 
1313         CTDB_NO_MEMORY(ctdb, pub);
1314
1315         pub->addr  = addr;
1316         pub->mask  = mask;
1317         pub->len   = strlen(argv[1])+1;
1318         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1319
1320         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1321         if (ret != 0) {
1322                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1323                 talloc_free(tmp_ctx);
1324                 return ret;
1325         }
1326
1327         if (i == ips->num) {
1328                 /* no one has this ip so we claim it */
1329                 pnn  = options.pnn;
1330         } else {
1331                 pnn  = ips->ips[i].pnn;
1332         }
1333
1334         if (move_ip(ctdb, &addr, pnn) != 0) {
1335                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1336                 return -1;
1337         }
1338
1339         talloc_free(tmp_ctx);
1340         return 0;
1341 }
1342
1343 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1344
1345 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1346 {
1347         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1348         struct ctdb_node_map *nodemap=NULL;
1349         struct ctdb_all_public_ips *ips;
1350         int ret, i, j;
1351
1352         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1353         if (ret != 0) {
1354                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1355                 return ret;
1356         }
1357
1358         /* remove it from the nodes that are not hosting the ip currently */
1359         for(i=0;i<nodemap->num;i++){
1360                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1361                         continue;
1362                 }
1363                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1364                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1365                         continue;
1366                 }
1367
1368                 for (j=0;j<ips->num;j++) {
1369                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1370                                 break;
1371                         }
1372                 }
1373                 if (j==ips->num) {
1374                         continue;
1375                 }
1376
1377                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1378                         continue;
1379                 }
1380
1381                 options.pnn = nodemap->nodes[i].pnn;
1382                 control_delip(ctdb, argc, argv);
1383         }
1384
1385
1386         /* remove it from every node (also the one hosting it) */
1387         for(i=0;i<nodemap->num;i++){
1388                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1389                         continue;
1390                 }
1391                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1392                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1393                         continue;
1394                 }
1395
1396                 for (j=0;j<ips->num;j++) {
1397                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1398                                 break;
1399                         }
1400                 }
1401                 if (j==ips->num) {
1402                         continue;
1403                 }
1404
1405                 options.pnn = nodemap->nodes[i].pnn;
1406                 control_delip(ctdb, argc, argv);
1407         }
1408
1409         talloc_free(tmp_ctx);
1410         return 0;
1411 }
1412         
1413 /*
1414   delete a public ip address from a node
1415  */
1416 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1417 {
1418         int i, ret;
1419         ctdb_sock_addr addr;
1420         struct ctdb_control_ip_iface pub;
1421         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1422         struct ctdb_all_public_ips *ips;
1423
1424         if (argc != 1) {
1425                 talloc_free(tmp_ctx);
1426                 usage();
1427         }
1428
1429         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1430                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1431                 return -1;
1432         }
1433
1434         if (options.pnn == CTDB_BROADCAST_ALL) {
1435                 return control_delip_all(ctdb, argc, argv, &addr);
1436         }
1437
1438         pub.addr  = addr;
1439         pub.mask  = 0;
1440         pub.len   = 0;
1441
1442         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1445                 talloc_free(tmp_ctx);
1446                 return ret;
1447         }
1448         
1449         for (i=0;i<ips->num;i++) {
1450                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1451                         break;
1452                 }
1453         }
1454
1455         if (i==ips->num) {
1456                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1457                         ctdb_addr_to_str(&addr)));
1458                 talloc_free(tmp_ctx);
1459                 return -1;
1460         }
1461
1462         if (ips->ips[i].pnn == options.pnn) {
1463                 ret = find_other_host_for_public_ip(ctdb, &addr);
1464                 if (ret != -1) {
1465                         if (move_ip(ctdb, &addr, ret) != 0) {
1466                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1467                                 return -1;
1468                         }
1469                 }
1470         }
1471
1472         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1473         if (ret != 0) {
1474                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1475                 talloc_free(tmp_ctx);
1476                 return ret;
1477         }
1478
1479         talloc_free(tmp_ctx);
1480         return 0;
1481 }
1482
1483 /*
1484   kill a tcp connection
1485  */
1486 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1487 {
1488         int ret;
1489         struct ctdb_control_killtcp killtcp;
1490
1491         if (argc < 2) {
1492                 usage();
1493         }
1494
1495         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1496                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1497                 return -1;
1498         }
1499
1500         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1501                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1502                 return -1;
1503         }
1504
1505         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1506         if (ret != 0) {
1507                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1508                 return ret;
1509         }
1510
1511         return 0;
1512 }
1513
1514
1515 /*
1516   send a gratious arp
1517  */
1518 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1519 {
1520         int ret;
1521         ctdb_sock_addr addr;
1522
1523         if (argc < 2) {
1524                 usage();
1525         }
1526
1527         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1528                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1529                 return -1;
1530         }
1531
1532         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1533         if (ret != 0) {
1534                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1535                 return ret;
1536         }
1537
1538         return 0;
1539 }
1540
1541 /*
1542   register a server id
1543  */
1544 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1545 {
1546         int ret;
1547         struct ctdb_server_id server_id;
1548
1549         if (argc < 3) {
1550                 usage();
1551         }
1552
1553         server_id.pnn       = strtoul(argv[0], NULL, 0);
1554         server_id.type      = strtoul(argv[1], NULL, 0);
1555         server_id.server_id = strtoul(argv[2], NULL, 0);
1556
1557         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1558         if (ret != 0) {
1559                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1560                 return ret;
1561         }
1562         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1563         sleep(999);
1564         return -1;
1565 }
1566
1567 /*
1568   unregister a server id
1569  */
1570 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1571 {
1572         int ret;
1573         struct ctdb_server_id server_id;
1574
1575         if (argc < 3) {
1576                 usage();
1577         }
1578
1579         server_id.pnn       = strtoul(argv[0], NULL, 0);
1580         server_id.type      = strtoul(argv[1], NULL, 0);
1581         server_id.server_id = strtoul(argv[2], NULL, 0);
1582
1583         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1584         if (ret != 0) {
1585                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1586                 return ret;
1587         }
1588         return -1;
1589 }
1590
1591 /*
1592   check if a server id exists
1593  */
1594 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1595 {
1596         uint32_t status;
1597         int ret;
1598         struct ctdb_server_id server_id;
1599
1600         if (argc < 3) {
1601                 usage();
1602         }
1603
1604         server_id.pnn       = strtoul(argv[0], NULL, 0);
1605         server_id.type      = strtoul(argv[1], NULL, 0);
1606         server_id.server_id = strtoul(argv[2], NULL, 0);
1607
1608         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1609         if (ret != 0) {
1610                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1611                 return ret;
1612         }
1613
1614         if (status) {
1615                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1616         } else {
1617                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1618         }
1619         return 0;
1620 }
1621
1622 /*
1623   get a list of all server ids that are registered on a node
1624  */
1625 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1626 {
1627         int i, ret;
1628         struct ctdb_server_id_list *server_ids;
1629
1630         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1631         if (ret != 0) {
1632                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1633                 return ret;
1634         }
1635
1636         for (i=0; i<server_ids->num; i++) {
1637                 printf("Server id %d:%d:%d\n", 
1638                         server_ids->server_ids[i].pnn, 
1639                         server_ids->server_ids[i].type, 
1640                         server_ids->server_ids[i].server_id); 
1641         }
1642
1643         return -1;
1644 }
1645
1646 /*
1647   send a tcp tickle ack
1648  */
1649 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1650 {
1651         int ret;
1652         ctdb_sock_addr  src, dst;
1653
1654         if (argc < 2) {
1655                 usage();
1656         }
1657
1658         if (!parse_ip_port(argv[0], &src)) {
1659                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1660                 return -1;
1661         }
1662
1663         if (!parse_ip_port(argv[1], &dst)) {
1664                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1665                 return -1;
1666         }
1667
1668         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1669         if (ret==0) {
1670                 return 0;
1671         }
1672         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1673
1674         return -1;
1675 }
1676
1677
1678 /*
1679   display public ip status
1680  */
1681 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1682 {
1683         int i, ret;
1684         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1685         struct ctdb_all_public_ips *ips;
1686
1687         if (options.pnn == CTDB_BROADCAST_ALL) {
1688                 /* read the list of public ips from all nodes */
1689                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1690         } else {
1691                 /* read the public ip list from this node */
1692                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1693         }
1694         if (ret != 0) {
1695                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1696                 talloc_free(tmp_ctx);
1697                 return ret;
1698         }
1699
1700         if (options.machinereadable){
1701                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1702         } else {
1703                 if (options.pnn == CTDB_BROADCAST_ALL) {
1704                         printf("Public IPs on ALL nodes\n");
1705                 } else {
1706                         printf("Public IPs on node %u\n", options.pnn);
1707                 }
1708         }
1709
1710         for (i=1;i<=ips->num;i++) {
1711                 struct ctdb_control_public_ip_info *info = NULL;
1712                 int32_t pnn;
1713                 char *aciface = NULL;
1714                 char *avifaces = NULL;
1715                 char *cifaces = NULL;
1716
1717                 if (options.pnn == CTDB_BROADCAST_ALL) {
1718                         pnn = ips->ips[ips->num-i].pnn;
1719                 } else {
1720                         pnn = options.pnn;
1721                 }
1722
1723                 if (pnn != -1) {
1724                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1725                                                    &ips->ips[ips->num-i].addr, &info);
1726                 } else {
1727                         ret = -1;
1728                 }
1729
1730                 if (ret == 0) {
1731                         int j;
1732                         for (j=0; j < info->num; j++) {
1733                                 if (cifaces == NULL) {
1734                                         cifaces = talloc_strdup(info,
1735                                                                 info->ifaces[j].name);
1736                                 } else {
1737                                         cifaces = talloc_asprintf_append(cifaces,
1738                                                                          ",%s",
1739                                                                          info->ifaces[j].name);
1740                                 }
1741
1742                                 if (info->active_idx == j) {
1743                                         aciface = info->ifaces[j].name;
1744                                 }
1745
1746                                 if (info->ifaces[j].link_state == 0) {
1747                                         continue;
1748                                 }
1749
1750                                 if (avifaces == NULL) {
1751                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1752                                 } else {
1753                                         avifaces = talloc_asprintf_append(avifaces,
1754                                                                           ",%s",
1755                                                                           info->ifaces[j].name);
1756                                 }
1757                         }
1758                 }
1759
1760                 if (options.machinereadable){
1761                         printf(":%s:%d:%s:%s:%s:\n",
1762                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1763                                ips->ips[ips->num-i].pnn,
1764                                aciface?aciface:"",
1765                                avifaces?avifaces:"",
1766                                cifaces?cifaces:"");
1767                 } else {
1768                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1769                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1770                                ips->ips[ips->num-i].pnn,
1771                                aciface?aciface:"",
1772                                avifaces?avifaces:"",
1773                                cifaces?cifaces:"");
1774                 }
1775                 talloc_free(info);
1776         }
1777
1778         talloc_free(tmp_ctx);
1779         return 0;
1780 }
1781
1782 /*
1783   public ip info
1784  */
1785 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1786 {
1787         int i, ret;
1788         ctdb_sock_addr addr;
1789         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1790         struct ctdb_control_public_ip_info *info;
1791
1792         if (argc != 1) {
1793                 talloc_free(tmp_ctx);
1794                 usage();
1795         }
1796
1797         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1798                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1799                 return -1;
1800         }
1801
1802         /* read the public ip info from this node */
1803         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1804                                            tmp_ctx, &addr, &info);
1805         if (ret != 0) {
1806                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1807                                   argv[0], options.pnn));
1808                 talloc_free(tmp_ctx);
1809                 return ret;
1810         }
1811
1812         printf("Public IP[%s] info on node %u\n",
1813                ctdb_addr_to_str(&info->ip.addr),
1814                options.pnn);
1815
1816         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1817                ctdb_addr_to_str(&info->ip.addr),
1818                info->ip.pnn, info->num);
1819
1820         for (i=0; i<info->num; i++) {
1821                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1822
1823                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1824                        i+1, info->ifaces[i].name,
1825                        info->ifaces[i].link_state?"up":"down",
1826                        (unsigned int)info->ifaces[i].references,
1827                        (i==info->active_idx)?" (active)":"");
1828         }
1829
1830         talloc_free(tmp_ctx);
1831         return 0;
1832 }
1833
1834 /*
1835   display interfaces status
1836  */
1837 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1838 {
1839         int i, ret;
1840         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1841         struct ctdb_control_get_ifaces *ifaces;
1842
1843         /* read the public ip list from this node */
1844         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1845                                    tmp_ctx, &ifaces);
1846         if (ret != 0) {
1847                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1848                                   options.pnn));
1849                 talloc_free(tmp_ctx);
1850                 return ret;
1851         }
1852
1853         if (options.machinereadable){
1854                 printf(":Name:LinkStatus:References:\n");
1855         } else {
1856                 printf("Interfaces on node %u\n", options.pnn);
1857         }
1858
1859         for (i=0; i<ifaces->num; i++) {
1860                 if (options.machinereadable){
1861                         printf(":%s:%s:%u\n",
1862                                ifaces->ifaces[i].name,
1863                                ifaces->ifaces[i].link_state?"1":"0",
1864                                (unsigned int)ifaces->ifaces[i].references);
1865                 } else {
1866                         printf("name:%s link:%s references:%u\n",
1867                                ifaces->ifaces[i].name,
1868                                ifaces->ifaces[i].link_state?"up":"down",
1869                                (unsigned int)ifaces->ifaces[i].references);
1870                 }
1871         }
1872
1873         talloc_free(tmp_ctx);
1874         return 0;
1875 }
1876
1877
1878 /*
1879   set link status of an interface
1880  */
1881 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1882 {
1883         int ret;
1884         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1885         struct ctdb_control_iface_info info;
1886
1887         ZERO_STRUCT(info);
1888
1889         if (argc != 2) {
1890                 usage();
1891         }
1892
1893         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1894                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1895                                   argv[0]));
1896                 talloc_free(tmp_ctx);
1897                 return -1;
1898         }
1899         strcpy(info.name, argv[0]);
1900
1901         if (strcmp(argv[1], "up") == 0) {
1902                 info.link_state = 1;
1903         } else if (strcmp(argv[1], "down") == 0) {
1904                 info.link_state = 0;
1905         } else {
1906                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1907                                   argv[1]));
1908                 talloc_free(tmp_ctx);
1909                 return -1;
1910         }
1911
1912         /* read the public ip list from this node */
1913         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1914                                    tmp_ctx, &info);
1915         if (ret != 0) {
1916                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1917                                   argv[0], options.pnn));
1918                 talloc_free(tmp_ctx);
1919                 return ret;
1920         }
1921
1922         talloc_free(tmp_ctx);
1923         return 0;
1924 }
1925
1926 /*
1927   display pid of a ctdb daemon
1928  */
1929 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1930 {
1931         uint32_t pid;
1932         int ret;
1933
1934         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1935         if (ret != 0) {
1936                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1937                 return ret;
1938         }
1939         printf("Pid:%d\n", pid);
1940
1941         return 0;
1942 }
1943
1944 static uint32_t ipreallocate_finished;
1945
1946 /*
1947   handler for receiving the response to ipreallocate
1948 */
1949 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1950                              TDB_DATA data, void *private_data)
1951 {
1952         ipreallocate_finished = 1;
1953 }
1954
1955 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1956 {
1957         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1958
1959         event_add_timed(ctdb->ev, ctdb, 
1960                                 timeval_current_ofs(1, 0),
1961                                 ctdb_every_second, ctdb);
1962 }
1963
1964 /*
1965   ask the recovery daemon on the recovery master to perform a ip reallocation
1966  */
1967 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1968 {
1969         int i, ret;
1970         TDB_DATA data;
1971         struct takeover_run_reply rd;
1972         uint32_t recmaster;
1973         struct ctdb_node_map *nodemap=NULL;
1974         int retries=0;
1975         struct timeval tv = timeval_current();
1976
1977         /* we need some events to trigger so we can timeout and restart
1978            the loop
1979         */
1980         event_add_timed(ctdb->ev, ctdb, 
1981                                 timeval_current_ofs(1, 0),
1982                                 ctdb_every_second, ctdb);
1983
1984         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1985         if (rd.pnn == -1) {
1986                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1987                 return -1;
1988         }
1989         rd.srvid = getpid();
1990
1991         /* register a message port for receiveing the reply so that we
1992            can receive the reply
1993         */
1994         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1995
1996         data.dptr = (uint8_t *)&rd;
1997         data.dsize = sizeof(rd);
1998
1999 again:
2000         if (retries>5) {
2001                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
2002                 exit(10);
2003         }
2004
2005         /* check that there are valid nodes available */
2006         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2007                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2008                 exit(10);
2009         }
2010         for (i=0; i<nodemap->num;i++) {
2011                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2012                         break;
2013                 }
2014         }
2015         if (i==nodemap->num) {
2016                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2017                 return 0;
2018         }
2019
2020
2021         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2022         if (ret != 0) {
2023                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2024                 return ret;
2025         }
2026
2027         /* verify the node exists */
2028         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2029                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2030                 exit(10);
2031         }
2032
2033
2034         /* check tha there are nodes available that can act as a recmaster */
2035         for (i=0; i<nodemap->num; i++) {
2036                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2037                         continue;
2038                 }
2039                 break;
2040         }
2041         if (i == nodemap->num) {
2042                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2043                 return 0;
2044         }
2045
2046         /* verify the recovery master is not STOPPED, nor BANNED */
2047         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2048                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2049                 retries++;
2050                 sleep(1);
2051                 goto again;
2052         } 
2053
2054         
2055         /* verify the recovery master is not STOPPED, nor BANNED */
2056         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2057                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2058                 retries++;
2059                 sleep(1);
2060                 goto again;
2061         } 
2062
2063         ipreallocate_finished = 0;
2064         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2065         if (ret != 0) {
2066                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2067                 return -1;
2068         }
2069
2070         tv = timeval_current();
2071         /* this loop will terminate when we have received the reply */
2072         while (timeval_elapsed(&tv) < 3.0) {    
2073                 event_loop_once(ctdb->ev);
2074         }
2075         if (ipreallocate_finished == 1) {
2076                 return 0;
2077         }
2078
2079         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2080         retries++;
2081         sleep(1);
2082         goto again;
2083
2084         return 0;
2085 }
2086
2087
2088 /*
2089   disable a remote node
2090  */
2091 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2092 {
2093         int ret;
2094         struct ctdb_node_map *nodemap=NULL;
2095
2096         /* check if the node is already disabled */
2097         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2098                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2099                 exit(10);
2100         }
2101         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2102                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2103                 return 0;
2104         }
2105
2106         do {
2107                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2108                 if (ret != 0) {
2109                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2110                         return ret;
2111                 }
2112
2113                 sleep(1);
2114
2115                 /* read the nodemap and verify the change took effect */
2116                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2117                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2118                         exit(10);
2119                 }
2120
2121         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2122         ret = control_ipreallocate(ctdb, argc, argv);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2125                 return ret;
2126         }
2127
2128         return 0;
2129 }
2130
2131 /*
2132   enable a disabled remote node
2133  */
2134 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2135 {
2136         int ret;
2137
2138         struct ctdb_node_map *nodemap=NULL;
2139
2140
2141         /* check if the node is already enabled */
2142         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2143                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2144                 exit(10);
2145         }
2146         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2147                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2148                 return 0;
2149         }
2150
2151         do {
2152                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2153                 if (ret != 0) {
2154                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2155                         return ret;
2156                 }
2157
2158                 sleep(1);
2159
2160                 /* read the nodemap and verify the change took effect */
2161                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2162                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2163                         exit(10);
2164                 }
2165
2166         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2167
2168         ret = control_ipreallocate(ctdb, argc, argv);
2169         if (ret != 0) {
2170                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2171                 return ret;
2172         }
2173
2174         return 0;
2175 }
2176
2177 /*
2178   stop a remote node
2179  */
2180 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2181 {
2182         int ret;
2183         struct ctdb_node_map *nodemap=NULL;
2184
2185         do {
2186                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2187                 if (ret != 0) {
2188                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2189                 }
2190         
2191                 sleep(1);
2192
2193                 /* read the nodemap and verify the change took effect */
2194                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2195                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2196                         exit(10);
2197                 }
2198
2199         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2200         ret = control_ipreallocate(ctdb, argc, argv);
2201         if (ret != 0) {
2202                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2203                 return ret;
2204         }
2205
2206         return 0;
2207 }
2208
2209 /*
2210   restart a stopped remote node
2211  */
2212 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2213 {
2214         int ret;
2215
2216         struct ctdb_node_map *nodemap=NULL;
2217
2218         do {
2219                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2220                 if (ret != 0) {
2221                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2222                         return ret;
2223                 }
2224         
2225                 sleep(1);
2226
2227                 /* read the nodemap and verify the change took effect */
2228                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2229                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2230                         exit(10);
2231                 }
2232
2233         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2234         ret = control_ipreallocate(ctdb, argc, argv);
2235         if (ret != 0) {
2236                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2237                 return ret;
2238         }
2239
2240         return 0;
2241 }
2242
2243 static uint32_t get_generation(struct ctdb_context *ctdb)
2244 {
2245         struct ctdb_vnn_map *vnnmap=NULL;
2246         int ret;
2247
2248         /* wait until the recmaster is not in recovery mode */
2249         while (1) {
2250                 uint32_t recmode, recmaster;
2251                 
2252                 if (vnnmap != NULL) {
2253                         talloc_free(vnnmap);
2254                         vnnmap = NULL;
2255                 }
2256
2257                 /* get the recmaster */
2258                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2259                 if (ret != 0) {
2260                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2261                         exit(10);
2262                 }
2263
2264                 /* get recovery mode */
2265                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2266                 if (ret != 0) {
2267                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2268                         exit(10);
2269                 }
2270
2271                 /* get the current generation number */
2272                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2273                 if (ret != 0) {
2274                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2275                         exit(10);
2276                 }
2277
2278                 if ((recmode == CTDB_RECOVERY_NORMAL)
2279                 &&  (vnnmap->generation != 1)){
2280                         return vnnmap->generation;
2281                 }
2282                 sleep(1);
2283         }
2284 }
2285
2286 /*
2287   ban a node from the cluster
2288  */
2289 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2290 {
2291         int ret;
2292         struct ctdb_node_map *nodemap=NULL;
2293         struct ctdb_ban_time bantime;
2294
2295         if (argc < 1) {
2296                 usage();
2297         }
2298         
2299         /* verify the node exists */
2300         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2301         if (ret != 0) {
2302                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2303                 return ret;
2304         }
2305
2306         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2307                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2308                 return -1;
2309         }
2310
2311         bantime.pnn  = options.pnn;
2312         bantime.time = strtoul(argv[0], NULL, 0);
2313
2314         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2315         if (ret != 0) {
2316                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2317                 return -1;
2318         }       
2319
2320         ret = control_ipreallocate(ctdb, argc, argv);
2321         if (ret != 0) {
2322                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2323                 return ret;
2324         }
2325
2326         return 0;
2327 }
2328
2329
2330 /*
2331   unban a node from the cluster
2332  */
2333 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2334 {
2335         int ret;
2336         struct ctdb_node_map *nodemap=NULL;
2337         struct ctdb_ban_time bantime;
2338
2339         /* verify the node exists */
2340         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2341         if (ret != 0) {
2342                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2343                 return ret;
2344         }
2345
2346         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2347                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2348                 return -1;
2349         }
2350
2351         bantime.pnn  = options.pnn;
2352         bantime.time = 0;
2353
2354         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2355         if (ret != 0) {
2356                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2357                 return -1;
2358         }       
2359
2360         ret = control_ipreallocate(ctdb, argc, argv);
2361         if (ret != 0) {
2362                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2363                 return ret;
2364         }
2365
2366         return 0;
2367 }
2368
2369
2370 /*
2371   show ban information for a node
2372  */
2373 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2374 {
2375         int ret;
2376         struct ctdb_node_map *nodemap=NULL;
2377         struct ctdb_ban_time *bantime;
2378
2379         /* verify the node exists */
2380         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2381         if (ret != 0) {
2382                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2383                 return ret;
2384         }
2385
2386         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2387         if (ret != 0) {
2388                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2389                 return -1;
2390         }       
2391
2392         if (bantime->time == 0) {
2393                 printf("Node %u is not banned\n", bantime->pnn);
2394         } else {
2395                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2396         }
2397
2398         return 0;
2399 }
2400
2401 /*
2402   shutdown a daemon
2403  */
2404 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2405 {
2406         int ret;
2407
2408         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2409         if (ret != 0) {
2410                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2411                 return ret;
2412         }
2413
2414         return 0;
2415 }
2416
2417 /*
2418   trigger a recovery
2419  */
2420 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2421 {
2422         int ret;
2423         uint32_t generation, next_generation;
2424
2425         /* record the current generation number */
2426         generation = get_generation(ctdb);
2427
2428         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2429         if (ret != 0) {
2430                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2431                 return ret;
2432         }
2433
2434         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2435         if (ret != 0) {
2436                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2437                 return ret;
2438         }
2439
2440         /* wait until we are in a new generation */
2441         while (1) {
2442                 next_generation = get_generation(ctdb);
2443                 if (next_generation != generation) {
2444                         return 0;
2445                 }
2446                 sleep(1);
2447         }
2448
2449         return 0;
2450 }
2451
2452
2453 /*
2454   display monitoring mode of a remote node
2455  */
2456 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2457 {
2458         uint32_t monmode;
2459         int ret;
2460
2461         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2462         if (ret != 0) {
2463                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2464                 return ret;
2465         }
2466         if (!options.machinereadable){
2467                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2468         } else {
2469                 printf(":mode:\n");
2470                 printf(":%d:\n",monmode);
2471         }
2472         return 0;
2473 }
2474
2475
2476 /*
2477   display capabilities of a remote node
2478  */
2479 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2480 {
2481         uint32_t capabilities;
2482         int ret;
2483
2484         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2485         if (ret != 0) {
2486                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2487                 return ret;
2488         }
2489         
2490         if (!options.machinereadable){
2491                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2492                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2493                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2494                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2495         } else {
2496                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2497                 printf(":%d:%d:%d:%d:\n",
2498                         !!(capabilities&CTDB_CAP_RECMASTER),
2499                         !!(capabilities&CTDB_CAP_LMASTER),
2500                         !!(capabilities&CTDB_CAP_LVS),
2501                         !!(capabilities&CTDB_CAP_NATGW));
2502         }
2503         return 0;
2504 }
2505
2506 /*
2507   display lvs configuration
2508  */
2509 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2510 {
2511         uint32_t *capabilities;
2512         struct ctdb_node_map *nodemap=NULL;
2513         int i, ret;
2514         int healthy_count = 0;
2515
2516         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2517         if (ret != 0) {
2518                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2519                 return ret;
2520         }
2521
2522         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2523         CTDB_NO_MEMORY(ctdb, capabilities);
2524         
2525         /* collect capabilities for all connected nodes */
2526         for (i=0; i<nodemap->num; i++) {
2527                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2528                         continue;
2529                 }
2530                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2531                         continue;
2532                 }
2533         
2534                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2535                 if (ret != 0) {
2536                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2537                         return ret;
2538                 }
2539
2540                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2541                         continue;
2542                 }
2543
2544                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2545                         healthy_count++;
2546                 }
2547         }
2548
2549         /* Print all LVS nodes */
2550         for (i=0; i<nodemap->num; i++) {
2551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2552                         continue;
2553                 }
2554                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2555                         continue;
2556                 }
2557                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2558                         continue;
2559                 }
2560
2561                 if (healthy_count != 0) {
2562                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2563                                 continue;
2564                         }
2565                 }
2566
2567                 printf("%d:%s\n", i, 
2568                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2569         }
2570
2571         return 0;
2572 }
2573
2574 /*
2575   display who is the lvs master
2576  */
2577 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2578 {
2579         uint32_t *capabilities;
2580         struct ctdb_node_map *nodemap=NULL;
2581         int i, ret;
2582         int healthy_count = 0;
2583
2584         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2585         if (ret != 0) {
2586                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2587                 return ret;
2588         }
2589
2590         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2591         CTDB_NO_MEMORY(ctdb, capabilities);
2592         
2593         /* collect capabilities for all connected nodes */
2594         for (i=0; i<nodemap->num; i++) {
2595                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2596                         continue;
2597                 }
2598                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2599                         continue;
2600                 }
2601         
2602                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2603                 if (ret != 0) {
2604                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2605                         return ret;
2606                 }
2607
2608                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2609                         continue;
2610                 }
2611
2612                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2613                         healthy_count++;
2614                 }
2615         }
2616
2617         /* find and show the lvsmaster */
2618         for (i=0; i<nodemap->num; i++) {
2619                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2620                         continue;
2621                 }
2622                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2623                         continue;
2624                 }
2625                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2626                         continue;
2627                 }
2628
2629                 if (healthy_count != 0) {
2630                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2631                                 continue;
2632                         }
2633                 }
2634
2635                 if (options.machinereadable){
2636                         printf("%d\n", i);
2637                 } else {
2638                         printf("Node %d is LVS master\n", i);
2639                 }
2640                 return 0;
2641         }
2642
2643         printf("There is no LVS master\n");
2644         return -1;
2645 }
2646
2647 /*
2648   disable monitoring on a  node
2649  */
2650 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2651 {
2652         
2653         int ret;
2654
2655         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2656         if (ret != 0) {
2657                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2658                 return ret;
2659         }
2660         printf("Monitoring mode:%s\n","DISABLED");
2661
2662         return 0;
2663 }
2664
2665 /*
2666   enable monitoring on a  node
2667  */
2668 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2669 {
2670         
2671         int ret;
2672
2673         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2674         if (ret != 0) {
2675                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2676                 return ret;
2677         }
2678         printf("Monitoring mode:%s\n","ACTIVE");
2679
2680         return 0;
2681 }
2682
2683 /*
2684   display remote list of keys/data for a db
2685  */
2686 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2687 {
2688         const char *db_name;
2689         struct ctdb_db_context *ctdb_db;
2690         int ret;
2691
2692         if (argc < 1) {
2693                 usage();
2694         }
2695
2696         db_name = argv[0];
2697
2698
2699         if (db_exists(ctdb, db_name)) {
2700                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2701                 return -1;
2702         }
2703
2704         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2705
2706         if (ctdb_db == NULL) {
2707                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2708                 return -1;
2709         }
2710
2711         /* traverse and dump the cluster tdb */
2712         ret = ctdb_dump_db(ctdb_db, stdout);
2713         if (ret == -1) {
2714                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2715                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2716                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2717                                   db_name));
2718                 return -1;
2719         }
2720         talloc_free(ctdb_db);
2721
2722         printf("Dumped %d records\n", ret);
2723         return 0;
2724 }
2725
2726
2727 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2728                              TDB_DATA data, void *private_data)
2729 {
2730         DEBUG(DEBUG_ERR,("Log data received\n"));
2731         if (data.dsize > 0) {
2732                 printf("%s", data.dptr);
2733         }
2734
2735         exit(0);
2736 }
2737
2738 /*
2739   display a list of log messages from the in memory ringbuffer
2740  */
2741 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2742 {
2743         int ret;
2744         int32_t res;
2745         struct ctdb_get_log_addr log_addr;
2746         TDB_DATA data;
2747         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2748         char *errmsg;
2749         struct timeval tv;
2750
2751         if (argc != 1) {
2752                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2753                 talloc_free(tmp_ctx);
2754                 return -1;
2755         }
2756
2757         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2758         log_addr.srvid = getpid();
2759         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2760                 log_addr.level = get_debug_by_desc(argv[0]);
2761         } else {
2762                 log_addr.level = strtol(argv[0], NULL, 0);
2763         }
2764
2765
2766         data.dptr = (unsigned char *)&log_addr;
2767         data.dsize = sizeof(log_addr);
2768
2769         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2770
2771         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2772         sleep(1);
2773
2774         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2775
2776         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2777                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2778         if (ret != 0 || res != 0) {
2779                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2780                 talloc_free(tmp_ctx);
2781                 return -1;
2782         }
2783
2784
2785         tv = timeval_current();
2786         /* this loop will terminate when we have received the reply */
2787         while (timeval_elapsed(&tv) < 3.0) {    
2788                 event_loop_once(ctdb->ev);
2789         }
2790
2791         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2792
2793         talloc_free(tmp_ctx);
2794         return 0;
2795 }
2796
2797 /*
2798   clear the in memory log area
2799  */
2800 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2801 {
2802         int ret;
2803         int32_t res;
2804         char *errmsg;
2805         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2806
2807         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2808                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2809         if (ret != 0 || res != 0) {
2810                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2811                 talloc_free(tmp_ctx);
2812                 return -1;
2813         }
2814
2815         talloc_free(tmp_ctx);
2816         return 0;
2817 }
2818
2819
2820
2821 /*
2822   display a list of the databases on a remote ctdb
2823  */
2824 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2825 {
2826         int i, ret;
2827         struct ctdb_dbid_map *dbmap=NULL;
2828
2829         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2830         if (ret != 0) {
2831                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2832                 return ret;
2833         }
2834
2835         if(options.machinereadable){
2836                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2837                 for(i=0;i<dbmap->num;i++){
2838                         const char *path;
2839                         const char *name;
2840                         const char *health;
2841                         bool persistent;
2842
2843                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2844                                             dbmap->dbs[i].dbid, ctdb, &path);
2845                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2846                                             dbmap->dbs[i].dbid, ctdb, &name);
2847                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2848                                               dbmap->dbs[i].dbid, ctdb, &health);
2849                         persistent = dbmap->dbs[i].persistent;
2850                         printf(":0x%08X:%s:%s:%d:%d:\n",
2851                                dbmap->dbs[i].dbid, name, path,
2852                                !!(persistent), !!(health));
2853                 }
2854                 return 0;
2855         }
2856
2857         printf("Number of databases:%d\n", dbmap->num);
2858         for(i=0;i<dbmap->num;i++){
2859                 const char *path;
2860                 const char *name;
2861                 const char *health;
2862                 bool persistent;
2863
2864                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2865                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2866                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2867                 persistent = dbmap->dbs[i].persistent;
2868                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2869                        dbmap->dbs[i].dbid, name, path,
2870                        persistent?" PERSISTENT":"",
2871                        health?" UNHEALTHY":"");
2872         }
2873
2874         return 0;
2875 }
2876
2877 /*
2878   display the status of a database on a remote ctdb
2879  */
2880 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2881 {
2882         int i, ret;
2883         struct ctdb_dbid_map *dbmap=NULL;
2884         const char *db_name;
2885
2886         if (argc < 1) {
2887                 usage();
2888         }
2889
2890         db_name = argv[0];
2891
2892         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2893         if (ret != 0) {
2894                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2895                 return ret;
2896         }
2897
2898         for(i=0;i<dbmap->num;i++){
2899                 const char *path;
2900                 const char *name;
2901                 const char *health;
2902                 bool persistent;
2903
2904                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2905                 if (strcmp(name, db_name) != 0) {
2906                         continue;
2907                 }
2908
2909                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2910                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2911                 persistent = dbmap->dbs[i].persistent;
2912                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2913                        dbmap->dbs[i].dbid, name, path,
2914                        persistent?"yes":"no",
2915                        health?health:"OK");
2916                 return 0;
2917         }
2918
2919         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2920         return 0;
2921 }
2922
2923 /*
2924   check if the local node is recmaster or not
2925   it will return 1 if this node is the recmaster and 0 if it is not
2926   or if the local ctdb daemon could not be contacted
2927  */
2928 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2929 {
2930         uint32_t mypnn, recmaster;
2931         int ret;
2932
2933         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2934         if (mypnn == -1) {
2935                 printf("Failed to get pnn of node\n");
2936                 return 1;
2937         }
2938
2939         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2940         if (ret != 0) {
2941                 printf("Failed to get the recmaster\n");
2942                 return 1;
2943         }
2944
2945         if (recmaster != mypnn) {
2946                 printf("this node is not the recmaster\n");
2947                 return 1;
2948         }
2949
2950         printf("this node is the recmaster\n");
2951         return 0;
2952 }
2953
2954 /*
2955   ping a node
2956  */
2957 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2958 {
2959         int ret;
2960         struct timeval tv = timeval_current();
2961         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2962         if (ret == -1) {
2963                 printf("Unable to get ping response from node %u\n", options.pnn);
2964                 return -1;
2965         } else {
2966                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2967                        options.pnn, timeval_elapsed(&tv), ret);
2968         }
2969         return 0;
2970 }
2971
2972
2973 /*
2974   get a tunable
2975  */
2976 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2977 {
2978         const char *name;
2979         uint32_t value;
2980         int ret;
2981
2982         if (argc < 1) {
2983                 usage();
2984         }
2985
2986         name = argv[0];
2987         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2988         if (ret == -1) {
2989                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2990                 return -1;
2991         }
2992
2993         printf("%-19s = %u\n", name, value);
2994         return 0;
2995 }
2996
2997 /*
2998   set a tunable
2999  */
3000 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3001 {
3002         const char *name;
3003         uint32_t value;
3004         int ret;
3005
3006         if (argc < 2) {
3007                 usage();
3008         }
3009
3010         name = argv[0];
3011         value = strtoul(argv[1], NULL, 0);
3012
3013         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3014         if (ret == -1) {
3015                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3016                 return -1;
3017         }
3018         return 0;
3019 }
3020
3021 /*
3022   list all tunables
3023  */
3024 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3025 {
3026         uint32_t count;
3027         const char **list;
3028         int ret, i;
3029
3030         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3031         if (ret == -1) {
3032                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3033                 return -1;
3034         }
3035
3036         for (i=0;i<count;i++) {
3037                 control_getvar(ctdb, 1, &list[i]);
3038         }
3039
3040         talloc_free(list);
3041         
3042         return 0;
3043 }
3044
3045 /*
3046   display debug level on a node
3047  */
3048 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3049 {
3050         int ret;
3051         int32_t level;
3052
3053         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3054         if (ret != 0) {
3055                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3056                 return ret;
3057         } else {
3058                 if (options.machinereadable){
3059                         printf(":Name:Level:\n");
3060                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3061                 } else {
3062                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3063                 }
3064         }
3065         return 0;
3066 }
3067
3068 /*
3069   display reclock file of a node
3070  */
3071 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3072 {
3073         int ret;
3074         const char *reclock;
3075
3076         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3077         if (ret != 0) {
3078                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3079                 return ret;
3080         } else {
3081                 if (options.machinereadable){
3082                         if (reclock != NULL) {
3083                                 printf("%s", reclock);
3084                         }
3085                 } else {
3086                         if (reclock == NULL) {
3087                                 printf("No reclock file used.\n");
3088                         } else {
3089                                 printf("Reclock file:%s\n", reclock);
3090                         }
3091                 }
3092         }
3093         return 0;
3094 }
3095
3096 /*
3097   set the reclock file of a node
3098  */
3099 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3100 {
3101         int ret;
3102         const char *reclock;
3103
3104         if (argc == 0) {
3105                 reclock = NULL;
3106         } else if (argc == 1) {
3107                 reclock = argv[0];
3108         } else {
3109                 usage();
3110         }
3111
3112         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3113         if (ret != 0) {
3114                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3115                 return ret;
3116         }
3117         return 0;
3118 }
3119
3120 /*
3121   set the natgw state on/off
3122  */
3123 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3124 {
3125         int ret;
3126         uint32_t natgwstate;
3127
3128         if (argc == 0) {
3129                 usage();
3130         }
3131
3132         if (!strcmp(argv[0], "on")) {
3133                 natgwstate = 1;
3134         } else if (!strcmp(argv[0], "off")) {
3135                 natgwstate = 0;
3136         } else {
3137                 usage();
3138         }
3139
3140         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3141         if (ret != 0) {
3142                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3143                 return ret;
3144         }
3145
3146         return 0;
3147 }
3148
3149 /*
3150   set the lmaster role on/off
3151  */
3152 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3153 {
3154         int ret;
3155         uint32_t lmasterrole;
3156
3157         if (argc == 0) {
3158                 usage();
3159         }
3160
3161         if (!strcmp(argv[0], "on")) {
3162                 lmasterrole = 1;
3163         } else if (!strcmp(argv[0], "off")) {
3164                 lmasterrole = 0;
3165         } else {
3166                 usage();
3167         }
3168
3169         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3170         if (ret != 0) {
3171                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3172                 return ret;
3173         }
3174
3175         return 0;
3176 }
3177
3178 /*
3179   set the recmaster role on/off
3180  */
3181 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3182 {
3183         int ret;
3184         uint32_t recmasterrole;
3185
3186         if (argc == 0) {
3187                 usage();
3188         }
3189
3190         if (!strcmp(argv[0], "on")) {
3191                 recmasterrole = 1;
3192         } else if (!strcmp(argv[0], "off")) {
3193                 recmasterrole = 0;
3194         } else {
3195                 usage();
3196         }
3197
3198         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3199         if (ret != 0) {
3200                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3201                 return ret;
3202         }
3203
3204         return 0;
3205 }
3206
3207 /*
3208   set debug level on a node or all nodes
3209  */
3210 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3211 {
3212         int i, ret;
3213         int32_t level;
3214
3215         if (argc == 0) {
3216                 printf("You must specify the debug level. Valid levels are:\n");
3217                 for (i=0; debug_levels[i].description != NULL; i++) {
3218                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3219                 }
3220
3221                 return 0;
3222         }
3223
3224         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3225                 level = get_debug_by_desc(argv[0]);
3226         } else {
3227                 level = strtol(argv[0], NULL, 0);
3228         }
3229
3230         for (i=0; debug_levels[i].description != NULL; i++) {
3231                 if (level == debug_levels[i].level) {
3232                         break;
3233                 }
3234         }
3235         if (debug_levels[i].description == NULL) {
3236                 printf("Invalid debug level, must be one of\n");
3237                 for (i=0; debug_levels[i].description != NULL; i++) {
3238                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3239                 }
3240                 return -1;
3241         }
3242
3243         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3244         if (ret != 0) {
3245                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3246         }
3247         return 0;
3248 }
3249
3250
3251 /*
3252   freeze a node
3253  */
3254 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3255 {
3256         int ret;
3257         uint32_t priority;
3258         
3259         if (argc == 1) {
3260                 priority = strtol(argv[0], NULL, 0);
3261         } else {
3262                 priority = 0;
3263         }
3264         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3265
3266         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3267         if (ret != 0) {
3268                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3269         }               
3270         return 0;
3271 }
3272
3273 /*
3274   thaw a node
3275  */
3276 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3277 {
3278         int ret;
3279         uint32_t priority;
3280         
3281         if (argc == 1) {
3282                 priority = strtol(argv[0], NULL, 0);
3283         } else {
3284                 priority = 0;
3285         }
3286         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3287
3288         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3289         if (ret != 0) {
3290                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3291         }               
3292         return 0;
3293 }
3294
3295
3296 /*
3297   attach to a database
3298  */
3299 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3300 {
3301         const char *db_name;
3302         struct ctdb_db_context *ctdb_db;
3303
3304         if (argc < 1) {
3305                 usage();
3306         }
3307         db_name = argv[0];
3308
3309         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3310         if (ctdb_db == NULL) {
3311                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3312                 return -1;
3313         }
3314
3315         return 0;
3316 }
3317
3318 /*
3319   set db priority
3320  */
3321 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3322 {
3323         struct ctdb_db_priority db_prio;
3324         int ret;
3325
3326         if (argc < 2) {
3327                 usage();
3328         }
3329
3330         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3331         db_prio.priority = strtoul(argv[1], NULL, 0);
3332
3333         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3336                 return -1;
3337         }
3338
3339         return 0;
3340 }
3341
3342 /*
3343   get db priority
3344  */
3345 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3346 {
3347         uint32_t db_id, priority;
3348         int ret;
3349
3350         if (argc < 1) {
3351                 usage();
3352         }
3353
3354         db_id = strtoul(argv[0], NULL, 0);
3355
3356         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3357         if (ret != 0) {
3358                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3359                 return -1;
3360         }
3361
3362         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3363
3364         return 0;
3365 }
3366
3367 /*
3368   run an eventscript on a node
3369  */
3370 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3371 {
3372         TDB_DATA data;
3373         int ret;
3374         int32_t res;
3375         char *errmsg;
3376         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3377
3378         if (argc != 1) {
3379                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3380                 return -1;
3381         }
3382
3383         data.dptr = (unsigned char *)discard_const(argv[0]);
3384         data.dsize = strlen((char *)data.dptr) + 1;
3385
3386         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3387
3388         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3389                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3390         if (ret != 0 || res != 0) {
3391                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3392                 talloc_free(tmp_ctx);
3393                 return -1;
3394         }
3395         talloc_free(tmp_ctx);
3396         return 0;
3397 }
3398
3399 #define DB_VERSION 1
3400 #define MAX_DB_NAME 64
3401 struct db_file_header {
3402         unsigned long version;
3403         time_t timestamp;
3404         unsigned long persistent;
3405         unsigned long size;
3406         const char name[MAX_DB_NAME];
3407 };
3408
3409 struct backup_data {
3410         struct ctdb_marshall_buffer *records;
3411         uint32_t len;
3412         uint32_t total;
3413         bool traverse_error;
3414 };
3415
3416 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3417 {
3418         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3419         struct ctdb_rec_data *rec;
3420
3421         /* add the record */
3422         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3423         if (rec == NULL) {
3424                 bd->traverse_error = true;
3425                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3426                 return -1;
3427         }
3428         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3429         if (bd->records == NULL) {
3430                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3431                 bd->traverse_error = true;
3432                 return -1;
3433         }
3434         bd->records->count++;
3435         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3436         bd->len += rec->length;
3437         talloc_free(rec);
3438
3439         bd->total++;
3440         return 0;
3441 }
3442
3443 /*
3444  * backup a database to a file 
3445  */
3446 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3447 {
3448         int i, ret;
3449         struct ctdb_dbid_map *dbmap=NULL;
3450         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3451         struct db_file_header dbhdr;
3452         struct ctdb_db_context *ctdb_db;
3453         struct backup_data *bd;
3454         int fh = -1;
3455         int status = -1;
3456         const char *reason = NULL;
3457
3458         if (argc != 2) {
3459                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3460                 return -1;
3461         }
3462
3463         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3464         if (ret != 0) {
3465                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3466                 return ret;
3467         }
3468
3469         for(i=0;i<dbmap->num;i++){
3470                 const char *name;
3471
3472                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3473                 if(!strcmp(argv[0], name)){
3474                         talloc_free(discard_const(name));
3475                         break;
3476                 }
3477                 talloc_free(discard_const(name));
3478         }
3479         if (i == dbmap->num) {
3480                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3481                 talloc_free(tmp_ctx);
3482                 return -1;
3483         }
3484
3485         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3486                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3487         if (ret != 0) {
3488                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3489                                  argv[0]));
3490                 talloc_free(tmp_ctx);
3491                 return -1;
3492         }
3493         if (reason) {
3494                 uint32_t allow_unhealthy = 0;
3495
3496                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3497                                       "AllowUnhealthyDBRead",
3498                                       &allow_unhealthy);
3499
3500                 if (allow_unhealthy != 1) {
3501                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3502                                          argv[0], reason));
3503
3504                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3505                                          allow_unhealthy));
3506                         talloc_free(tmp_ctx);
3507                         return -1;
3508                 }
3509
3510                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3511                                      argv[0], argv[0]));
3512                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3513                                      "tunnable AllowUnhealthyDBRead = %u\n",
3514                                      allow_unhealthy));
3515         }
3516
3517         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3518         if (ctdb_db == NULL) {
3519                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524
3525         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3526         if (ret == -1) {
3527                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3528                 talloc_free(tmp_ctx);
3529                 return -1;
3530         }
3531
3532
3533         bd = talloc_zero(tmp_ctx, struct backup_data);
3534         if (bd == NULL) {
3535                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3536                 talloc_free(tmp_ctx);
3537                 return -1;
3538         }
3539
3540         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3541         if (bd->records == NULL) {
3542                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3543                 talloc_free(tmp_ctx);
3544                 return -1;
3545         }
3546
3547         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3548         bd->records->db_id = ctdb_db->db_id;
3549         /* traverse the database collecting all records */
3550         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3551             bd->traverse_error) {
3552                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3553                 talloc_free(tmp_ctx);
3554                 return -1;              
3555         }
3556
3557         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3558
3559
3560         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3561         if (fh == -1) {
3562                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3563                 talloc_free(tmp_ctx);
3564                 return -1;
3565         }
3566
3567         dbhdr.version = DB_VERSION;
3568         dbhdr.timestamp = time(NULL);
3569         dbhdr.persistent = dbmap->dbs[i].persistent;
3570         dbhdr.size = bd->len;
3571         if (strlen(argv[0]) >= MAX_DB_NAME) {
3572                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3573                 goto done;
3574         }
3575         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3576         ret = write(fh, &dbhdr, sizeof(dbhdr));
3577         if (ret == -1) {
3578                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3579                 goto done;
3580         }
3581         ret = write(fh, bd->records, bd->len);
3582         if (ret == -1) {
3583                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3584                 goto done;
3585         }
3586
3587         status = 0;
3588 done:
3589         if (fh != -1) {
3590                 ret = close(fh);
3591                 if (ret == -1) {
3592                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3593                 }
3594         }
3595         talloc_free(tmp_ctx);
3596         return status;
3597 }
3598
3599 /*
3600  * restore a database from a file 
3601  */
3602 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3603 {
3604         int ret;
3605         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3606         TDB_DATA outdata;
3607         TDB_DATA data;
3608         struct db_file_header dbhdr;
3609         struct ctdb_db_context *ctdb_db;
3610         struct ctdb_node_map *nodemap=NULL;
3611         struct ctdb_vnn_map *vnnmap=NULL;
3612         int i, fh;
3613         struct ctdb_control_wipe_database w;
3614         uint32_t *nodes;
3615         uint32_t generation;
3616         struct tm *tm;
3617         char tbuf[100];
3618
3619         if (argc != 1) {
3620                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3621                 return -1;
3622         }
3623
3624         fh = open(argv[0], O_RDONLY);
3625         if (fh == -1) {
3626                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3627                 talloc_free(tmp_ctx);
3628                 return -1;
3629         }
3630
3631         read(fh, &dbhdr, sizeof(dbhdr));
3632         if (dbhdr.version != DB_VERSION) {
3633                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3634                 talloc_free(tmp_ctx);
3635                 return -1;
3636         }
3637
3638         outdata.dsize = dbhdr.size;
3639         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3640         if (outdata.dptr == NULL) {
3641                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3642                 close(fh);
3643                 talloc_free(tmp_ctx);
3644                 return -1;
3645         }               
3646         read(fh, outdata.dptr, outdata.dsize);
3647         close(fh);
3648
3649         tm = localtime(&dbhdr.timestamp);
3650         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3651         printf("Restoring database '%s' from backup @ %s\n",
3652                 dbhdr.name, tbuf);
3653
3654
3655         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3656         if (ctdb_db == NULL) {
3657                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3658                 talloc_free(tmp_ctx);
3659                 return -1;
3660         }
3661
3662         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3663         if (ret != 0) {
3664                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3665                 talloc_free(tmp_ctx);
3666                 return ret;
3667         }
3668
3669
3670         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3671         if (ret != 0) {
3672                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3673                 talloc_free(tmp_ctx);
3674                 return ret;
3675         }
3676
3677         /* freeze all nodes */
3678         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3679         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3680                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3681                                         nodes, i,
3682                                         TIMELIMIT(),
3683                                         false, tdb_null,
3684                                         NULL, NULL,
3685                                         NULL) != 0) {
3686                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3687                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3688                         talloc_free(tmp_ctx);
3689                         return -1;
3690                 }
3691         }
3692
3693         generation = vnnmap->generation;
3694         data.dptr = (void *)&generation;
3695         data.dsize = sizeof(generation);
3696
3697         /* start a cluster wide transaction */
3698         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3699         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3700                                         nodes, 0,
3701                                         TIMELIMIT(), false, data,
3702                                         NULL, NULL,
3703                                         NULL) != 0) {
3704                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3705                 return -1;
3706         }
3707
3708
3709         w.db_id = ctdb_db->db_id;
3710         w.transaction_id = generation;
3711
3712         data.dptr = (void *)&w;
3713         data.dsize = sizeof(w);
3714
3715         /* wipe all the remote databases. */
3716         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3717         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3718                                         nodes, 0,
3719                                         TIMELIMIT(), false, data,
3720                                         NULL, NULL,
3721                                         NULL) != 0) {
3722                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3723                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3724                 talloc_free(tmp_ctx);
3725                 return -1;
3726         }
3727         
3728         /* push the database */
3729         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3730         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3731                                         nodes, 0,
3732                                         TIMELIMIT(), false, outdata,
3733                                         NULL, NULL,
3734                                         NULL) != 0) {
3735                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3736                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3737                 talloc_free(tmp_ctx);
3738                 return -1;
3739         }
3740
3741         data.dptr = (void *)&ctdb_db->db_id;
3742         data.dsize = sizeof(ctdb_db->db_id);
3743
3744         /* mark the database as healthy */
3745         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3746         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3747                                         nodes, 0,
3748                                         TIMELIMIT(), false, data,
3749                                         NULL, NULL,
3750                                         NULL) != 0) {
3751                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3752                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3753                 talloc_free(tmp_ctx);
3754                 return -1;
3755         }
3756
3757         data.dptr = (void *)&generation;
3758         data.dsize = sizeof(generation);
3759
3760         /* commit all the changes */
3761         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3762                                         nodes, 0,
3763                                         TIMELIMIT(), false, data,
3764                                         NULL, NULL,
3765                                         NULL) != 0) {
3766                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3767                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3768                 talloc_free(tmp_ctx);
3769                 return -1;
3770         }
3771
3772
3773         /* thaw all nodes */
3774         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3775         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3776                                         nodes, 0,
3777                                         TIMELIMIT(),
3778                                         false, tdb_null,
3779                                         NULL, NULL,
3780                                         NULL) != 0) {
3781                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3782                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3783                 talloc_free(tmp_ctx);
3784                 return -1;
3785         }
3786
3787
3788         talloc_free(tmp_ctx);
3789         return 0;
3790 }
3791
3792 /*
3793  * dump a database backup from a file
3794  */
3795 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3796 {
3797         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3798         TDB_DATA outdata;
3799         struct db_file_header dbhdr;
3800         int i, fh;
3801         struct tm *tm;
3802         char tbuf[100];
3803         struct ctdb_rec_data *rec = NULL;
3804         struct ctdb_marshall_buffer *m;
3805
3806         if (argc != 1) {
3807                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3808                 return -1;
3809         }
3810
3811         fh = open(argv[0], O_RDONLY);
3812         if (fh == -1) {
3813                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3814                 talloc_free(tmp_ctx);
3815                 return -1;
3816         }
3817
3818         read(fh, &dbhdr, sizeof(dbhdr));
3819         if (dbhdr.version != DB_VERSION) {
3820                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3821                 talloc_free(tmp_ctx);
3822                 return -1;
3823         }
3824
3825         outdata.dsize = dbhdr.size;
3826         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3827         if (outdata.dptr == NULL) {
3828                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3829                 close(fh);
3830                 talloc_free(tmp_ctx);
3831                 return -1;
3832         }
3833         read(fh, outdata.dptr, outdata.dsize);
3834         close(fh);
3835         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3836
3837         tm = localtime(&dbhdr.timestamp);
3838         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3839         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3840                 dbhdr.name, m->db_id, tbuf);
3841
3842         for (i=0; i < m->count; i++) {
3843                 uint32_t reqid = 0;
3844                 TDB_DATA key, data;
3845
3846                 /* we do not want the header splitted, so we pass NULL*/
3847                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3848                                               NULL, &key, &data);
3849
3850                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3851         }
3852
3853         printf("Dumped %d records\n", i);
3854         talloc_free(tmp_ctx);
3855         return 0;
3856 }
3857
3858 /*
3859  * wipe a database from a file
3860  */
3861 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3862                           const char **argv)
3863 {
3864         int ret;
3865         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3866         TDB_DATA data;
3867         struct ctdb_db_context *ctdb_db;
3868         struct ctdb_node_map *nodemap = NULL;
3869         struct ctdb_vnn_map *vnnmap = NULL;
3870         int i;
3871         struct ctdb_control_wipe_database w;
3872         uint32_t *nodes;
3873         uint32_t generation;
3874         struct ctdb_dbid_map *dbmap = NULL;
3875
3876         if (argc != 1) {
3877                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3878                 return -1;
3879         }
3880
3881         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3882                                  &dbmap);
3883         if (ret != 0) {
3884                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3885                                   options.pnn));
3886                 return ret;
3887         }
3888
3889         for(i=0;i<dbmap->num;i++){
3890                 const char *name;
3891
3892                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3893                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3894                 if(!strcmp(argv[0], name)){
3895                         talloc_free(discard_const(name));
3896                         break;
3897                 }
3898                 talloc_free(discard_const(name));
3899         }
3900         if (i == dbmap->num) {
3901                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3902                                   argv[0]));
3903                 talloc_free(tmp_ctx);
3904                 return -1;
3905         }
3906
3907         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3908         if (ctdb_db == NULL) {
3909                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3910                                   argv[0]));
3911                 talloc_free(tmp_ctx);
3912                 return -1;
3913         }
3914
3915         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3916                                    &nodemap);
3917         if (ret != 0) {
3918                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3919                                   options.pnn));
3920                 talloc_free(tmp_ctx);
3921                 return ret;
3922         }
3923
3924         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3925                                   &vnnmap);
3926         if (ret != 0) {
3927                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3928                                   options.pnn));
3929                 talloc_free(tmp_ctx);
3930                 return ret;
3931         }
3932
3933         /* freeze all nodes */
3934         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3935         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3936                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3937                                                 nodes, i,
3938                                                 TIMELIMIT(),
3939                                                 false, tdb_null,
3940                                                 NULL, NULL,
3941                                                 NULL);
3942                 if (ret != 0) {
3943                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3944                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3945                                              CTDB_RECOVERY_ACTIVE);
3946                         talloc_free(tmp_ctx);
3947                         return -1;
3948                 }
3949         }
3950
3951         generation = vnnmap->generation;
3952         data.dptr = (void *)&generation;
3953         data.dsize = sizeof(generation);
3954
3955         /* start a cluster wide transaction */
3956         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3957         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3958                                         nodes, 0,
3959                                         TIMELIMIT(), false, data,
3960                                         NULL, NULL,
3961                                         NULL);
3962         if (ret!= 0) {
3963                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3964                                   "transactions.\n"));
3965                 return -1;
3966         }
3967
3968         w.db_id = ctdb_db->db_id;
3969         w.transaction_id = generation;
3970
3971         data.dptr = (void *)&w;
3972         data.dsize = sizeof(w);
3973
3974         /* wipe all the remote databases. */
3975         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3976         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3977                                         nodes, 0,
3978                                         TIMELIMIT(), false, data,
3979                                         NULL, NULL,
3980                                         NULL) != 0) {
3981                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3982                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3983                 talloc_free(tmp_ctx);
3984                 return -1;
3985         }
3986
3987         data.dptr = (void *)&ctdb_db->db_id;
3988         data.dsize = sizeof(ctdb_db->db_id);
3989
3990         /* mark the database as healthy */
3991         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3992         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3993                                         nodes, 0,
3994                                         TIMELIMIT(), false, data,
3995                                         NULL, NULL,
3996                                         NULL) != 0) {
3997                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3998                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3999                 talloc_free(tmp_ctx);
4000                 return -1;
4001         }
4002
4003         data.dptr = (void *)&generation;
4004         data.dsize = sizeof(generation);
4005
4006         /* commit all the changes */
4007         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4008                                         nodes, 0,
4009                                         TIMELIMIT(), false, data,
4010                                         NULL, NULL,
4011                                         NULL) != 0) {
4012                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4013                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4014                 talloc_free(tmp_ctx);
4015                 return -1;
4016         }
4017
4018         /* thaw all nodes */
4019         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4020         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4021                                         nodes, 0,
4022                                         TIMELIMIT(),
4023                                         false, tdb_null,
4024                                         NULL, NULL,
4025                                         NULL) != 0) {
4026                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4027                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4028                 talloc_free(tmp_ctx);
4029                 return -1;
4030         }
4031
4032         talloc_free(tmp_ctx);
4033         return 0;
4034 }
4035
4036 /*
4037  * set flags of a node in the nodemap
4038  */
4039 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4040 {
4041         int ret;
4042         int32_t status;
4043         int node;
4044         int flags;
4045         TDB_DATA data;
4046         struct ctdb_node_flag_change c;
4047
4048         if (argc != 2) {
4049                 usage();
4050                 return -1;
4051         }
4052
4053         if (sscanf(argv[0], "%d", &node) != 1) {
4054                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4055                 usage();
4056                 return -1;
4057         }
4058         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4059                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4060                 usage();
4061                 return -1;
4062         }
4063
4064         c.pnn       = node;
4065         c.old_flags = 0;
4066         c.new_flags = flags;
4067
4068         data.dsize = sizeof(c);
4069         data.dptr = (unsigned char *)&c;
4070
4071         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4072                            data, NULL, NULL, &status, NULL, NULL);
4073         if (ret != 0 || status != 0) {
4074                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4075                 return -1;
4076         }
4077         return 0;
4078 }
4079
4080 /*
4081   dump memory usage
4082  */
4083 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4084 {
4085         TDB_DATA data;
4086         int ret;
4087         int32_t res;
4088         char *errmsg;
4089         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4090         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4091                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4092         if (ret != 0 || res != 0) {
4093                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4094                 talloc_free(tmp_ctx);
4095                 return -1;
4096         }
4097         write(1, data.dptr, data.dsize);
4098         talloc_free(tmp_ctx);
4099         return 0;
4100 }
4101
4102 /*
4103   handler for memory dumps
4104 */
4105 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4106                              TDB_DATA data, void *private_data)
4107 {
4108         write(1, data.dptr, data.dsize);
4109         exit(0);
4110 }
4111
4112 /*
4113   dump memory usage on the recovery daemon
4114  */
4115 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4116 {
4117         int ret;
4118         TDB_DATA data;
4119         struct rd_memdump_reply rd;
4120
4121         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4122         if (rd.pnn == -1) {
4123                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4124                 return -1;
4125         }
4126         rd.srvid = getpid();
4127
4128         /* register a message port for receiveing the reply so that we
4129            can receive the reply
4130         */
4131         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4132
4133
4134         data.dptr = (uint8_t *)&rd;
4135         data.dsize = sizeof(rd);
4136
4137         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4138         if (ret != 0) {
4139                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4140                 return -1;
4141         }
4142
4143         /* this loop will terminate when we have received the reply */
4144         while (1) {     
4145                 event_loop_once(ctdb->ev);
4146         }
4147
4148         return 0;
4149 }
4150
4151 /*
4152   send a message to a srvid
4153  */
4154 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4155 {
4156         unsigned long srvid;
4157         int ret;
4158         TDB_DATA data;
4159
4160         if (argc < 2) {
4161                 usage();
4162         }
4163
4164         srvid      = strtoul(argv[0], NULL, 0);
4165
4166         data.dptr = (uint8_t *)discard_const(argv[1]);
4167         data.dsize= strlen(argv[1]);
4168
4169         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4170         if (ret != 0) {
4171                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4172                 return -1;
4173         }
4174
4175         return 0;
4176 }
4177
4178 /*
4179   handler for msglisten
4180 */
4181 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4182                              TDB_DATA data, void *private_data)
4183 {
4184         int i;
4185
4186         printf("Message received: ");
4187         for (i=0;i<data.dsize;i++) {
4188                 printf("%c", data.dptr[i]);
4189         }
4190         printf("\n");
4191 }
4192
4193 /*
4194   listen for messages on a messageport
4195  */
4196 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4197 {
4198         uint64_t srvid;
4199
4200         srvid = getpid();
4201
4202         /* register a message port and listen for messages
4203         */
4204         ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4205         printf("Listening for messages on srvid:%d\n", (int)srvid);
4206
4207         while (1) {     
4208                 event_loop_once(ctdb->ev);
4209         }
4210
4211         return 0;
4212 }
4213
4214 /*
4215   list all nodes in the cluster
4216   if the daemon is running, we read the data from the daemon.
4217   if the daemon is not running we parse the nodes file directly
4218  */
4219 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4220 {
4221         int i, ret;
4222         struct ctdb_node_map *nodemap=NULL;
4223
4224         if (ctdb != NULL) {
4225                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4226                 if (ret != 0) {
4227                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4228                         return ret;
4229                 }
4230
4231                 for(i=0;i<nodemap->num;i++){
4232                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4233                                 continue;
4234                         }
4235                         if (options.machinereadable){
4236                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4237                         } else {
4238                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4239                         }
4240                 }
4241         } else {
4242                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4243                 struct pnn_node *pnn_nodes;
4244                 struct pnn_node *pnn_node;
4245         
4246                 pnn_nodes = read_nodes_file(mem_ctx);
4247                 if (pnn_nodes == NULL) {
4248                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4249                         talloc_free(mem_ctx);
4250                         return -1;
4251                 }
4252
4253                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4254                         ctdb_sock_addr addr;
4255
4256                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4257                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4258                                 talloc_free(mem_ctx);
4259                                 return -1;
4260                         }
4261
4262                         if (options.machinereadable){
4263                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4264                         } else {
4265                                 printf("%s\n", pnn_node->addr);
4266                         }
4267                 }
4268                 talloc_free(mem_ctx);
4269         }
4270
4271         return 0;
4272 }
4273
4274 /*
4275   reload the nodes file on the local node
4276  */
4277 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4278 {
4279         int i, ret;
4280         int mypnn;
4281         struct ctdb_node_map *nodemap=NULL;
4282
4283         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4284         if (mypnn == -1) {
4285                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4286                 return -1;
4287         }
4288
4289         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4290         if (ret != 0) {
4291                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4292                 return ret;
4293         }
4294
4295         /* reload the nodes file on all remote nodes */
4296         for (i=0;i<nodemap->num;i++) {
4297                 if (nodemap->nodes[i].pnn == mypnn) {
4298                         continue;
4299                 }
4300                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4301                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4302                         nodemap->nodes[i].pnn);
4303                 if (ret != 0) {
4304                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4305                 }
4306         }
4307
4308         /* reload the nodes file on the local node */
4309         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4310         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4311         if (ret != 0) {
4312                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4313         }
4314
4315         /* initiate a recovery */
4316         control_recover(ctdb, argc, argv);
4317
4318         return 0;
4319 }
4320
4321
4322 static const struct {
4323         const char *name;
4324         int (*fn)(struct ctdb_context *, int, const char **);
4325         bool auto_all;
4326         bool without_daemon; /* can be run without daemon running ? */
4327         const char *msg;
4328         const char *args;
4329 } ctdb_commands[] = {
4330 #ifdef CTDB_VERS
4331         { "version",         control_version,           true,   false,  "show version of ctdb" },
4332 #endif
4333         { "status",          control_status,            true,   false,  "show node status" },
4334         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4335         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4336         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4337         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4338         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4339         { "statistics",      control_statistics,        false,  false, "show statistics" },
4340         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4341         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4342         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4343         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4344         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4345         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4346         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4347         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4348         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4349         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4350         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4351         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4352         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4353         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4354         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4355         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4356         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4357         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4358         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4359         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4360         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4361         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4362         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4363         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4364         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4365         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4366         { "stop",            control_stop,              true,   false,  "stop a node" },
4367         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4368         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4369         { "unban",           control_unban,             true,   false,  "unban a node" },
4370         { "showban",         control_showban,           true,   false,  "show ban information"},
4371         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4372         { "recover",         control_recover,           true,   false,  "force recovery" },
4373         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4374         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4375         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4376         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4377         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4378         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4379         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4380         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4381
4382         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4383         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4384         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4385         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4386         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4387         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4388         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4389         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4390         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4391         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4392         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4393         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4394         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4395         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4396         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4397         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4398         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4399         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4400         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4401         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4402         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4403         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4404         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4405         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4406         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4407         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4408         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4409         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4410         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4411         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4412         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4413         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4414 };
4415
4416 /*
4417   show usage message
4418  */
4419 static void usage(void)
4420 {
4421         int i;
4422         printf(
4423 "Usage: ctdb [options] <control>\n" \
4424 "Options:\n" \
4425 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4426 "   -Y                 generate machinereadable output\n"
4427 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4428         printf("Controls:\n");
4429         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4430                 printf("  %-15s %-27s  %s\n", 
4431                        ctdb_commands[i].name, 
4432                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4433                        ctdb_commands[i].msg);
4434         }
4435         exit(1);
4436 }
4437
4438
4439 static void ctdb_alarm(int sig)
4440 {
4441         printf("Maximum runtime exceeded - exiting\n");
4442         _exit(ERR_TIMEOUT);
4443 }
4444
4445 /*
4446   main program
4447 */
4448 int main(int argc, const char *argv[])
4449 {
4450         struct ctdb_context *ctdb;
4451         char *nodestring = NULL;
4452         struct poptOption popt_options[] = {
4453                 POPT_AUTOHELP
4454                 POPT_CTDB_CMDLINE
4455                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4456                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4457                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4458                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4459                 POPT_TABLEEND
4460         };
4461         int opt;
4462         const char **extra_argv;
4463         int extra_argc = 0;
4464         int ret=-1, i;
4465         poptContext pc;
4466         struct event_context *ev;
4467         const char *control;
4468
4469         setlinebuf(stdout);
4470         
4471         /* set some defaults */
4472         options.maxruntime = 0;
4473         options.timelimit = 3;
4474         options.pnn = CTDB_CURRENT_NODE;
4475
4476         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4477
4478         while ((opt = poptGetNextOpt(pc)) != -1) {
4479                 switch (opt) {
4480                 default:
4481                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4482                                 poptBadOption(pc, 0), poptStrerror(opt)));
4483                         exit(1);
4484                 }
4485         }
4486
4487         /* setup the remaining options for the main program to use */
4488         extra_argv = poptGetArgs(pc);
4489         if (extra_argv) {
4490                 extra_argv++;
4491                 while (extra_argv[extra_argc]) extra_argc++;
4492         }
4493
4494         if (extra_argc < 1) {
4495                 usage();
4496         }
4497
4498         if (options.maxruntime == 0) {
4499                 const char *ctdb_timeout;
4500                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4501                 if (ctdb_timeout != NULL) {
4502                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4503                 } else {
4504                         /* default timeout is 120 seconds */
4505                         options.maxruntime = 120;
4506                 }
4507         }
4508
4509         signal(SIGALRM, ctdb_alarm);
4510         alarm(options.maxruntime);
4511
4512         /* setup the node number to contact */
4513         if (nodestring != NULL) {
4514                 if (strcmp(nodestring, "all") == 0) {
4515                         options.pnn = CTDB_BROADCAST_ALL;
4516                 } else {
4517                         options.pnn = strtoul(nodestring, NULL, 0);
4518                 }
4519         }
4520
4521         control = extra_argv[0];
4522
4523         ev = event_context_init(NULL);
4524
4525         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4526                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4527                         int j;
4528
4529                         if (ctdb_commands[i].without_daemon == true) {
4530                                 close(2);
4531                         }
4532
4533                         /* initialise ctdb */
4534                         ctdb = ctdb_cmdline_client(ev);
4535
4536                         if (ctdb_commands[i].without_daemon == false) {
4537                                 if (ctdb == NULL) {
4538                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4539                                         exit(1);
4540                                 }
4541
4542                                 /* verify the node exists */
4543                                 verify_node(ctdb);
4544
4545                                 if (options.pnn == CTDB_CURRENT_NODE) {
4546                                         int pnn;
4547                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4548                                         if (pnn == -1) {
4549                                                 return -1;
4550                                         }
4551                                         options.pnn = pnn;
4552                                 }
4553                         }
4554
4555                         if (ctdb_commands[i].auto_all && 
4556                             options.pnn == CTDB_BROADCAST_ALL) {
4557                                 uint32_t *nodes;
4558                                 uint32_t num_nodes;
4559                                 ret = 0;
4560
4561                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4562                                 CTDB_NO_MEMORY(ctdb, nodes);
4563         
4564                                 for (j=0;j<num_nodes;j++) {
4565                                         options.pnn = nodes[j];
4566                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4567                                 }
4568                                 talloc_free(nodes);
4569                         } else {
4570                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4571                         }
4572                         break;
4573                 }
4574         }
4575
4576         if (i == ARRAY_SIZE(ctdb_commands)) {
4577                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4578                 exit(1);
4579         }
4580
4581         return ret;
4582 }