dc8860215982dff98f5d80b2b6f0cb2183d1557d
[rusty/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
/* process exit codes used when the target node cannot be used */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

static void usage(void);

/* settings shared by all the control_* commands in this file */
static struct {
        int timelimit;          /* seconds; base value for TIMELIMIT() */
        uint32_t pnn;           /* node the command is addressed to */
        int machinereadable;    /* non-zero selects colon separated output */
        int maxruntime;
} options;

/* timeout for a single control: options.timelimit seconds from now */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
/* same, but ten times as long */
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
49
#ifdef CTDB_VERS
/*
  print the CTDB_VERS value this tool was compiled with
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
/* two levels so that CTDB_VERS is expanded before being stringified */
#define STR(x) #x
#define XSTR(x) STR(x)
        const char *version = XSTR(CTDB_VERS);

        printf("CTDB version: %s\n", version);
        return 0;
}
#endif
59
60
61 /*
62   verify that a node exists and is reachable
63  */
64 static void verify_node(struct ctdb_context *ctdb)
65 {
66         int ret;
67         struct ctdb_node_map *nodemap=NULL;
68
69         if (options.pnn == CTDB_CURRENT_NODE) {
70                 return;
71         }
72         if (options.pnn == CTDB_BROADCAST_ALL) {
73                 return;
74         }
75
76         /* verify the node exists */
77         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
78                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
79                 exit(10);
80         }
81         if (options.pnn >= nodemap->num) {
82                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
83                 exit(ERR_NONODE);
84         }
85         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
86                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
87                 exit(ERR_DISNODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93
94         /* verify we can access the node */
95         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
96         if (ret == -1) {
97                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
98                 exit(10);
99         }
100 }
101
102 /*
103  check if a database exists
104 */
105 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
106 {
107         int i, ret;
108         struct ctdb_dbid_map *dbmap=NULL;
109
110         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
111         if (ret != 0) {
112                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
113                 return -1;
114         }
115
116         for(i=0;i<dbmap->num;i++){
117                 const char *name;
118
119                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
120                 if (!strcmp(name, db_name)) {
121                         return 0;
122                 }
123         }
124
125         return -1;
126 }
127
128 /*
129   see if a process exists
130  */
131 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
132 {
133         uint32_t pnn, pid;
134         int ret;
135         if (argc < 1) {
136                 usage();
137         }
138
139         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
140                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
141                 return -1;
142         }
143
144         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
145         if (ret == 0) {
146                 printf("%u:%u exists\n", pnn, pid);
147         } else {
148                 printf("%u:%u does not exist\n", pnn, pid);
149         }
150         return ret;
151 }
152
153 /*
154   display statistics structure
155  */
156 static void show_statistics(struct ctdb_statistics *s)
157 {
158         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
159         int i;
160         const char *prefix=NULL;
161         int preflen=0;
162         const struct {
163                 const char *name;
164                 uint32_t offset;
165         } fields[] = {
166 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
167                 STATISTICS_FIELD(num_clients),
168                 STATISTICS_FIELD(frozen),
169                 STATISTICS_FIELD(recovering),
170                 STATISTICS_FIELD(num_recoveries),
171                 STATISTICS_FIELD(client_packets_sent),
172                 STATISTICS_FIELD(client_packets_recv),
173                 STATISTICS_FIELD(node_packets_sent),
174                 STATISTICS_FIELD(node_packets_recv),
175                 STATISTICS_FIELD(keepalive_packets_sent),
176                 STATISTICS_FIELD(keepalive_packets_recv),
177                 STATISTICS_FIELD(node.req_call),
178                 STATISTICS_FIELD(node.reply_call),
179                 STATISTICS_FIELD(node.req_dmaster),
180                 STATISTICS_FIELD(node.reply_dmaster),
181                 STATISTICS_FIELD(node.reply_error),
182                 STATISTICS_FIELD(node.req_message),
183                 STATISTICS_FIELD(node.req_control),
184                 STATISTICS_FIELD(node.reply_control),
185                 STATISTICS_FIELD(client.req_call),
186                 STATISTICS_FIELD(client.req_message),
187                 STATISTICS_FIELD(client.req_control),
188                 STATISTICS_FIELD(timeouts.call),
189                 STATISTICS_FIELD(timeouts.control),
190                 STATISTICS_FIELD(timeouts.traverse),
191                 STATISTICS_FIELD(total_calls),
192                 STATISTICS_FIELD(pending_calls),
193                 STATISTICS_FIELD(lockwait_calls),
194                 STATISTICS_FIELD(pending_lockwait_calls),
195                 STATISTICS_FIELD(childwrite_calls),
196                 STATISTICS_FIELD(pending_childwrite_calls),
197                 STATISTICS_FIELD(memory_used),
198                 STATISTICS_FIELD(max_hop_count),
199         };
200         printf("CTDB version %u\n", CTDB_VERSION);
201         for (i=0;i<ARRAY_SIZE(fields);i++) {
202                 if (strchr(fields[i].name, '.')) {
203                         preflen = strcspn(fields[i].name, ".")+1;
204                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
205                                 prefix = fields[i].name;
206                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
207                         }
208                 } else {
209                         preflen = 0;
210                 }
211                 printf(" %*s%-22s%*s%10u\n", 
212                        preflen?4:0, "",
213                        fields[i].name+preflen, 
214                        preflen?0:4, "",
215                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
216         }
217         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
218         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
219
220         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
221         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
222         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
223         talloc_free(tmp_ctx);
224 }
225
226 /*
227   display remote ctdb statistics combined from all nodes
228  */
229 static int control_statistics_all(struct ctdb_context *ctdb)
230 {
231         int ret, i;
232         struct ctdb_statistics statistics;
233         uint32_t *nodes;
234         uint32_t num_nodes;
235
236         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
237         CTDB_NO_MEMORY(ctdb, nodes);
238         
239         ZERO_STRUCT(statistics);
240
241         for (i=0;i<num_nodes;i++) {
242                 struct ctdb_statistics s1;
243                 int j;
244                 uint32_t *v1 = (uint32_t *)&s1;
245                 uint32_t *v2 = (uint32_t *)&statistics;
246                 uint32_t num_ints = 
247                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
248                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
249                 if (ret != 0) {
250                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
251                         return ret;
252                 }
253                 for (j=0;j<num_ints;j++) {
254                         v2[j] += v1[j];
255                 }
256                 statistics.max_hop_count = 
257                         MAX(statistics.max_hop_count, s1.max_hop_count);
258                 statistics.max_call_latency = 
259                         MAX(statistics.max_call_latency, s1.max_call_latency);
260                 statistics.max_lockwait_latency = 
261                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
262         }
263         talloc_free(nodes);
264         printf("Gathered statistics for %u nodes\n", num_nodes);
265         show_statistics(&statistics);
266         return 0;
267 }
268
269 /*
270   display remote ctdb statistics
271  */
272 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
273 {
274         int ret;
275         struct ctdb_statistics statistics;
276
277         if (options.pnn == CTDB_BROADCAST_ALL) {
278                 return control_statistics_all(ctdb);
279         }
280
281         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
282         if (ret != 0) {
283                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
284                 return ret;
285         }
286         show_statistics(&statistics);
287         return 0;
288 }
289
290
291 /*
292   reset remote ctdb statistics
293  */
294 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
295 {
296         int ret;
297
298         ret = ctdb_statistics_reset(ctdb, options.pnn);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
301                 return ret;
302         }
303         return 0;
304 }
305
306
307 /*
308   display uptime of remote node
309  */
310 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
311 {
312         int ret;
313         struct ctdb_uptime *uptime = NULL;
314         int tmp, days, hours, minutes, seconds;
315
316         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
317         if (ret != 0) {
318                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
319                 return ret;
320         }
321
322         if (options.machinereadable){
323                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
324                 printf(":%u:%u:%u:%lf\n",
325                         (unsigned int)uptime->current_time.tv_sec,
326                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
327                         (unsigned int)uptime->last_recovery_finished.tv_sec,
328                         timeval_delta(&uptime->last_recovery_finished,
329                                       &uptime->last_recovery_started)
330                 );
331                 return 0;
332         }
333
334         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
335
336         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
337         seconds = tmp%60;
338         tmp    /= 60;
339         minutes = tmp%60;
340         tmp    /= 60;
341         hours   = tmp%24;
342         tmp    /= 24;
343         days    = tmp;
344         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
345
346         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
347         seconds = tmp%60;
348         tmp    /= 60;
349         minutes = tmp%60;
350         tmp    /= 60;
351         hours   = tmp%24;
352         tmp    /= 24;
353         days    = tmp;
354         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
355         
356         printf("Duration of last recovery/failover: %lf seconds\n",
357                 timeval_delta(&uptime->last_recovery_finished,
358                               &uptime->last_recovery_started));
359
360         return 0;
361 }
362
363 /*
364   show the PNN of the current node
365  */
366 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
367 {
368         int mypnn;
369
370         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
371         if (mypnn == -1) {
372                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
373                 return -1;
374         }
375
376         printf("PNN:%d\n", mypnn);
377         return 0;
378 }
379
380
/* one usable entry of the nodes file, kept in a singly linked list */
struct pnn_node {
        struct pnn_node *next;  /* following entry, NULL at the end */
        const char *addr;       /* address text as written in the file */
        int pnn;                /* node number assigned to this entry */
};
386
387 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
388 {
389         const char *nodes_list;
390         int nlines;
391         char **lines;
392         int i, pnn;
393         struct pnn_node *pnn_nodes = NULL;
394         struct pnn_node *pnn_node;
395         struct pnn_node *tmp_node;
396
397         /* read the nodes file */
398         nodes_list = getenv("CTDB_NODES");
399         if (nodes_list == NULL) {
400                 nodes_list = "/etc/ctdb/nodes";
401         }
402         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
403         if (lines == NULL) {
404                 return NULL;
405         }
406         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
407                 nlines--;
408         }
409         for (i=0, pnn=0; i<nlines; i++) {
410                 char *node;
411
412                 node = lines[i];
413                 /* strip leading spaces */
414                 while((*node == ' ') || (*node == '\t')) {
415                         node++;
416                 }
417                 if (*node == '#') {
418                         pnn++;
419                         continue;
420                 }
421                 if (strcmp(node, "") == 0) {
422                         continue;
423                 }
424                 pnn_node = talloc(mem_ctx, struct pnn_node);
425                 pnn_node->pnn = pnn++;
426                 pnn_node->addr = talloc_strdup(pnn_node, node);
427                 pnn_node->next = pnn_nodes;
428                 pnn_nodes = pnn_node;
429         }
430
431         /* swap them around so we return them in incrementing order */
432         pnn_node = pnn_nodes;
433         pnn_nodes = NULL;
434         while (pnn_node) {
435                 tmp_node = pnn_node;
436                 pnn_node = pnn_node->next;
437
438                 tmp_node->next = pnn_nodes;
439                 pnn_nodes = tmp_node;
440         }
441
442         return pnn_nodes;
443 }
444
445 /*
446   show the PNN of the current node
447   discover the pnn by loading the nodes file and try to bind to all
448   addresses one at a time until the ip address is found.
449  */
450 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
451 {
452         TALLOC_CTX *mem_ctx = talloc_new(NULL);
453         struct pnn_node *pnn_nodes;
454         struct pnn_node *pnn_node;
455
456         pnn_nodes = read_nodes_file(mem_ctx);
457         if (pnn_nodes == NULL) {
458                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
459                 talloc_free(mem_ctx);
460                 return -1;
461         }
462
463         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
464                 ctdb_sock_addr addr;
465
466                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
467                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
468                         talloc_free(mem_ctx);
469                         return -1;
470                 }
471
472                 if (ctdb_sys_have_ip(&addr)) {
473                         printf("PNN:%d\n", pnn_node->pnn);
474                         talloc_free(mem_ctx);
475                         return 0;
476                 }
477         }
478
479         printf("Failed to detect which PNN this node is\n");
480         talloc_free(mem_ctx);
481         return -1;
482 }
483
484 /*
485   display remote ctdb status
486  */
487 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
488 {
489         int i, ret;
490         struct ctdb_vnn_map *vnnmap=NULL;
491         struct ctdb_node_map *nodemap=NULL;
492         uint32_t recmode, recmaster;
493         int mypnn;
494
495         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
496         if (mypnn == -1) {
497                 return -1;
498         }
499
500         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
501         if (ret != 0) {
502                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
503                 return ret;
504         }
505
506         if(options.machinereadable){
507                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
508                 for(i=0;i<nodemap->num;i++){
509                         int partially_online = 0;
510                         int j;
511
512                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
513                                 continue;
514                         }
515                         if (nodemap->nodes[i].flags == 0) {
516                                 struct ctdb_control_get_ifaces *ifaces;
517
518                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
519                                                            nodemap->nodes[i].pnn,
520                                                            ctdb, &ifaces);
521                                 if (ret == 0) {
522                                         for (j=0; j < ifaces->num; j++) {
523                                                 if (ifaces->ifaces[j].link_state != 0) {
524                                                         continue;
525                                                 }
526                                                 partially_online = 1;
527                                                 break;
528                                         }
529                                         talloc_free(ifaces);
530                                 }
531                         }
532                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
533                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
538                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
539                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
540                                partially_online);
541                 }
542                 return 0;
543         }
544
545         printf("Number of nodes:%d\n", nodemap->num);
546         for(i=0;i<nodemap->num;i++){
547                 static const struct {
548                         uint32_t flag;
549                         const char *name;
550                 } flag_names[] = {
551                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
552                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
553                         { NODE_FLAGS_BANNED,                "BANNED" },
554                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
555                         { NODE_FLAGS_DELETED,               "DELETED" },
556                         { NODE_FLAGS_STOPPED,               "STOPPED" },
557                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
558                 };
559                 char *flags_str = NULL;
560                 int j;
561
562                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
563                         continue;
564                 }
565                 if (nodemap->nodes[i].flags == 0) {
566                         struct ctdb_control_get_ifaces *ifaces;
567
568                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
569                                                    nodemap->nodes[i].pnn,
570                                                    ctdb, &ifaces);
571                         if (ret == 0) {
572                                 for (j=0; j < ifaces->num; j++) {
573                                         if (ifaces->ifaces[j].link_state != 0) {
574                                                 continue;
575                                         }
576                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
577                                         break;
578                                 }
579                                 talloc_free(ifaces);
580                         }
581                 }
582                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
583                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
584                                 if (flags_str == NULL) {
585                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
586                                 } else {
587                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
588                                                                            flag_names[j].name);
589                                 }
590                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
591                         }
592                 }
593                 if (flags_str == NULL) {
594                         flags_str = talloc_strdup(ctdb, "OK");
595                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
596                 }
597                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
598                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
599                        flags_str,
600                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
601                 talloc_free(flags_str);
602         }
603
604         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
605         if (ret != 0) {
606                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
607                 return ret;
608         }
609         if (vnnmap->generation == INVALID_GENERATION) {
610                 printf("Generation:INVALID\n");
611         } else {
612                 printf("Generation:%d\n",vnnmap->generation);
613         }
614         printf("Size:%d\n",vnnmap->size);
615         for(i=0;i<vnnmap->size;i++){
616                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
617         }
618
619         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
620         if (ret != 0) {
621                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
622                 return ret;
623         }
624         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
625
626         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
627         if (ret != 0) {
628                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
629                 return ret;
630         }
631         printf("Recovery master:%d\n",recmaster);
632
633         return 0;
634 }
635
636
/* one address read from the natgw nodes file, kept in a linked list */
struct natgw_node {
        struct natgw_node *next;        /* following entry, NULL at the end */
        const char *addr;               /* address text as written in the file */
};
641
642 /*
643   display the list of nodes belonging to this natgw configuration
644  */
645 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
646 {
647         int i, ret;
648         const char *natgw_list;
649         int nlines;
650         char **lines;
651         struct natgw_node *natgw_nodes = NULL;
652         struct natgw_node *natgw_node;
653         struct ctdb_node_map *nodemap=NULL;
654
655
656         /* read the natgw nodes file into a linked list */
657         natgw_list = getenv("NATGW_NODES");
658         if (natgw_list == NULL) {
659                 natgw_list = "/etc/ctdb/natgw_nodes";
660         }
661         lines = file_lines_load(natgw_list, &nlines, ctdb);
662         if (lines == NULL) {
663                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
664                 return -1;
665         }
666         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
667                 nlines--;
668         }
669         for (i=0;i<nlines;i++) {
670                 char *node;
671
672                 node = lines[i];
673                 /* strip leading spaces */
674                 while((*node == ' ') || (*node == '\t')) {
675                         node++;
676                 }
677                 if (*node == '#') {
678                         continue;
679                 }
680                 if (strcmp(node, "") == 0) {
681                         continue;
682                 }
683                 natgw_node = talloc(ctdb, struct natgw_node);
684                 natgw_node->addr = talloc_strdup(natgw_node, node);
685                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
686                 natgw_node->next = natgw_nodes;
687                 natgw_nodes = natgw_node;
688         }
689
690         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
691         if (ret != 0) {
692                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
693                 return ret;
694         }
695
696         i=0;
697         while(i<nodemap->num) {
698                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
699                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
700                                 break;
701                         }
702                 }
703
704                 /* this node was not in the natgw so we just remove it from
705                  * the list
706                  */
707                 if ((natgw_node == NULL) 
708                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
709                         int j;
710
711                         for (j=i+1; j<nodemap->num; j++) {
712                                 nodemap->nodes[j-1] = nodemap->nodes[j];
713                         }
714                         nodemap->num--;
715                         continue;
716                 }
717
718                 i++;
719         }               
720
721         /* pick a node to be natgwmaster
722          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
723          */
724         for(i=0;i<nodemap->num;i++){
725                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
726                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
727                         break;
728                 }
729         }
730         /* we couldnt find any healthy node, try unhealthy ones */
731         if (i == nodemap->num) {
732                 for(i=0;i<nodemap->num;i++){
733                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
734                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
735                                 break;
736                         }
737                 }
738         }
739         /* unless all nodes are STOPPED, when we pick one anyway */
740         if (i == nodemap->num) {
741                 for(i=0;i<nodemap->num;i++){
742                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
743                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
744                                 break;
745                         }
746                 }
747                 /* or if we still can not find any */
748                 if (i == nodemap->num) {
749                         printf("-1 0.0.0.0\n");
750                 }
751         }
752
753         /* print the pruned list of nodes belonging to this natgw list */
754         for(i=0;i<nodemap->num;i++){
755                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
756                         continue;
757                 }
758                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
759                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
760                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
761                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
762                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
763                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
764                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
765         }
766
767         return 0;
768 }
769
770 /*
771   display the status of the scripts for monitoring (or other events)
772  */
773 static int control_one_scriptstatus(struct ctdb_context *ctdb,
774                                     enum ctdb_eventscript_call type)
775 {
776         struct ctdb_scripts_wire *script_status;
777         int ret, i;
778
779         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
782                 return ret;
783         }
784
785         if (script_status == NULL) {
786                 if (!options.machinereadable) {
787                         printf("%s cycle never run\n",
788                                ctdb_eventscript_call_names[type]);
789                 }
790                 return 0;
791         }
792
793         if (!options.machinereadable) {
794                 printf("%d scripts were executed last %s cycle\n",
795                        script_status->num_scripts,
796                        ctdb_eventscript_call_names[type]);
797         }
798         for (i=0; i<script_status->num_scripts; i++) {
799                 const char *status = NULL;
800
801                 switch (script_status->scripts[i].status) {
802                 case -ETIME:
803                         status = "TIMEDOUT";
804                         break;
805                 case -ENOEXEC:
806                         status = "DISABLED";
807                         break;
808                 case 0:
809                         status = "OK";
810                         break;
811                 default:
812                         if (script_status->scripts[i].status > 0)
813                                 status = "ERROR";
814                         break;
815                 }
816                 if (options.machinereadable) {
817                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
818                                ctdb_eventscript_call_names[type],
819                                script_status->scripts[i].name,
820                                script_status->scripts[i].status,
821                                status,
822                                (long)script_status->scripts[i].start.tv_sec,
823                                (long)script_status->scripts[i].start.tv_usec,
824                                (long)script_status->scripts[i].finished.tv_sec,
825                                (long)script_status->scripts[i].finished.tv_usec,
826                                script_status->scripts[i].output);
827                         continue;
828                 }
829                 if (status)
830                         printf("%-20s Status:%s    ",
831                                script_status->scripts[i].name, status);
832                 else
833                         /* Some other error, eg from stat. */
834                         printf("%-20s Status:CANNOT RUN (%s)",
835                                script_status->scripts[i].name,
836                                strerror(-script_status->scripts[i].status));
837
838                 if (script_status->scripts[i].status >= 0) {
839                         printf("Duration:%.3lf ",
840                         timeval_delta(&script_status->scripts[i].finished,
841                               &script_status->scripts[i].start));
842                 }
843                 if (script_status->scripts[i].status != -ENOEXEC) {
844                         printf("%s",
845                                ctime(&script_status->scripts[i].start.tv_sec));
846                         if (script_status->scripts[i].status != 0) {
847                                 printf("   OUTPUT:%s\n",
848                                        script_status->scripts[i].output);
849                         }
850                 } else {
851                         printf("\n");
852                 }
853         }
854         return 0;
855 }
856
857
858 static int control_scriptstatus(struct ctdb_context *ctdb,
859                                 int argc, const char **argv)
860 {
861         int ret;
862         enum ctdb_eventscript_call type, min, max;
863         const char *arg;
864
865         if (argc > 1) {
866                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
867                 return -1;
868         }
869
870         if (argc == 0)
871                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
872         else
873                 arg = argv[0];
874
875         for (type = 0; type < CTDB_EVENT_MAX; type++) {
876                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
877                         min = type;
878                         max = type+1;
879                         break;
880                 }
881         }
882         if (type == CTDB_EVENT_MAX) {
883                 if (strcmp(arg, "all") == 0) {
884                         min = 0;
885                         max = CTDB_EVENT_MAX;
886                 } else {
887                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
888                         return -1;
889                 }
890         }
891
892         if (options.machinereadable) {
893                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
894         }
895
896         for (type = min; type < max; type++) {
897                 ret = control_one_scriptstatus(ctdb, type);
898                 if (ret != 0) {
899                         return ret;
900                 }
901         }
902
903         return 0;
904 }
905
906 /*
907   enable an eventscript
908  */
909 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
910 {
911         int ret;
912
913         if (argc < 1) {
914                 usage();
915         }
916
917         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
918         if (ret != 0) {
919           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
920                 return ret;
921         }
922
923         return 0;
924 }
925
926 /*
927   disable an eventscript
928  */
929 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
930 {
931         int ret;
932
933         if (argc < 1) {
934                 usage();
935         }
936
937         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
938         if (ret != 0) {
939           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
940                 return ret;
941         }
942
943         return 0;
944 }
945
946 /*
947   display the pnn of the recovery master
948  */
949 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
950 {
951         int ret;
952         uint32_t recmaster;
953
954         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
957                 return ret;
958         }
959         printf("%d\n",recmaster);
960
961         return 0;
962 }
963
964 /*
965   get a list of all tickles for this pnn
966  */
967 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
968 {
969         struct ctdb_control_tcp_tickle_list *list;
970         ctdb_sock_addr addr;
971         int i, ret;
972
973         if (argc < 1) {
974                 usage();
975         }
976
977         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
978                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
979                 return -1;
980         }
981
982         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
983         if (ret == -1) {
984                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
985                 return -1;
986         }
987
988         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
989         printf("Num tickles:%u\n", list->tickles.num);
990         for (i=0;i<list->tickles.num;i++) {
991                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
992                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
993         }
994
995         talloc_free(list);
996         
997         return 0;
998 }
999
1000
1001 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1002 {
1003         struct ctdb_all_public_ips *ips;
1004         struct ctdb_public_ip ip;
1005         int i, ret;
1006         uint32_t *nodes;
1007         uint32_t disable_time;
1008         TDB_DATA data;
1009         struct ctdb_node_map *nodemap=NULL;
1010         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1011
1012         disable_time = 30;
1013         data.dptr  = (uint8_t*)&disable_time;
1014         data.dsize = sizeof(disable_time);
1015         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1016         if (ret != 0) {
1017                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1018                 return -1;
1019         }
1020
1021
1022
1023         /* read the public ip list from the node */
1024         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1025         if (ret != 0) {
1026                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1027                 talloc_free(tmp_ctx);
1028                 return -1;
1029         }
1030
1031         for (i=0;i<ips->num;i++) {
1032                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1033                         break;
1034                 }
1035         }
1036         if (i==ips->num) {
1037                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1038                         pnn, ctdb_addr_to_str(addr)));
1039                 talloc_free(tmp_ctx);
1040                 return -1;
1041         }
1042
1043         ip.pnn  = pnn;
1044         ip.addr = *addr;
1045
1046         data.dptr  = (uint8_t *)&ip;
1047         data.dsize = sizeof(ip);
1048
1049         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1050         if (ret != 0) {
1051                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1052                 talloc_free(tmp_ctx);
1053                 return ret;
1054         }
1055
1056         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1057         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1058                                         nodes, 0,
1059                                         LONGTIMELIMIT(),
1060                                         false, data,
1061                                         NULL, NULL,
1062                                         NULL);
1063         if (ret != 0) {
1064                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1065                 talloc_free(tmp_ctx);
1066                 return -1;
1067         }
1068
1069         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1072                 talloc_free(tmp_ctx);
1073                 return -1;
1074         }
1075
1076         /* update the recovery daemon so it now knows to expect the new
1077            node assignment for this ip.
1078         */
1079         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1080         if (ret != 0) {
1081                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1082                 return -1;
1083         }
1084
1085         talloc_free(tmp_ctx);
1086         return 0;
1087 }
1088
1089 /*
1090   move/failover an ip address to a specific node
1091  */
1092 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1093 {
1094         uint32_t pnn;
1095         ctdb_sock_addr addr;
1096
1097         if (argc < 2) {
1098                 usage();
1099                 return -1;
1100         }
1101
1102         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1103                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1104                 return -1;
1105         }
1106
1107
1108         if (sscanf(argv[1], "%u", &pnn) != 1) {
1109                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1110                 return -1;
1111         }
1112
1113         if (move_ip(ctdb, &addr, pnn) != 0) {
1114                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1115                 return -1;
1116         }
1117
1118         return 0;
1119 }
1120
1121 void getips_store_callback(void *param, void *data)
1122 {
1123         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1124         struct ctdb_all_public_ips *ips = param;
1125         int i;
1126
1127         i = ips->num++;
1128         ips->ips[i].pnn  = node_ip->pnn;
1129         ips->ips[i].addr = node_ip->addr;
1130 }
1131
/*
  tree traversal callback: add one to the uint32_t counter that
  `param` points to.  `data` is not looked at.
 */
void getips_count_callback(void *param, void *data)
{
        uint32_t *counter = param;

        *counter += 1;
}
1138
1139 #define IP_KEYLEN       4
1140 static uint32_t *ip_key(ctdb_sock_addr *ip)
1141 {
1142         static uint32_t key[IP_KEYLEN];
1143
1144         bzero(key, sizeof(key));
1145
1146         switch (ip->sa.sa_family) {
1147         case AF_INET:
1148                 key[0]  = ip->ip.sin_addr.s_addr;
1149                 break;
1150         case AF_INET6:
1151                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1152                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1153                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1154                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1155                 break;
1156         default:
1157                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1158                 return key;
1159         }
1160
1161         return key;
1162 }
1163
/*
  insert callback for the ip tree: hands back `parm` unchanged and
  ignores `data`.
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *chosen = parm;

        (void)data;
        return chosen;
}
1168
1169 static int
1170 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1171 {
1172         struct ctdb_all_public_ips *tmp_ips;
1173         struct ctdb_node_map *nodemap=NULL;
1174         trbt_tree_t *ip_tree;
1175         int i, j, len, ret;
1176         uint32_t count;
1177
1178         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1179         if (ret != 0) {
1180                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1181                 return ret;
1182         }
1183
1184         ip_tree = trbt_create(tmp_ctx, 0);
1185
1186         for(i=0;i<nodemap->num;i++){
1187                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1188                         continue;
1189                 }
1190                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1191                         continue;
1192                 }
1193
1194                 /* read the public ip list from this node */
1195                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1196                 if (ret != 0) {
1197                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1198                         return -1;
1199                 }
1200         
1201                 for (j=0; j<tmp_ips->num;j++) {
1202                         struct ctdb_public_ip *node_ip;
1203
1204                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1205                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1206                         node_ip->addr = tmp_ips->ips[j].addr;
1207
1208                         trbt_insertarray32_callback(ip_tree,
1209                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1210                                 add_ip_callback,
1211                                 node_ip);
1212                 }
1213                 talloc_free(tmp_ips);
1214         }
1215
1216         /* traverse */
1217         count = 0;
1218         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1219
1220         len = offsetof(struct ctdb_all_public_ips, ips) + 
1221                 count*sizeof(struct ctdb_public_ip);
1222         tmp_ips = talloc_zero_size(tmp_ctx, len);
1223         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1224
1225         *ips = tmp_ips;
1226
1227         return 0;
1228 }
1229
1230
1231 /* 
1232  * scans all other nodes and returns a pnn for another node that can host this 
1233  * ip address or -1
1234  */
1235 static int
1236 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1237 {
1238         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1239         struct ctdb_all_public_ips *ips;
1240         struct ctdb_node_map *nodemap=NULL;
1241         int i, j, ret;
1242
1243         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1244         if (ret != 0) {
1245                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1246                 talloc_free(tmp_ctx);
1247                 return ret;
1248         }
1249
1250         for(i=0;i<nodemap->num;i++){
1251                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1252                         continue;
1253                 }
1254                 if (nodemap->nodes[i].pnn == options.pnn) {
1255                         continue;
1256                 }
1257
1258                 /* read the public ip list from this node */
1259                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1260                 if (ret != 0) {
1261                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1262                         return -1;
1263                 }
1264
1265                 for (j=0;j<ips->num;j++) {
1266                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1267                                 talloc_free(tmp_ctx);
1268                                 return nodemap->nodes[i].pnn;
1269                         }
1270                 }
1271                 talloc_free(ips);
1272         }
1273
1274         talloc_free(tmp_ctx);
1275         return -1;
1276 }
1277
1278 /*
1279   add a public ip address to a node
1280  */
1281 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1282 {
1283         int i, ret;
1284         int len;
1285         uint32_t pnn;
1286         unsigned mask;
1287         ctdb_sock_addr addr;
1288         struct ctdb_control_ip_iface *pub;
1289         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1290         struct ctdb_all_public_ips *ips;
1291
1292
1293         if (argc != 2) {
1294                 talloc_free(tmp_ctx);
1295                 usage();
1296         }
1297
1298         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1299                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1300                 talloc_free(tmp_ctx);
1301                 return -1;
1302         }
1303
1304         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1305         if (ret != 0) {
1306                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1307                 talloc_free(tmp_ctx);
1308                 return ret;
1309         }
1310
1311
1312         /* check if some other node is already serving this ip, if not,
1313          * we will claim it
1314          */
1315         for (i=0;i<ips->num;i++) {
1316                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1317                         break;
1318                 }
1319         }
1320
1321         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1322         pub = talloc_size(tmp_ctx, len); 
1323         CTDB_NO_MEMORY(ctdb, pub);
1324
1325         pub->addr  = addr;
1326         pub->mask  = mask;
1327         pub->len   = strlen(argv[1])+1;
1328         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1329
1330         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1331         if (ret != 0) {
1332                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1333                 talloc_free(tmp_ctx);
1334                 return ret;
1335         }
1336
1337         if (i == ips->num) {
1338                 /* no one has this ip so we claim it */
1339                 pnn  = options.pnn;
1340         } else {
1341                 pnn  = ips->ips[i].pnn;
1342         }
1343
1344         if (move_ip(ctdb, &addr, pnn) != 0) {
1345                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1346                 return -1;
1347         }
1348
1349         talloc_free(tmp_ctx);
1350         return 0;
1351 }
1352
1353 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1354
1355 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1356 {
1357         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1358         struct ctdb_node_map *nodemap=NULL;
1359         struct ctdb_all_public_ips *ips;
1360         int ret, i, j;
1361
1362         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1363         if (ret != 0) {
1364                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1365                 return ret;
1366         }
1367
1368         /* remove it from the nodes that are not hosting the ip currently */
1369         for(i=0;i<nodemap->num;i++){
1370                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1371                         continue;
1372                 }
1373                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1374                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1375                         continue;
1376                 }
1377
1378                 for (j=0;j<ips->num;j++) {
1379                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1380                                 break;
1381                         }
1382                 }
1383                 if (j==ips->num) {
1384                         continue;
1385                 }
1386
1387                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1388                         continue;
1389                 }
1390
1391                 options.pnn = nodemap->nodes[i].pnn;
1392                 control_delip(ctdb, argc, argv);
1393         }
1394
1395
1396         /* remove it from every node (also the one hosting it) */
1397         for(i=0;i<nodemap->num;i++){
1398                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1399                         continue;
1400                 }
1401                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1402                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1403                         continue;
1404                 }
1405
1406                 for (j=0;j<ips->num;j++) {
1407                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1408                                 break;
1409                         }
1410                 }
1411                 if (j==ips->num) {
1412                         continue;
1413                 }
1414
1415                 options.pnn = nodemap->nodes[i].pnn;
1416                 control_delip(ctdb, argc, argv);
1417         }
1418
1419         talloc_free(tmp_ctx);
1420         return 0;
1421 }
1422         
1423 /*
1424   delete a public ip address from a node
1425  */
1426 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1427 {
1428         int i, ret;
1429         ctdb_sock_addr addr;
1430         struct ctdb_control_ip_iface pub;
1431         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1432         struct ctdb_all_public_ips *ips;
1433
1434         if (argc != 1) {
1435                 talloc_free(tmp_ctx);
1436                 usage();
1437         }
1438
1439         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1440                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1441                 return -1;
1442         }
1443
1444         if (options.pnn == CTDB_BROADCAST_ALL) {
1445                 return control_delip_all(ctdb, argc, argv, &addr);
1446         }
1447
1448         pub.addr  = addr;
1449         pub.mask  = 0;
1450         pub.len   = 0;
1451
1452         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1453         if (ret != 0) {
1454                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1455                 talloc_free(tmp_ctx);
1456                 return ret;
1457         }
1458         
1459         for (i=0;i<ips->num;i++) {
1460                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1461                         break;
1462                 }
1463         }
1464
1465         if (i==ips->num) {
1466                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1467                         ctdb_addr_to_str(&addr)));
1468                 talloc_free(tmp_ctx);
1469                 return -1;
1470         }
1471
1472         if (ips->ips[i].pnn == options.pnn) {
1473                 ret = find_other_host_for_public_ip(ctdb, &addr);
1474                 if (ret != -1) {
1475                         if (move_ip(ctdb, &addr, ret) != 0) {
1476                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1477                                 return -1;
1478                         }
1479                 }
1480         }
1481
1482         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1483         if (ret != 0) {
1484                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1485                 talloc_free(tmp_ctx);
1486                 return ret;
1487         }
1488
1489         talloc_free(tmp_ctx);
1490         return 0;
1491 }
1492
1493 /*
1494   kill a tcp connection
1495  */
1496 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1497 {
1498         int ret;
1499         struct ctdb_control_killtcp killtcp;
1500
1501         if (argc < 2) {
1502                 usage();
1503         }
1504
1505         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1506                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1507                 return -1;
1508         }
1509
1510         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1511                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1512                 return -1;
1513         }
1514
1515         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1516         if (ret != 0) {
1517                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1518                 return ret;
1519         }
1520
1521         return 0;
1522 }
1523
1524
1525 /*
1526   send a gratious arp
1527  */
1528 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1529 {
1530         int ret;
1531         ctdb_sock_addr addr;
1532
1533         if (argc < 2) {
1534                 usage();
1535         }
1536
1537         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1538                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1539                 return -1;
1540         }
1541
1542         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1543         if (ret != 0) {
1544                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1545                 return ret;
1546         }
1547
1548         return 0;
1549 }
1550
1551 /*
1552   register a server id
1553  */
1554 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1555 {
1556         int ret;
1557         struct ctdb_server_id server_id;
1558
1559         if (argc < 3) {
1560                 usage();
1561         }
1562
1563         server_id.pnn       = strtoul(argv[0], NULL, 0);
1564         server_id.type      = strtoul(argv[1], NULL, 0);
1565         server_id.server_id = strtoul(argv[2], NULL, 0);
1566
1567         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1570                 return ret;
1571         }
1572         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1573         sleep(999);
1574         return -1;
1575 }
1576
1577 /*
1578   unregister a server id
1579  */
1580 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1581 {
1582         int ret;
1583         struct ctdb_server_id server_id;
1584
1585         if (argc < 3) {
1586                 usage();
1587         }
1588
1589         server_id.pnn       = strtoul(argv[0], NULL, 0);
1590         server_id.type      = strtoul(argv[1], NULL, 0);
1591         server_id.server_id = strtoul(argv[2], NULL, 0);
1592
1593         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1594         if (ret != 0) {
1595                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1596                 return ret;
1597         }
1598         return -1;
1599 }
1600
1601 /*
1602   check if a server id exists
1603  */
1604 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1605 {
1606         uint32_t status;
1607         int ret;
1608         struct ctdb_server_id server_id;
1609
1610         if (argc < 3) {
1611                 usage();
1612         }
1613
1614         server_id.pnn       = strtoul(argv[0], NULL, 0);
1615         server_id.type      = strtoul(argv[1], NULL, 0);
1616         server_id.server_id = strtoul(argv[2], NULL, 0);
1617
1618         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1619         if (ret != 0) {
1620                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1621                 return ret;
1622         }
1623
1624         if (status) {
1625                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1626         } else {
1627                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1628         }
1629         return 0;
1630 }
1631
1632 /*
1633   get a list of all server ids that are registered on a node
1634  */
1635 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1636 {
1637         int i, ret;
1638         struct ctdb_server_id_list *server_ids;
1639
1640         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1641         if (ret != 0) {
1642                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1643                 return ret;
1644         }
1645
1646         for (i=0; i<server_ids->num; i++) {
1647                 printf("Server id %d:%d:%d\n", 
1648                         server_ids->server_ids[i].pnn, 
1649                         server_ids->server_ids[i].type, 
1650                         server_ids->server_ids[i].server_id); 
1651         }
1652
1653         return -1;
1654 }
1655
1656 /*
1657   send a tcp tickle ack
1658  */
1659 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1660 {
1661         int ret;
1662         ctdb_sock_addr  src, dst;
1663
1664         if (argc < 2) {
1665                 usage();
1666         }
1667
1668         if (!parse_ip_port(argv[0], &src)) {
1669                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1670                 return -1;
1671         }
1672
1673         if (!parse_ip_port(argv[1], &dst)) {
1674                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1675                 return -1;
1676         }
1677
1678         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1679         if (ret==0) {
1680                 return 0;
1681         }
1682         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1683
1684         return -1;
1685 }
1686
1687
1688 /*
1689   display public ip status
1690  */
1691 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1692 {
1693         int i, ret;
1694         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1695         struct ctdb_all_public_ips *ips;
1696
1697         if (options.pnn == CTDB_BROADCAST_ALL) {
1698                 /* read the list of public ips from all nodes */
1699                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1700         } else {
1701                 /* read the public ip list from this node */
1702                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1703         }
1704         if (ret != 0) {
1705                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1706                 talloc_free(tmp_ctx);
1707                 return ret;
1708         }
1709
1710         if (options.machinereadable){
1711                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1712         } else {
1713                 if (options.pnn == CTDB_BROADCAST_ALL) {
1714                         printf("Public IPs on ALL nodes\n");
1715                 } else {
1716                         printf("Public IPs on node %u\n", options.pnn);
1717                 }
1718         }
1719
1720         for (i=1;i<=ips->num;i++) {
1721                 struct ctdb_control_public_ip_info *info = NULL;
1722                 int32_t pnn;
1723                 char *aciface = NULL;
1724                 char *avifaces = NULL;
1725                 char *cifaces = NULL;
1726
1727                 if (options.pnn == CTDB_BROADCAST_ALL) {
1728                         pnn = ips->ips[ips->num-i].pnn;
1729                 } else {
1730                         pnn = options.pnn;
1731                 }
1732
1733                 if (pnn != -1) {
1734                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1735                                                    &ips->ips[ips->num-i].addr, &info);
1736                 } else {
1737                         ret = -1;
1738                 }
1739
1740                 if (ret == 0) {
1741                         int j;
1742                         for (j=0; j < info->num; j++) {
1743                                 if (cifaces == NULL) {
1744                                         cifaces = talloc_strdup(info,
1745                                                                 info->ifaces[j].name);
1746                                 } else {
1747                                         cifaces = talloc_asprintf_append(cifaces,
1748                                                                          ",%s",
1749                                                                          info->ifaces[j].name);
1750                                 }
1751
1752                                 if (info->active_idx == j) {
1753                                         aciface = info->ifaces[j].name;
1754                                 }
1755
1756                                 if (info->ifaces[j].link_state == 0) {
1757                                         continue;
1758                                 }
1759
1760                                 if (avifaces == NULL) {
1761                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1762                                 } else {
1763                                         avifaces = talloc_asprintf_append(avifaces,
1764                                                                           ",%s",
1765                                                                           info->ifaces[j].name);
1766                                 }
1767                         }
1768                 }
1769
1770                 if (options.machinereadable){
1771                         printf(":%s:%d:%s:%s:%s:\n",
1772                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1773                                ips->ips[ips->num-i].pnn,
1774                                aciface?aciface:"",
1775                                avifaces?avifaces:"",
1776                                cifaces?cifaces:"");
1777                 } else {
1778                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1779                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1780                                ips->ips[ips->num-i].pnn,
1781                                aciface?aciface:"",
1782                                avifaces?avifaces:"",
1783                                cifaces?cifaces:"");
1784                 }
1785                 talloc_free(info);
1786         }
1787
1788         talloc_free(tmp_ctx);
1789         return 0;
1790 }
1791
1792 /*
1793   public ip info
1794  */
1795 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1796 {
1797         int i, ret;
1798         ctdb_sock_addr addr;
1799         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1800         struct ctdb_control_public_ip_info *info;
1801
1802         if (argc != 1) {
1803                 talloc_free(tmp_ctx);
1804                 usage();
1805         }
1806
1807         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1808                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1809                 return -1;
1810         }
1811
1812         /* read the public ip info from this node */
1813         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1814                                            tmp_ctx, &addr, &info);
1815         if (ret != 0) {
1816                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1817                                   argv[0], options.pnn));
1818                 talloc_free(tmp_ctx);
1819                 return ret;
1820         }
1821
1822         printf("Public IP[%s] info on node %u\n",
1823                ctdb_addr_to_str(&info->ip.addr),
1824                options.pnn);
1825
1826         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1827                ctdb_addr_to_str(&info->ip.addr),
1828                info->ip.pnn, info->num);
1829
1830         for (i=0; i<info->num; i++) {
1831                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1832
1833                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1834                        i+1, info->ifaces[i].name,
1835                        info->ifaces[i].link_state?"up":"down",
1836                        (unsigned int)info->ifaces[i].references,
1837                        (i==info->active_idx)?" (active)":"");
1838         }
1839
1840         talloc_free(tmp_ctx);
1841         return 0;
1842 }
1843
1844 /*
1845   display interfaces status
1846  */
1847 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1848 {
1849         int i, ret;
1850         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1851         struct ctdb_control_get_ifaces *ifaces;
1852
1853         /* read the public ip list from this node */
1854         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1855                                    tmp_ctx, &ifaces);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1858                                   options.pnn));
1859                 talloc_free(tmp_ctx);
1860                 return ret;
1861         }
1862
1863         if (options.machinereadable){
1864                 printf(":Name:LinkStatus:References:\n");
1865         } else {
1866                 printf("Interfaces on node %u\n", options.pnn);
1867         }
1868
1869         for (i=0; i<ifaces->num; i++) {
1870                 if (options.machinereadable){
1871                         printf(":%s:%s:%u\n",
1872                                ifaces->ifaces[i].name,
1873                                ifaces->ifaces[i].link_state?"1":"0",
1874                                (unsigned int)ifaces->ifaces[i].references);
1875                 } else {
1876                         printf("name:%s link:%s references:%u\n",
1877                                ifaces->ifaces[i].name,
1878                                ifaces->ifaces[i].link_state?"up":"down",
1879                                (unsigned int)ifaces->ifaces[i].references);
1880                 }
1881         }
1882
1883         talloc_free(tmp_ctx);
1884         return 0;
1885 }
1886
1887
1888 /*
1889   set link status of an interface
1890  */
1891 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1892 {
1893         int ret;
1894         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1895         struct ctdb_control_iface_info info;
1896
1897         ZERO_STRUCT(info);
1898
1899         if (argc != 2) {
1900                 usage();
1901         }
1902
1903         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1904                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1905                                   argv[0]));
1906                 talloc_free(tmp_ctx);
1907                 return -1;
1908         }
1909         strcpy(info.name, argv[0]);
1910
1911         if (strcmp(argv[1], "up") == 0) {
1912                 info.link_state = 1;
1913         } else if (strcmp(argv[1], "down") == 0) {
1914                 info.link_state = 0;
1915         } else {
1916                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1917                                   argv[1]));
1918                 talloc_free(tmp_ctx);
1919                 return -1;
1920         }
1921
1922         /* read the public ip list from this node */
1923         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1924                                    tmp_ctx, &info);
1925         if (ret != 0) {
1926                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1927                                   argv[0], options.pnn));
1928                 talloc_free(tmp_ctx);
1929                 return ret;
1930         }
1931
1932         talloc_free(tmp_ctx);
1933         return 0;
1934 }
1935
1936 /*
1937   display pid of a ctdb daemon
1938  */
1939 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1940 {
1941         uint32_t pid;
1942         int ret;
1943
1944         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1945         if (ret != 0) {
1946                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1947                 return ret;
1948         }
1949         printf("Pid:%d\n", pid);
1950
1951         return 0;
1952 }
1953
1954 static uint32_t ipreallocate_finished;
1955
1956 /*
1957   handler for receiving the response to ipreallocate
1958 */
1959 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1960                              TDB_DATA data, void *private_data)
1961 {
1962         ipreallocate_finished = 1;
1963 }
1964
1965 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1966 {
1967         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1968
1969         event_add_timed(ctdb->ev, ctdb, 
1970                                 timeval_current_ofs(1, 0),
1971                                 ctdb_every_second, ctdb);
1972 }
1973
1974 /*
1975   ask the recovery daemon on the recovery master to perform a ip reallocation
1976  */
1977 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1978 {
1979         int i, ret;
1980         TDB_DATA data;
1981         struct takeover_run_reply rd;
1982         uint32_t recmaster;
1983         struct ctdb_node_map *nodemap=NULL;
1984         int retries=0;
1985         struct timeval tv = timeval_current();
1986
1987         /* we need some events to trigger so we can timeout and restart
1988            the loop
1989         */
1990         event_add_timed(ctdb->ev, ctdb, 
1991                                 timeval_current_ofs(1, 0),
1992                                 ctdb_every_second, ctdb);
1993
1994         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1995         if (rd.pnn == -1) {
1996                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1997                 return -1;
1998         }
1999         rd.srvid = getpid();
2000
2001         /* register a message port for receiveing the reply so that we
2002            can receive the reply
2003         */
2004         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2005
2006         data.dptr = (uint8_t *)&rd;
2007         data.dsize = sizeof(rd);
2008
2009 again:
2010         if (retries>5) {
2011                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
2012                 exit(10);
2013         }
2014
2015         /* check that there are valid nodes available */
2016         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2017                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2018                 exit(10);
2019         }
2020         for (i=0; i<nodemap->num;i++) {
2021                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2022                         break;
2023                 }
2024         }
2025         if (i==nodemap->num) {
2026                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2027                 return 0;
2028         }
2029
2030
2031         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2032         if (ret != 0) {
2033                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2034                 return ret;
2035         }
2036
2037         /* verify the node exists */
2038         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2039                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2040                 exit(10);
2041         }
2042
2043
2044         /* check tha there are nodes available that can act as a recmaster */
2045         for (i=0; i<nodemap->num; i++) {
2046                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2047                         continue;
2048                 }
2049                 break;
2050         }
2051         if (i == nodemap->num) {
2052                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2053                 return 0;
2054         }
2055
2056         /* verify the recovery master is not STOPPED, nor BANNED */
2057         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2058                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2059                 retries++;
2060                 sleep(1);
2061                 goto again;
2062         } 
2063
2064         
2065         /* verify the recovery master is not STOPPED, nor BANNED */
2066         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2067                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2068                 retries++;
2069                 sleep(1);
2070                 goto again;
2071         } 
2072
2073         ipreallocate_finished = 0;
2074         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2075         if (ret != 0) {
2076                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2077                 return -1;
2078         }
2079
2080         tv = timeval_current();
2081         /* this loop will terminate when we have received the reply */
2082         while (timeval_elapsed(&tv) < 3.0) {    
2083                 event_loop_once(ctdb->ev);
2084         }
2085         if (ipreallocate_finished == 1) {
2086                 return 0;
2087         }
2088
2089         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2090         retries++;
2091         sleep(1);
2092         goto again;
2093
2094         return 0;
2095 }
2096
2097
2098 /*
2099   disable a remote node
2100  */
2101 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2102 {
2103         int ret;
2104         struct ctdb_node_map *nodemap=NULL;
2105
2106         /* check if the node is already disabled */
2107         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2108                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2109                 exit(10);
2110         }
2111         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2112                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2113                 return 0;
2114         }
2115
2116         do {
2117                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2118                 if (ret != 0) {
2119                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2120                         return ret;
2121                 }
2122
2123                 sleep(1);
2124
2125                 /* read the nodemap and verify the change took effect */
2126                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2127                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2128                         exit(10);
2129                 }
2130
2131         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2132         ret = control_ipreallocate(ctdb, argc, argv);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2135                 return ret;
2136         }
2137
2138         return 0;
2139 }
2140
2141 /*
2142   enable a disabled remote node
2143  */
2144 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2145 {
2146         int ret;
2147
2148         struct ctdb_node_map *nodemap=NULL;
2149
2150
2151         /* check if the node is already enabled */
2152         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2153                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2154                 exit(10);
2155         }
2156         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2157                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2158                 return 0;
2159         }
2160
2161         do {
2162                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2163                 if (ret != 0) {
2164                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2165                         return ret;
2166                 }
2167
2168                 sleep(1);
2169
2170                 /* read the nodemap and verify the change took effect */
2171                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2172                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2173                         exit(10);
2174                 }
2175
2176         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2177
2178         ret = control_ipreallocate(ctdb, argc, argv);
2179         if (ret != 0) {
2180                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2181                 return ret;
2182         }
2183
2184         return 0;
2185 }
2186
2187 /*
2188   stop a remote node
2189  */
2190 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2191 {
2192         int ret;
2193         struct ctdb_node_map *nodemap=NULL;
2194
2195         do {
2196                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2197                 if (ret != 0) {
2198                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2199                 }
2200         
2201                 sleep(1);
2202
2203                 /* read the nodemap and verify the change took effect */
2204                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2205                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2206                         exit(10);
2207                 }
2208
2209         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2210         ret = control_ipreallocate(ctdb, argc, argv);
2211         if (ret != 0) {
2212                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2213                 return ret;
2214         }
2215
2216         return 0;
2217 }
2218
2219 /*
2220   restart a stopped remote node
2221  */
2222 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2223 {
2224         int ret;
2225
2226         struct ctdb_node_map *nodemap=NULL;
2227
2228         do {
2229                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2230                 if (ret != 0) {
2231                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2232                         return ret;
2233                 }
2234         
2235                 sleep(1);
2236
2237                 /* read the nodemap and verify the change took effect */
2238                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2239                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2240                         exit(10);
2241                 }
2242
2243         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2244         ret = control_ipreallocate(ctdb, argc, argv);
2245         if (ret != 0) {
2246                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2247                 return ret;
2248         }
2249
2250         return 0;
2251 }
2252
2253 static uint32_t get_generation(struct ctdb_context *ctdb)
2254 {
2255         struct ctdb_vnn_map *vnnmap=NULL;
2256         int ret;
2257
2258         /* wait until the recmaster is not in recovery mode */
2259         while (1) {
2260                 uint32_t recmode, recmaster;
2261                 
2262                 if (vnnmap != NULL) {
2263                         talloc_free(vnnmap);
2264                         vnnmap = NULL;
2265                 }
2266
2267                 /* get the recmaster */
2268                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2269                 if (ret != 0) {
2270                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2271                         exit(10);
2272                 }
2273
2274                 /* get recovery mode */
2275                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2276                 if (ret != 0) {
2277                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2278                         exit(10);
2279                 }
2280
2281                 /* get the current generation number */
2282                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2283                 if (ret != 0) {
2284                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2285                         exit(10);
2286                 }
2287
2288                 if ((recmode == CTDB_RECOVERY_NORMAL)
2289                 &&  (vnnmap->generation != 1)){
2290                         return vnnmap->generation;
2291                 }
2292                 sleep(1);
2293         }
2294 }
2295
2296 /*
2297   ban a node from the cluster
2298  */
2299 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2300 {
2301         int ret;
2302         struct ctdb_node_map *nodemap=NULL;
2303         struct ctdb_ban_time bantime;
2304
2305         if (argc < 1) {
2306                 usage();
2307         }
2308         
2309         /* verify the node exists */
2310         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2313                 return ret;
2314         }
2315
2316         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2317                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2318                 return -1;
2319         }
2320
2321         bantime.pnn  = options.pnn;
2322         bantime.time = strtoul(argv[0], NULL, 0);
2323
2324         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2325         if (ret != 0) {
2326                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2327                 return -1;
2328         }       
2329
2330         ret = control_ipreallocate(ctdb, argc, argv);
2331         if (ret != 0) {
2332                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2333                 return ret;
2334         }
2335
2336         return 0;
2337 }
2338
2339
2340 /*
2341   unban a node from the cluster
2342  */
2343 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2344 {
2345         int ret;
2346         struct ctdb_node_map *nodemap=NULL;
2347         struct ctdb_ban_time bantime;
2348
2349         /* verify the node exists */
2350         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2351         if (ret != 0) {
2352                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2353                 return ret;
2354         }
2355
2356         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2357                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2358                 return -1;
2359         }
2360
2361         bantime.pnn  = options.pnn;
2362         bantime.time = 0;
2363
2364         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2367                 return -1;
2368         }       
2369
2370         ret = control_ipreallocate(ctdb, argc, argv);
2371         if (ret != 0) {
2372                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2373                 return ret;
2374         }
2375
2376         return 0;
2377 }
2378
2379
2380 /*
2381   show ban information for a node
2382  */
2383 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2384 {
2385         int ret;
2386         struct ctdb_node_map *nodemap=NULL;
2387         struct ctdb_ban_time *bantime;
2388
2389         /* verify the node exists */
2390         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2391         if (ret != 0) {
2392                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2393                 return ret;
2394         }
2395
2396         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2397         if (ret != 0) {
2398                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2399                 return -1;
2400         }       
2401
2402         if (bantime->time == 0) {
2403                 printf("Node %u is not banned\n", bantime->pnn);
2404         } else {
2405                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2406         }
2407
2408         return 0;
2409 }
2410
2411 /*
2412   shutdown a daemon
2413  */
2414 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2415 {
2416         int ret;
2417
2418         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2419         if (ret != 0) {
2420                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2421                 return ret;
2422         }
2423
2424         return 0;
2425 }
2426
2427 /*
2428   trigger a recovery
2429  */
2430 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2431 {
2432         int ret;
2433         uint32_t generation, next_generation;
2434
2435         /* record the current generation number */
2436         generation = get_generation(ctdb);
2437
2438         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2439         if (ret != 0) {
2440                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2441                 return ret;
2442         }
2443
2444         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2445         if (ret != 0) {
2446                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2447                 return ret;
2448         }
2449
2450         /* wait until we are in a new generation */
2451         while (1) {
2452                 next_generation = get_generation(ctdb);
2453                 if (next_generation != generation) {
2454                         return 0;
2455                 }
2456                 sleep(1);
2457         }
2458
2459         return 0;
2460 }
2461
2462
2463 /*
2464   display monitoring mode of a remote node
2465  */
2466 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2467 {
2468         uint32_t monmode;
2469         int ret;
2470
2471         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2472         if (ret != 0) {
2473                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2474                 return ret;
2475         }
2476         if (!options.machinereadable){
2477                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2478         } else {
2479                 printf(":mode:\n");
2480                 printf(":%d:\n",monmode);
2481         }
2482         return 0;
2483 }
2484
2485
2486 /*
2487   display capabilities of a remote node
2488  */
2489 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2490 {
2491         uint32_t capabilities;
2492         int ret;
2493
2494         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2495         if (ret != 0) {
2496                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2497                 return ret;
2498         }
2499         
2500         if (!options.machinereadable){
2501                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2502                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2503                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2504                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2505         } else {
2506                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2507                 printf(":%d:%d:%d:%d:\n",
2508                         !!(capabilities&CTDB_CAP_RECMASTER),
2509                         !!(capabilities&CTDB_CAP_LMASTER),
2510                         !!(capabilities&CTDB_CAP_LVS),
2511                         !!(capabilities&CTDB_CAP_NATGW));
2512         }
2513         return 0;
2514 }
2515
2516 /*
2517   display lvs configuration
2518  */
2519 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2520 {
2521         uint32_t *capabilities;
2522         struct ctdb_node_map *nodemap=NULL;
2523         int i, ret;
2524         int healthy_count = 0;
2525
2526         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2527         if (ret != 0) {
2528                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2529                 return ret;
2530         }
2531
2532         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2533         CTDB_NO_MEMORY(ctdb, capabilities);
2534         
2535         /* collect capabilities for all connected nodes */
2536         for (i=0; i<nodemap->num; i++) {
2537                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2538                         continue;
2539                 }
2540                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2541                         continue;
2542                 }
2543         
2544                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2545                 if (ret != 0) {
2546                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2547                         return ret;
2548                 }
2549
2550                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2551                         continue;
2552                 }
2553
2554                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2555                         healthy_count++;
2556                 }
2557         }
2558
2559         /* Print all LVS nodes */
2560         for (i=0; i<nodemap->num; i++) {
2561                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2562                         continue;
2563                 }
2564                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2565                         continue;
2566                 }
2567                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2568                         continue;
2569                 }
2570
2571                 if (healthy_count != 0) {
2572                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2573                                 continue;
2574                         }
2575                 }
2576
2577                 printf("%d:%s\n", i, 
2578                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2579         }
2580
2581         return 0;
2582 }
2583
2584 /*
2585   display who is the lvs master
2586  */
2587 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2588 {
2589         uint32_t *capabilities;
2590         struct ctdb_node_map *nodemap=NULL;
2591         int i, ret;
2592         int healthy_count = 0;
2593
2594         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2595         if (ret != 0) {
2596                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2597                 return ret;
2598         }
2599
2600         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2601         CTDB_NO_MEMORY(ctdb, capabilities);
2602         
2603         /* collect capabilities for all connected nodes */
2604         for (i=0; i<nodemap->num; i++) {
2605                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2606                         continue;
2607                 }
2608                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2609                         continue;
2610                 }
2611         
2612                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2613                 if (ret != 0) {
2614                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2615                         return ret;
2616                 }
2617
2618                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2619                         continue;
2620                 }
2621
2622                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2623                         healthy_count++;
2624                 }
2625         }
2626
2627         /* find and show the lvsmaster */
2628         for (i=0; i<nodemap->num; i++) {
2629                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2630                         continue;
2631                 }
2632                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2633                         continue;
2634                 }
2635                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2636                         continue;
2637                 }
2638
2639                 if (healthy_count != 0) {
2640                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2641                                 continue;
2642                         }
2643                 }
2644
2645                 if (options.machinereadable){
2646                         printf("%d\n", i);
2647                 } else {
2648                         printf("Node %d is LVS master\n", i);
2649                 }
2650                 return 0;
2651         }
2652
2653         printf("There is no LVS master\n");
2654         return -1;
2655 }
2656
2657 /*
2658   disable monitoring on a  node
2659  */
2660 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2661 {
2662         
2663         int ret;
2664
2665         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2666         if (ret != 0) {
2667                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2668                 return ret;
2669         }
2670         printf("Monitoring mode:%s\n","DISABLED");
2671
2672         return 0;
2673 }
2674
2675 /*
2676   enable monitoring on a  node
2677  */
2678 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2679 {
2680         
2681         int ret;
2682
2683         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2684         if (ret != 0) {
2685                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2686                 return ret;
2687         }
2688         printf("Monitoring mode:%s\n","ACTIVE");
2689
2690         return 0;
2691 }
2692
2693 /*
2694   display remote list of keys/data for a db
2695  */
2696 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2697 {
2698         const char *db_name;
2699         struct ctdb_db_context *ctdb_db;
2700         int ret;
2701
2702         if (argc < 1) {
2703                 usage();
2704         }
2705
2706         db_name = argv[0];
2707
2708
2709         if (db_exists(ctdb, db_name)) {
2710                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2711                 return -1;
2712         }
2713
2714         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2715
2716         if (ctdb_db == NULL) {
2717                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2718                 return -1;
2719         }
2720
2721         /* traverse and dump the cluster tdb */
2722         ret = ctdb_dump_db(ctdb_db, stdout);
2723         if (ret == -1) {
2724                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2725                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2726                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2727                                   db_name));
2728                 return -1;
2729         }
2730         talloc_free(ctdb_db);
2731
2732         printf("Dumped %d records\n", ret);
2733         return 0;
2734 }
2735
2736
2737 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2738                              TDB_DATA data, void *private_data)
2739 {
2740         DEBUG(DEBUG_ERR,("Log data received\n"));
2741         if (data.dsize > 0) {
2742                 printf("%s", data.dptr);
2743         }
2744
2745         exit(0);
2746 }
2747
2748 /*
2749   display a list of log messages from the in memory ringbuffer
2750  */
2751 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2752 {
2753         int ret;
2754         int32_t res;
2755         struct ctdb_get_log_addr log_addr;
2756         TDB_DATA data;
2757         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2758         char *errmsg;
2759         struct timeval tv;
2760
2761         if (argc != 1) {
2762                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2763                 talloc_free(tmp_ctx);
2764                 return -1;
2765         }
2766
2767         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2768         log_addr.srvid = getpid();
2769         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2770                 log_addr.level = get_debug_by_desc(argv[0]);
2771         } else {
2772                 log_addr.level = strtol(argv[0], NULL, 0);
2773         }
2774
2775
2776         data.dptr = (unsigned char *)&log_addr;
2777         data.dsize = sizeof(log_addr);
2778
2779         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2780
2781         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2782         sleep(1);
2783
2784         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2785
2786         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2787                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2788         if (ret != 0 || res != 0) {
2789                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2790                 talloc_free(tmp_ctx);
2791                 return -1;
2792         }
2793
2794
2795         tv = timeval_current();
2796         /* this loop will terminate when we have received the reply */
2797         while (timeval_elapsed(&tv) < 3.0) {    
2798                 event_loop_once(ctdb->ev);
2799         }
2800
2801         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2802
2803         talloc_free(tmp_ctx);
2804         return 0;
2805 }
2806
2807 /*
2808   clear the in memory log area
2809  */
2810 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2811 {
2812         int ret;
2813         int32_t res;
2814         char *errmsg;
2815         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2816
2817         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2818                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2819         if (ret != 0 || res != 0) {
2820                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2821                 talloc_free(tmp_ctx);
2822                 return -1;
2823         }
2824
2825         talloc_free(tmp_ctx);
2826         return 0;
2827 }
2828
2829
2830
2831 /*
2832   display a list of the databases on a remote ctdb
2833  */
2834 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2835 {
2836         int i, ret;
2837         struct ctdb_dbid_map *dbmap=NULL;
2838
2839         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2840         if (ret != 0) {
2841                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2842                 return ret;
2843         }
2844
2845         if(options.machinereadable){
2846                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2847                 for(i=0;i<dbmap->num;i++){
2848                         const char *path;
2849                         const char *name;
2850                         const char *health;
2851                         bool persistent;
2852
2853                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2854                                             dbmap->dbs[i].dbid, ctdb, &path);
2855                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2856                                             dbmap->dbs[i].dbid, ctdb, &name);
2857                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2858                                               dbmap->dbs[i].dbid, ctdb, &health);
2859                         persistent = dbmap->dbs[i].persistent;
2860                         printf(":0x%08X:%s:%s:%d:%d:\n",
2861                                dbmap->dbs[i].dbid, name, path,
2862                                !!(persistent), !!(health));
2863                 }
2864                 return 0;
2865         }
2866
2867         printf("Number of databases:%d\n", dbmap->num);
2868         for(i=0;i<dbmap->num;i++){
2869                 const char *path;
2870                 const char *name;
2871                 const char *health;
2872                 bool persistent;
2873
2874                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2875                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2876                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2877                 persistent = dbmap->dbs[i].persistent;
2878                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2879                        dbmap->dbs[i].dbid, name, path,
2880                        persistent?" PERSISTENT":"",
2881                        health?" UNHEALTHY":"");
2882         }
2883
2884         return 0;
2885 }
2886
2887 /*
2888   display the status of a database on a remote ctdb
2889  */
2890 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2891 {
2892         int i, ret;
2893         struct ctdb_dbid_map *dbmap=NULL;
2894         const char *db_name;
2895
2896         if (argc < 1) {
2897                 usage();
2898         }
2899
2900         db_name = argv[0];
2901
2902         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2903         if (ret != 0) {
2904                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2905                 return ret;
2906         }
2907
2908         for(i=0;i<dbmap->num;i++){
2909                 const char *path;
2910                 const char *name;
2911                 const char *health;
2912                 bool persistent;
2913
2914                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2915                 if (strcmp(name, db_name) != 0) {
2916                         continue;
2917                 }
2918
2919                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2920                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2921                 persistent = dbmap->dbs[i].persistent;
2922                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2923                        dbmap->dbs[i].dbid, name, path,
2924                        persistent?"yes":"no",
2925                        health?health:"OK");
2926                 return 0;
2927         }
2928
2929         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2930         return 0;
2931 }
2932
2933 /*
2934   check if the local node is recmaster or not
2935   it will return 1 if this node is the recmaster and 0 if it is not
2936   or if the local ctdb daemon could not be contacted
2937  */
2938 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2939 {
2940         uint32_t mypnn, recmaster;
2941         int ret;
2942
2943         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2944         if (mypnn == -1) {
2945                 printf("Failed to get pnn of node\n");
2946                 return 1;
2947         }
2948
2949         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2950         if (ret != 0) {
2951                 printf("Failed to get the recmaster\n");
2952                 return 1;
2953         }
2954
2955         if (recmaster != mypnn) {
2956                 printf("this node is not the recmaster\n");
2957                 return 1;
2958         }
2959
2960         printf("this node is the recmaster\n");
2961         return 0;
2962 }
2963
2964 /*
2965   ping a node
2966  */
2967 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2968 {
2969         int ret;
2970         struct timeval tv = timeval_current();
2971         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2972         if (ret == -1) {
2973                 printf("Unable to get ping response from node %u\n", options.pnn);
2974                 return -1;
2975         } else {
2976                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2977                        options.pnn, timeval_elapsed(&tv), ret);
2978         }
2979         return 0;
2980 }
2981
2982
2983 /*
2984   get a tunable
2985  */
2986 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2987 {
2988         const char *name;
2989         uint32_t value;
2990         int ret;
2991
2992         if (argc < 1) {
2993                 usage();
2994         }
2995
2996         name = argv[0];
2997         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2998         if (ret == -1) {
2999                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3000                 return -1;
3001         }
3002
3003         printf("%-19s = %u\n", name, value);
3004         return 0;
3005 }
3006
3007 /*
3008   set a tunable
3009  */
3010 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3011 {
3012         const char *name;
3013         uint32_t value;
3014         int ret;
3015
3016         if (argc < 2) {
3017                 usage();
3018         }
3019
3020         name = argv[0];
3021         value = strtoul(argv[1], NULL, 0);
3022
3023         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3024         if (ret == -1) {
3025                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3026                 return -1;
3027         }
3028         return 0;
3029 }
3030
3031 /*
3032   list all tunables
3033  */
3034 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3035 {
3036         uint32_t count;
3037         const char **list;
3038         int ret, i;
3039
3040         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3041         if (ret == -1) {
3042                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3043                 return -1;
3044         }
3045
3046         for (i=0;i<count;i++) {
3047                 control_getvar(ctdb, 1, &list[i]);
3048         }
3049
3050         talloc_free(list);
3051         
3052         return 0;
3053 }
3054
3055 /*
3056   display debug level on a node
3057  */
3058 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3059 {
3060         int ret;
3061         int32_t level;
3062
3063         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3064         if (ret != 0) {
3065                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3066                 return ret;
3067         } else {
3068                 if (options.machinereadable){
3069                         printf(":Name:Level:\n");
3070                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3071                 } else {
3072                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3073                 }
3074         }
3075         return 0;
3076 }
3077
3078 /*
3079   display reclock file of a node
3080  */
3081 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3082 {
3083         int ret;
3084         const char *reclock;
3085
3086         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3087         if (ret != 0) {
3088                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3089                 return ret;
3090         } else {
3091                 if (options.machinereadable){
3092                         if (reclock != NULL) {
3093                                 printf("%s", reclock);
3094                         }
3095                 } else {
3096                         if (reclock == NULL) {
3097                                 printf("No reclock file used.\n");
3098                         } else {
3099                                 printf("Reclock file:%s\n", reclock);
3100                         }
3101                 }
3102         }
3103         return 0;
3104 }
3105
3106 /*
3107   set the reclock file of a node
3108  */
3109 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3110 {
3111         int ret;
3112         const char *reclock;
3113
3114         if (argc == 0) {
3115                 reclock = NULL;
3116         } else if (argc == 1) {
3117                 reclock = argv[0];
3118         } else {
3119                 usage();
3120         }
3121
3122         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3123         if (ret != 0) {
3124                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3125                 return ret;
3126         }
3127         return 0;
3128 }
3129
3130 /*
3131   set the natgw state on/off
3132  */
3133 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3134 {
3135         int ret;
3136         uint32_t natgwstate;
3137
3138         if (argc == 0) {
3139                 usage();
3140         }
3141
3142         if (!strcmp(argv[0], "on")) {
3143                 natgwstate = 1;
3144         } else if (!strcmp(argv[0], "off")) {
3145                 natgwstate = 0;
3146         } else {
3147                 usage();
3148         }
3149
3150         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3151         if (ret != 0) {
3152                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3153                 return ret;
3154         }
3155
3156         return 0;
3157 }
3158
3159 /*
3160   set the lmaster role on/off
3161  */
3162 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3163 {
3164         int ret;
3165         uint32_t lmasterrole;
3166
3167         if (argc == 0) {
3168                 usage();
3169         }
3170
3171         if (!strcmp(argv[0], "on")) {
3172                 lmasterrole = 1;
3173         } else if (!strcmp(argv[0], "off")) {
3174                 lmasterrole = 0;
3175         } else {
3176                 usage();
3177         }
3178
3179         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3180         if (ret != 0) {
3181                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3182                 return ret;
3183         }
3184
3185         return 0;
3186 }
3187
3188 /*
3189   set the recmaster role on/off
3190  */
3191 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3192 {
3193         int ret;
3194         uint32_t recmasterrole;
3195
3196         if (argc == 0) {
3197                 usage();
3198         }
3199
3200         if (!strcmp(argv[0], "on")) {
3201                 recmasterrole = 1;
3202         } else if (!strcmp(argv[0], "off")) {
3203                 recmasterrole = 0;
3204         } else {
3205                 usage();
3206         }
3207
3208         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3209         if (ret != 0) {
3210                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3211                 return ret;
3212         }
3213
3214         return 0;
3215 }
3216
3217 /*
3218   set debug level on a node or all nodes
3219  */
3220 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3221 {
3222         int i, ret;
3223         int32_t level;
3224
3225         if (argc == 0) {
3226                 printf("You must specify the debug level. Valid levels are:\n");
3227                 for (i=0; debug_levels[i].description != NULL; i++) {
3228                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3229                 }
3230
3231                 return 0;
3232         }
3233
3234         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3235                 level = get_debug_by_desc(argv[0]);
3236         } else {
3237                 level = strtol(argv[0], NULL, 0);
3238         }
3239
3240         for (i=0; debug_levels[i].description != NULL; i++) {
3241                 if (level == debug_levels[i].level) {
3242                         break;
3243                 }
3244         }
3245         if (debug_levels[i].description == NULL) {
3246                 printf("Invalid debug level, must be one of\n");
3247                 for (i=0; debug_levels[i].description != NULL; i++) {
3248                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3249                 }
3250                 return -1;
3251         }
3252
3253         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3254         if (ret != 0) {
3255                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3256         }
3257         return 0;
3258 }
3259
3260
3261 /*
3262   freeze a node
3263  */
3264 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3265 {
3266         int ret;
3267         uint32_t priority;
3268         
3269         if (argc == 1) {
3270                 priority = strtol(argv[0], NULL, 0);
3271         } else {
3272                 priority = 0;
3273         }
3274         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3275
3276         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3277         if (ret != 0) {
3278                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3279         }               
3280         return 0;
3281 }
3282
3283 /*
3284   thaw a node
3285  */
3286 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3287 {
3288         int ret;
3289         uint32_t priority;
3290         
3291         if (argc == 1) {
3292                 priority = strtol(argv[0], NULL, 0);
3293         } else {
3294                 priority = 0;
3295         }
3296         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3297
3298         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3299         if (ret != 0) {
3300                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3301         }               
3302         return 0;
3303 }
3304
3305
3306 /*
3307   attach to a database
3308  */
3309 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3310 {
3311         const char *db_name;
3312         struct ctdb_db_context *ctdb_db;
3313
3314         if (argc < 1) {
3315                 usage();
3316         }
3317         db_name = argv[0];
3318
3319         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3320         if (ctdb_db == NULL) {
3321                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3322                 return -1;
3323         }
3324
3325         return 0;
3326 }
3327
3328 /*
3329   set db priority
3330  */
3331 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3332 {
3333         struct ctdb_db_priority db_prio;
3334         int ret;
3335
3336         if (argc < 2) {
3337                 usage();
3338         }
3339
3340         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3341         db_prio.priority = strtoul(argv[1], NULL, 0);
3342
3343         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3344         if (ret != 0) {
3345                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3346                 return -1;
3347         }
3348
3349         return 0;
3350 }
3351
3352 /*
3353   get db priority
3354  */
3355 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3356 {
3357         uint32_t db_id, priority;
3358         int ret;
3359
3360         if (argc < 1) {
3361                 usage();
3362         }
3363
3364         db_id = strtoul(argv[0], NULL, 0);
3365
3366         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3367         if (ret != 0) {
3368                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3369                 return -1;
3370         }
3371
3372         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3373
3374         return 0;
3375 }
3376
3377 /*
3378   run an eventscript on a node
3379  */
3380 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3381 {
3382         TDB_DATA data;
3383         int ret;
3384         int32_t res;
3385         char *errmsg;
3386         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3387
3388         if (argc != 1) {
3389                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3390                 return -1;
3391         }
3392
3393         data.dptr = (unsigned char *)discard_const(argv[0]);
3394         data.dsize = strlen((char *)data.dptr) + 1;
3395
3396         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3397
3398         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3399                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3400         if (ret != 0 || res != 0) {
3401                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3402                 talloc_free(tmp_ctx);
3403                 return -1;
3404         }
3405         talloc_free(tmp_ctx);
3406         return 0;
3407 }
3408
/*
  header describing a database backup; control_backupdb() declares one
  as 'dbhdr'

  NOTE(review): the fields use unsigned long and time_t, whose sizes
  differ between platforms - if this struct is written to the file as
  raw bytes the format is not portable across architectures; confirm
  against the reader/writer code.
 */
#define DB_VERSION 1    /* presumably the value stored in 'version' - confirm */
#define MAX_DB_NAME 64  /* size of the 'name' field, in bytes */
struct db_file_header {
        unsigned long version;
        time_t timestamp;
        unsigned long persistent;
        unsigned long size;
        const char name[MAX_DB_NAME];
};
3418
/*
  state carried through backup_traverse() while collecting the records
  of a database
 */
struct backup_data {
        struct ctdb_marshall_buffer *records;   /* marshalled records, grown per record */
        uint32_t len;                           /* bytes used in 'records' so far */
        uint32_t total;                         /* number of records added */
        bool traverse_error;                    /* set when adding a record failed */
};
3425
3426 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3427 {
3428         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3429         struct ctdb_rec_data *rec;
3430
3431         /* add the record */
3432         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3433         if (rec == NULL) {
3434                 bd->traverse_error = true;
3435                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3436                 return -1;
3437         }
3438         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3439         if (bd->records == NULL) {
3440                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3441                 bd->traverse_error = true;
3442                 return -1;
3443         }
3444         bd->records->count++;
3445         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3446         bd->len += rec->length;
3447         talloc_free(rec);
3448
3449         bd->total++;
3450         return 0;
3451 }
3452
3453 /*
3454  * backup a database to a file 
3455  */
3456 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3457 {
3458         int i, ret;
3459         struct ctdb_dbid_map *dbmap=NULL;
3460         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3461         struct db_file_header dbhdr;
3462         struct ctdb_db_context *ctdb_db;
3463         struct backup_data *bd;
3464         int fh = -1;
3465         int status = -1;
3466         const char *reason = NULL;
3467
3468         if (argc != 2) {
3469                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3470                 return -1;
3471         }
3472
3473         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3474         if (ret != 0) {
3475                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3476                 return ret;
3477         }
3478
3479         for(i=0;i<dbmap->num;i++){
3480                 const char *name;
3481
3482                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3483                 if(!strcmp(argv[0], name)){
3484                         talloc_free(discard_const(name));
3485                         break;
3486                 }
3487                 talloc_free(discard_const(name));
3488         }
3489         if (i == dbmap->num) {
3490                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3491                 talloc_free(tmp_ctx);
3492                 return -1;
3493         }
3494
3495         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3496                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3497         if (ret != 0) {
3498                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3499                                  argv[0]));
3500                 talloc_free(tmp_ctx);
3501                 return -1;
3502         }
3503         if (reason) {
3504                 uint32_t allow_unhealthy = 0;
3505
3506                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3507                                       "AllowUnhealthyDBRead",
3508                                       &allow_unhealthy);
3509
3510                 if (allow_unhealthy != 1) {
3511                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3512                                          argv[0], reason));
3513
3514                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3515                                          allow_unhealthy));
3516                         talloc_free(tmp_ctx);
3517                         return -1;
3518                 }
3519
3520                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3521                                      argv[0], argv[0]));
3522                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3523                                      "tunnable AllowUnhealthyDBRead = %u\n",
3524                                      allow_unhealthy));
3525         }
3526
3527         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3528         if (ctdb_db == NULL) {
3529                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3530                 talloc_free(tmp_ctx);
3531                 return -1;
3532         }
3533
3534
3535         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3536         if (ret == -1) {
3537                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3538                 talloc_free(tmp_ctx);
3539                 return -1;
3540         }
3541
3542
3543         bd = talloc_zero(tmp_ctx, struct backup_data);
3544         if (bd == NULL) {
3545                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3546                 talloc_free(tmp_ctx);
3547                 return -1;
3548         }
3549
3550         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3551         if (bd->records == NULL) {
3552                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3553                 talloc_free(tmp_ctx);
3554                 return -1;
3555         }
3556
3557         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3558         bd->records->db_id = ctdb_db->db_id;
3559         /* traverse the database collecting all records */
3560         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3561             bd->traverse_error) {
3562                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3563                 talloc_free(tmp_ctx);
3564                 return -1;              
3565         }
3566
3567         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3568
3569
3570         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3571         if (fh == -1) {
3572                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3573                 talloc_free(tmp_ctx);
3574                 return -1;
3575         }
3576
3577         dbhdr.version = DB_VERSION;
3578         dbhdr.timestamp = time(NULL);
3579         dbhdr.persistent = dbmap->dbs[i].persistent;
3580         dbhdr.size = bd->len;
3581         if (strlen(argv[0]) >= MAX_DB_NAME) {
3582                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3583                 goto done;
3584         }
3585         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3586         ret = write(fh, &dbhdr, sizeof(dbhdr));
3587         if (ret == -1) {
3588                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3589                 goto done;
3590         }
3591         ret = write(fh, bd->records, bd->len);
3592         if (ret == -1) {
3593                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3594                 goto done;
3595         }
3596
3597         status = 0;
3598 done:
3599         if (fh != -1) {
3600                 ret = close(fh);
3601                 if (ret == -1) {
3602                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3603                 }
3604         }
3605         talloc_free(tmp_ctx);
3606         return status;
3607 }
3608
3609 /*
3610  * restore a database from a file 
3611  */
3612 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3613 {
3614         int ret;
3615         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3616         TDB_DATA outdata;
3617         TDB_DATA data;
3618         struct db_file_header dbhdr;
3619         struct ctdb_db_context *ctdb_db;
3620         struct ctdb_node_map *nodemap=NULL;
3621         struct ctdb_vnn_map *vnnmap=NULL;
3622         int i, fh;
3623         struct ctdb_control_wipe_database w;
3624         uint32_t *nodes;
3625         uint32_t generation;
3626         struct tm *tm;
3627         char tbuf[100];
3628         char *dbname;
3629
3630         if (argc < 1 || argc > 2) {
3631                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3632                 return -1;
3633         }
3634
3635         fh = open(argv[0], O_RDONLY);
3636         if (fh == -1) {
3637                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3638                 talloc_free(tmp_ctx);
3639                 return -1;
3640         }
3641
3642         read(fh, &dbhdr, sizeof(dbhdr));
3643         if (dbhdr.version != DB_VERSION) {
3644                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3645                 talloc_free(tmp_ctx);
3646                 return -1;
3647         }
3648
3649         dbname = dbhdr.name;
3650         if (argc == 2) {
3651                 dbname = argv[1];
3652         }
3653
3654         outdata.dsize = dbhdr.size;
3655         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3656         if (outdata.dptr == NULL) {
3657                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3658                 close(fh);
3659                 talloc_free(tmp_ctx);
3660                 return -1;
3661         }               
3662         read(fh, outdata.dptr, outdata.dsize);
3663         close(fh);
3664
3665         tm = localtime(&dbhdr.timestamp);
3666         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3667         printf("Restoring database '%s' from backup @ %s\n",
3668                 dbname, tbuf);
3669
3670
3671         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3672         if (ctdb_db == NULL) {
3673                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3674                 talloc_free(tmp_ctx);
3675                 return -1;
3676         }
3677
3678         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3679         if (ret != 0) {
3680                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3681                 talloc_free(tmp_ctx);
3682                 return ret;
3683         }
3684
3685
3686         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3687         if (ret != 0) {
3688                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3689                 talloc_free(tmp_ctx);
3690                 return ret;
3691         }
3692
3693         /* freeze all nodes */
3694         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3695         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3696                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3697                                         nodes, i,
3698                                         TIMELIMIT(),
3699                                         false, tdb_null,
3700                                         NULL, NULL,
3701                                         NULL) != 0) {
3702                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3703                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3704                         talloc_free(tmp_ctx);
3705                         return -1;
3706                 }
3707         }
3708
3709         generation = vnnmap->generation;
3710         data.dptr = (void *)&generation;
3711         data.dsize = sizeof(generation);
3712
3713         /* start a cluster wide transaction */
3714         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3715         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3716                                         nodes, 0,
3717                                         TIMELIMIT(), false, data,
3718                                         NULL, NULL,
3719                                         NULL) != 0) {
3720                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3721                 return -1;
3722         }
3723
3724
3725         w.db_id = ctdb_db->db_id;
3726         w.transaction_id = generation;
3727
3728         data.dptr = (void *)&w;
3729         data.dsize = sizeof(w);
3730
3731         /* wipe all the remote databases. */
3732         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3733         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3734                                         nodes, 0,
3735                                         TIMELIMIT(), false, data,
3736                                         NULL, NULL,
3737                                         NULL) != 0) {
3738                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3739                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3740                 talloc_free(tmp_ctx);
3741                 return -1;
3742         }
3743         
3744         /* push the database */
3745         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3746         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3747                                         nodes, 0,
3748                                         TIMELIMIT(), false, outdata,
3749                                         NULL, NULL,
3750                                         NULL) != 0) {
3751                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3752                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3753                 talloc_free(tmp_ctx);
3754                 return -1;
3755         }
3756
3757         data.dptr = (void *)&ctdb_db->db_id;
3758         data.dsize = sizeof(ctdb_db->db_id);
3759
3760         /* mark the database as healthy */
3761         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3762         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3763                                         nodes, 0,
3764                                         TIMELIMIT(), false, data,
3765                                         NULL, NULL,
3766                                         NULL) != 0) {
3767                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3768                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3769                 talloc_free(tmp_ctx);
3770                 return -1;
3771         }
3772
3773         data.dptr = (void *)&generation;
3774         data.dsize = sizeof(generation);
3775
3776         /* commit all the changes */
3777         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3778                                         nodes, 0,
3779                                         TIMELIMIT(), false, data,
3780                                         NULL, NULL,
3781                                         NULL) != 0) {
3782                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3783                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3784                 talloc_free(tmp_ctx);
3785                 return -1;
3786         }
3787
3788
3789         /* thaw all nodes */
3790         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3791         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3792                                         nodes, 0,
3793                                         TIMELIMIT(),
3794                                         false, tdb_null,
3795                                         NULL, NULL,
3796                                         NULL) != 0) {
3797                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3798                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3799                 talloc_free(tmp_ctx);
3800                 return -1;
3801         }
3802
3803
3804         talloc_free(tmp_ctx);
3805         return 0;
3806 }
3807
3808 /*
3809  * dump a database backup from a file
3810  */
3811 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3812 {
3813         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3814         TDB_DATA outdata;
3815         struct db_file_header dbhdr;
3816         int i, fh;
3817         struct tm *tm;
3818         char tbuf[100];
3819         struct ctdb_rec_data *rec = NULL;
3820         struct ctdb_marshall_buffer *m;
3821
3822         if (argc != 1) {
3823                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3824                 return -1;
3825         }
3826
3827         fh = open(argv[0], O_RDONLY);
3828         if (fh == -1) {
3829                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3830                 talloc_free(tmp_ctx);
3831                 return -1;
3832         }
3833
3834         read(fh, &dbhdr, sizeof(dbhdr));
3835         if (dbhdr.version != DB_VERSION) {
3836                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3837                 talloc_free(tmp_ctx);
3838                 return -1;
3839         }
3840
3841         outdata.dsize = dbhdr.size;
3842         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3843         if (outdata.dptr == NULL) {
3844                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3845                 close(fh);
3846                 talloc_free(tmp_ctx);
3847                 return -1;
3848         }
3849         read(fh, outdata.dptr, outdata.dsize);
3850         close(fh);
3851         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3852
3853         tm = localtime(&dbhdr.timestamp);
3854         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3855         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3856                 dbhdr.name, m->db_id, tbuf);
3857
3858         for (i=0; i < m->count; i++) {
3859                 uint32_t reqid = 0;
3860                 TDB_DATA key, data;
3861
3862                 /* we do not want the header splitted, so we pass NULL*/
3863                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3864                                               NULL, &key, &data);
3865
3866                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3867         }
3868
3869         printf("Dumped %d records\n", i);
3870         talloc_free(tmp_ctx);
3871         return 0;
3872 }
3873
3874 /*
3875  * wipe a database from a file
3876  */
3877 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3878                           const char **argv)
3879 {
3880         int ret;
3881         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3882         TDB_DATA data;
3883         struct ctdb_db_context *ctdb_db;
3884         struct ctdb_node_map *nodemap = NULL;
3885         struct ctdb_vnn_map *vnnmap = NULL;
3886         int i;
3887         struct ctdb_control_wipe_database w;
3888         uint32_t *nodes;
3889         uint32_t generation;
3890         struct ctdb_dbid_map *dbmap = NULL;
3891
3892         if (argc != 1) {
3893                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3894                 return -1;
3895         }
3896
3897         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3898                                  &dbmap);
3899         if (ret != 0) {
3900                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3901                                   options.pnn));
3902                 return ret;
3903         }
3904
3905         for(i=0;i<dbmap->num;i++){
3906                 const char *name;
3907
3908                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3909                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3910                 if(!strcmp(argv[0], name)){
3911                         talloc_free(discard_const(name));
3912                         break;
3913                 }
3914                 talloc_free(discard_const(name));
3915         }
3916         if (i == dbmap->num) {
3917                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3918                                   argv[0]));
3919                 talloc_free(tmp_ctx);
3920                 return -1;
3921         }
3922
3923         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3924         if (ctdb_db == NULL) {
3925                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3926                                   argv[0]));
3927                 talloc_free(tmp_ctx);
3928                 return -1;
3929         }
3930
3931         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3932                                    &nodemap);
3933         if (ret != 0) {
3934                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3935                                   options.pnn));
3936                 talloc_free(tmp_ctx);
3937                 return ret;
3938         }
3939
3940         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3941                                   &vnnmap);
3942         if (ret != 0) {
3943                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3944                                   options.pnn));
3945                 talloc_free(tmp_ctx);
3946                 return ret;
3947         }
3948
3949         /* freeze all nodes */
3950         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3951         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3952                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3953                                                 nodes, i,
3954                                                 TIMELIMIT(),
3955                                                 false, tdb_null,
3956                                                 NULL, NULL,
3957                                                 NULL);
3958                 if (ret != 0) {
3959                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3960                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3961                                              CTDB_RECOVERY_ACTIVE);
3962                         talloc_free(tmp_ctx);
3963                         return -1;
3964                 }
3965         }
3966
3967         generation = vnnmap->generation;
3968         data.dptr = (void *)&generation;
3969         data.dsize = sizeof(generation);
3970
3971         /* start a cluster wide transaction */
3972         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3973         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3974                                         nodes, 0,
3975                                         TIMELIMIT(), false, data,
3976                                         NULL, NULL,
3977                                         NULL);
3978         if (ret!= 0) {
3979                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3980                                   "transactions.\n"));
3981                 return -1;
3982         }
3983
3984         w.db_id = ctdb_db->db_id;
3985         w.transaction_id = generation;
3986
3987         data.dptr = (void *)&w;
3988         data.dsize = sizeof(w);
3989
3990         /* wipe all the remote databases. */
3991         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3992         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3993                                         nodes, 0,
3994                                         TIMELIMIT(), false, data,
3995                                         NULL, NULL,
3996                                         NULL) != 0) {
3997                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3998                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3999                 talloc_free(tmp_ctx);
4000                 return -1;
4001         }
4002
4003         data.dptr = (void *)&ctdb_db->db_id;
4004         data.dsize = sizeof(ctdb_db->db_id);
4005
4006         /* mark the database as healthy */
4007         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4008         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4009                                         nodes, 0,
4010                                         TIMELIMIT(), false, data,
4011                                         NULL, NULL,
4012                                         NULL) != 0) {
4013                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4014                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4015                 talloc_free(tmp_ctx);
4016                 return -1;
4017         }
4018
4019         data.dptr = (void *)&generation;
4020         data.dsize = sizeof(generation);
4021
4022         /* commit all the changes */
4023         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4024                                         nodes, 0,
4025                                         TIMELIMIT(), false, data,
4026                                         NULL, NULL,
4027                                         NULL) != 0) {
4028                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4029                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4030                 talloc_free(tmp_ctx);
4031                 return -1;
4032         }
4033
4034         /* thaw all nodes */
4035         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4036         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4037                                         nodes, 0,
4038                                         TIMELIMIT(),
4039                                         false, tdb_null,
4040                                         NULL, NULL,
4041                                         NULL) != 0) {
4042                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4043                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4044                 talloc_free(tmp_ctx);
4045                 return -1;
4046         }
4047
4048         talloc_free(tmp_ctx);
4049         return 0;
4050 }
4051
4052 /*
4053  * set flags of a node in the nodemap
4054  */
4055 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4056 {
4057         int ret;
4058         int32_t status;
4059         int node;
4060         int flags;
4061         TDB_DATA data;
4062         struct ctdb_node_flag_change c;
4063
4064         if (argc != 2) {
4065                 usage();
4066                 return -1;
4067         }
4068
4069         if (sscanf(argv[0], "%d", &node) != 1) {
4070                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4071                 usage();
4072                 return -1;
4073         }
4074         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4075                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4076                 usage();
4077                 return -1;
4078         }
4079
4080         c.pnn       = node;
4081         c.old_flags = 0;
4082         c.new_flags = flags;
4083
4084         data.dsize = sizeof(c);
4085         data.dptr = (unsigned char *)&c;
4086
4087         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4088                            data, NULL, NULL, &status, NULL, NULL);
4089         if (ret != 0 || status != 0) {
4090                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4091                 return -1;
4092         }
4093         return 0;
4094 }
4095
4096 /*
4097   dump memory usage
4098  */
4099 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4100 {
4101         TDB_DATA data;
4102         int ret;
4103         int32_t res;
4104         char *errmsg;
4105         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4106         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4107                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4108         if (ret != 0 || res != 0) {
4109                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4110                 talloc_free(tmp_ctx);
4111                 return -1;
4112         }
4113         write(1, data.dptr, data.dsize);
4114         talloc_free(tmp_ctx);
4115         return 0;
4116 }
4117
4118 /*
4119   handler for memory dumps
4120 */
4121 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4122                              TDB_DATA data, void *private_data)
4123 {
4124         write(1, data.dptr, data.dsize);
4125         exit(0);
4126 }
4127
4128 /*
4129   dump memory usage on the recovery daemon
4130  */
4131 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4132 {
4133         int ret;
4134         TDB_DATA data;
4135         struct rd_memdump_reply rd;
4136
4137         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4138         if (rd.pnn == -1) {
4139                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4140                 return -1;
4141         }
4142         rd.srvid = getpid();
4143
4144         /* register a message port for receiveing the reply so that we
4145            can receive the reply
4146         */
4147         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4148
4149
4150         data.dptr = (uint8_t *)&rd;
4151         data.dsize = sizeof(rd);
4152
4153         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4154         if (ret != 0) {
4155                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4156                 return -1;
4157         }
4158
4159         /* this loop will terminate when we have received the reply */
4160         while (1) {     
4161                 event_loop_once(ctdb->ev);
4162         }
4163
4164         return 0;
4165 }
4166
4167 /*
4168   send a message to a srvid
4169  */
4170 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4171 {
4172         unsigned long srvid;
4173         int ret;
4174         TDB_DATA data;
4175
4176         if (argc < 2) {
4177                 usage();
4178         }
4179
4180         srvid      = strtoul(argv[0], NULL, 0);
4181
4182         data.dptr = (uint8_t *)discard_const(argv[1]);
4183         data.dsize= strlen(argv[1]);
4184
4185         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4186         if (ret != 0) {
4187                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4188                 return -1;
4189         }
4190
4191         return 0;
4192 }
4193
4194 /*
4195   handler for msglisten
4196 */
4197 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4198                              TDB_DATA data, void *private_data)
4199 {
4200         int i;
4201
4202         printf("Message received: ");
4203         for (i=0;i<data.dsize;i++) {
4204                 printf("%c", data.dptr[i]);
4205         }
4206         printf("\n");
4207 }
4208
4209 /*
4210   listen for messages on a messageport
4211  */
4212 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4213 {
4214         uint64_t srvid;
4215
4216         srvid = getpid();
4217
4218         /* register a message port and listen for messages
4219         */
4220         ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4221         printf("Listening for messages on srvid:%d\n", (int)srvid);
4222
4223         while (1) {     
4224                 event_loop_once(ctdb->ev);
4225         }
4226
4227         return 0;
4228 }
4229
4230 /*
4231   list all nodes in the cluster
4232   if the daemon is running, we read the data from the daemon.
4233   if the daemon is not running we parse the nodes file directly
4234  */
4235 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4236 {
4237         int i, ret;
4238         struct ctdb_node_map *nodemap=NULL;
4239
4240         if (ctdb != NULL) {
4241                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4242                 if (ret != 0) {
4243                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4244                         return ret;
4245                 }
4246
4247                 for(i=0;i<nodemap->num;i++){
4248                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4249                                 continue;
4250                         }
4251                         if (options.machinereadable){
4252                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4253                         } else {
4254                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4255                         }
4256                 }
4257         } else {
4258                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4259                 struct pnn_node *pnn_nodes;
4260                 struct pnn_node *pnn_node;
4261         
4262                 pnn_nodes = read_nodes_file(mem_ctx);
4263                 if (pnn_nodes == NULL) {
4264                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4265                         talloc_free(mem_ctx);
4266                         return -1;
4267                 }
4268
4269                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4270                         ctdb_sock_addr addr;
4271
4272                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4273                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4274                                 talloc_free(mem_ctx);
4275                                 return -1;
4276                         }
4277
4278                         if (options.machinereadable){
4279                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4280                         } else {
4281                                 printf("%s\n", pnn_node->addr);
4282                         }
4283                 }
4284                 talloc_free(mem_ctx);
4285         }
4286
4287         return 0;
4288 }
4289
4290 /*
4291   reload the nodes file on the local node
4292  */
4293 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4294 {
4295         int i, ret;
4296         int mypnn;
4297         struct ctdb_node_map *nodemap=NULL;
4298
4299         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4300         if (mypnn == -1) {
4301                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4302                 return -1;
4303         }
4304
4305         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4306         if (ret != 0) {
4307                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4308                 return ret;
4309         }
4310
4311         /* reload the nodes file on all remote nodes */
4312         for (i=0;i<nodemap->num;i++) {
4313                 if (nodemap->nodes[i].pnn == mypnn) {
4314                         continue;
4315                 }
4316                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4317                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4318                         nodemap->nodes[i].pnn);
4319                 if (ret != 0) {
4320                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4321                 }
4322         }
4323
4324         /* reload the nodes file on the local node */
4325         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4326         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4327         if (ret != 0) {
4328                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4329         }
4330
4331         /* initiate a recovery */
4332         control_recover(ctdb, argc, argv);
4333
4334         return 0;
4335 }
4336
4337
4338 static const struct {
4339         const char *name;
4340         int (*fn)(struct ctdb_context *, int, const char **);
4341         bool auto_all;
4342         bool without_daemon; /* can be run without daemon running ? */
4343         const char *msg;
4344         const char *args;
4345 } ctdb_commands[] = {
4346 #ifdef CTDB_VERS
4347         { "version",         control_version,           true,   false,  "show version of ctdb" },
4348 #endif
4349         { "status",          control_status,            true,   false,  "show node status" },
4350         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4351         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4352         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4353         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4354         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4355         { "statistics",      control_statistics,        false,  false, "show statistics" },
4356         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4357         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4358         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4359         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4360         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4361         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4362         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4363         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4364         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4365         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4366         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4367         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4368         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4369         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4370         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4371         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4372         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4373         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4374         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4375         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4376         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4377         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4378         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4379         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4380         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4381         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4382         { "stop",            control_stop,              true,   false,  "stop a node" },
4383         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4384         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4385         { "unban",           control_unban,             true,   false,  "unban a node" },
4386         { "showban",         control_showban,           true,   false,  "show ban information"},
4387         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4388         { "recover",         control_recover,           true,   false,  "force recovery" },
4389         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4390         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4391         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4392         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4393         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4394         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4395         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4396         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4397
4398         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4399         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4400         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4401         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4402         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4403         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4404         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4405         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4406         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4407         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4408         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4409         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4410         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4411         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4412         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4413         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4414         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4415         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4416         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4417         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4418         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4419         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4420         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4421         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4422         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4423         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4424         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4425         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4426         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4427         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4428         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4429         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4430 };
4431
4432 /*
4433   show usage message
4434  */
4435 static void usage(void)
4436 {
4437         int i;
4438         printf(
4439 "Usage: ctdb [options] <control>\n" \
4440 "Options:\n" \
4441 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4442 "   -Y                 generate machinereadable output\n"
4443 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4444         printf("Controls:\n");
4445         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4446                 printf("  %-15s %-27s  %s\n", 
4447                        ctdb_commands[i].name, 
4448                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4449                        ctdb_commands[i].msg);
4450         }
4451         exit(1);
4452 }
4453
4454
4455 static void ctdb_alarm(int sig)
4456 {
4457         printf("Maximum runtime exceeded - exiting\n");
4458         _exit(ERR_TIMEOUT);
4459 }
4460
4461 /*
4462   main program
4463 */
4464 int main(int argc, const char *argv[])
4465 {
4466         struct ctdb_context *ctdb;
4467         char *nodestring = NULL;
4468         struct poptOption popt_options[] = {
4469                 POPT_AUTOHELP
4470                 POPT_CTDB_CMDLINE
4471                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4472                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4473                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4474                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4475                 POPT_TABLEEND
4476         };
4477         int opt;
4478         const char **extra_argv;
4479         int extra_argc = 0;
4480         int ret=-1, i;
4481         poptContext pc;
4482         struct event_context *ev;
4483         const char *control;
4484
4485         setlinebuf(stdout);
4486         
4487         /* set some defaults */
4488         options.maxruntime = 0;
4489         options.timelimit = 3;
4490         options.pnn = CTDB_CURRENT_NODE;
4491
4492         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4493
4494         while ((opt = poptGetNextOpt(pc)) != -1) {
4495                 switch (opt) {
4496                 default:
4497                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4498                                 poptBadOption(pc, 0), poptStrerror(opt)));
4499                         exit(1);
4500                 }
4501         }
4502
4503         /* setup the remaining options for the main program to use */
4504         extra_argv = poptGetArgs(pc);
4505         if (extra_argv) {
4506                 extra_argv++;
4507                 while (extra_argv[extra_argc]) extra_argc++;
4508         }
4509
4510         if (extra_argc < 1) {
4511                 usage();
4512         }
4513
4514         if (options.maxruntime == 0) {
4515                 const char *ctdb_timeout;
4516                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4517                 if (ctdb_timeout != NULL) {
4518                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4519                 } else {
4520                         /* default timeout is 120 seconds */
4521                         options.maxruntime = 120;
4522                 }
4523         }
4524
4525         signal(SIGALRM, ctdb_alarm);
4526         alarm(options.maxruntime);
4527
4528         /* setup the node number to contact */
4529         if (nodestring != NULL) {
4530                 if (strcmp(nodestring, "all") == 0) {
4531                         options.pnn = CTDB_BROADCAST_ALL;
4532                 } else {
4533                         options.pnn = strtoul(nodestring, NULL, 0);
4534                 }
4535         }
4536
4537         control = extra_argv[0];
4538
4539         ev = event_context_init(NULL);
4540
4541         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4542                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4543                         int j;
4544
4545                         if (ctdb_commands[i].without_daemon == true) {
4546                                 close(2);
4547                         }
4548
4549                         /* initialise ctdb */
4550                         ctdb = ctdb_cmdline_client(ev);
4551
4552                         if (ctdb_commands[i].without_daemon == false) {
4553                                 if (ctdb == NULL) {
4554                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4555                                         exit(1);
4556                                 }
4557
4558                                 /* verify the node exists */
4559                                 verify_node(ctdb);
4560
4561                                 if (options.pnn == CTDB_CURRENT_NODE) {
4562                                         int pnn;
4563                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4564                                         if (pnn == -1) {
4565                                                 return -1;
4566                                         }
4567                                         options.pnn = pnn;
4568                                 }
4569                         }
4570
4571                         if (ctdb_commands[i].auto_all && 
4572                             options.pnn == CTDB_BROADCAST_ALL) {
4573                                 uint32_t *nodes;
4574                                 uint32_t num_nodes;
4575                                 ret = 0;
4576
4577                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4578                                 CTDB_NO_MEMORY(ctdb, nodes);
4579         
4580                                 for (j=0;j<num_nodes;j++) {
4581                                         options.pnn = nodes[j];
4582                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4583                                 }
4584                                 talloc_free(nodes);
4585                         } else {
4586                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4587                         }
4588                         break;
4589                 }
4590         }
4591
4592         if (i == ARRAY_SIZE(ctdb_commands)) {
4593                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4594                 exit(1);
4595         }
4596
4597         return ret;
4598 }