add two new debug controls to send and receive messages
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         int partially_online = 0;
508                         int j;
509
510                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
511                                 continue;
512                         }
513                         if (nodemap->nodes[i].flags == 0) {
514                                 struct ctdb_control_get_ifaces *ifaces;
515
516                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
517                                                            nodemap->nodes[i].pnn,
518                                                            ctdb, &ifaces);
519                                 if (ret == 0) {
520                                         for (j=0; j < ifaces->num; j++) {
521                                                 if (ifaces->ifaces[j].link_state != 0) {
522                                                         continue;
523                                                 }
524                                                 partially_online = 1;
525                                                 break;
526                                         }
527                                         talloc_free(ifaces);
528                                 }
529                         }
530                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
531                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
532                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
533                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
538                                partially_online);
539                 }
540                 return 0;
541         }
542
543         printf("Number of nodes:%d\n", nodemap->num);
544         for(i=0;i<nodemap->num;i++){
545                 static const struct {
546                         uint32_t flag;
547                         const char *name;
548                 } flag_names[] = {
549                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
550                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
551                         { NODE_FLAGS_BANNED,                "BANNED" },
552                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
553                         { NODE_FLAGS_DELETED,               "DELETED" },
554                         { NODE_FLAGS_STOPPED,               "STOPPED" },
555                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
556                 };
557                 char *flags_str = NULL;
558                 int j;
559
560                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
561                         continue;
562                 }
563                 if (nodemap->nodes[i].flags == 0) {
564                         struct ctdb_control_get_ifaces *ifaces;
565
566                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
567                                                    nodemap->nodes[i].pnn,
568                                                    ctdb, &ifaces);
569                         if (ret == 0) {
570                                 for (j=0; j < ifaces->num; j++) {
571                                         if (ifaces->ifaces[j].link_state != 0) {
572                                                 continue;
573                                         }
574                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
575                                         break;
576                                 }
577                                 talloc_free(ifaces);
578                         }
579                 }
580                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
581                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
582                                 if (flags_str == NULL) {
583                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
584                                 } else {
585                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
586                                                                            flag_names[j].name);
587                                 }
588                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
589                         }
590                 }
591                 if (flags_str == NULL) {
592                         flags_str = talloc_strdup(ctdb, "OK");
593                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
594                 }
595                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
596                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
597                        flags_str,
598                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
599                 talloc_free(flags_str);
600         }
601
602         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
603         if (ret != 0) {
604                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
605                 return ret;
606         }
607         if (vnnmap->generation == INVALID_GENERATION) {
608                 printf("Generation:INVALID\n");
609         } else {
610                 printf("Generation:%d\n",vnnmap->generation);
611         }
612         printf("Size:%d\n",vnnmap->size);
613         for(i=0;i<vnnmap->size;i++){
614                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
615         }
616
617         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
618         if (ret != 0) {
619                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
620                 return ret;
621         }
622         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
623
624         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
625         if (ret != 0) {
626                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
627                 return ret;
628         }
629         printf("Recovery master:%d\n",recmaster);
630
631         return 0;
632 }
633
634
635 struct natgw_node {
636         struct natgw_node *next;
637         const char *addr;
638 };
639
640 /*
641   display the list of nodes belonging to this natgw configuration
642  */
643 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
644 {
645         int i, ret;
646         const char *natgw_list;
647         int nlines;
648         char **lines;
649         struct natgw_node *natgw_nodes = NULL;
650         struct natgw_node *natgw_node;
651         struct ctdb_node_map *nodemap=NULL;
652
653
654         /* read the natgw nodes file into a linked list */
655         natgw_list = getenv("NATGW_NODES");
656         if (natgw_list == NULL) {
657                 natgw_list = "/etc/ctdb/natgw_nodes";
658         }
659         lines = file_lines_load(natgw_list, &nlines, ctdb);
660         if (lines == NULL) {
661                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
662                 return -1;
663         }
664         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
665                 nlines--;
666         }
667         for (i=0;i<nlines;i++) {
668                 char *node;
669
670                 node = lines[i];
671                 /* strip leading spaces */
672                 while((*node == ' ') || (*node == '\t')) {
673                         node++;
674                 }
675                 if (*node == '#') {
676                         continue;
677                 }
678                 if (strcmp(node, "") == 0) {
679                         continue;
680                 }
681                 natgw_node = talloc(ctdb, struct natgw_node);
682                 natgw_node->addr = talloc_strdup(natgw_node, node);
683                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
684                 natgw_node->next = natgw_nodes;
685                 natgw_nodes = natgw_node;
686         }
687
688         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
689         if (ret != 0) {
690                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
691                 return ret;
692         }
693
694         i=0;
695         while(i<nodemap->num) {
696                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
697                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
698                                 break;
699                         }
700                 }
701
702                 /* this node was not in the natgw so we just remove it from
703                  * the list
704                  */
705                 if ((natgw_node == NULL) 
706                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
707                         int j;
708
709                         for (j=i+1; j<nodemap->num; j++) {
710                                 nodemap->nodes[j-1] = nodemap->nodes[j];
711                         }
712                         nodemap->num--;
713                         continue;
714                 }
715
716                 i++;
717         }               
718
719         /* pick a node to be natgwmaster
720          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
721          */
722         for(i=0;i<nodemap->num;i++){
723                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
724                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
725                         break;
726                 }
727         }
728         /* we couldnt find any healthy node, try unhealthy ones */
729         if (i == nodemap->num) {
730                 for(i=0;i<nodemap->num;i++){
731                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
732                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
733                                 break;
734                         }
735                 }
736         }
737         /* unless all nodes are STOPPED, when we pick one anyway */
738         if (i == nodemap->num) {
739                 for(i=0;i<nodemap->num;i++){
740                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
741                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
742                                 break;
743                         }
744                 }
745                 /* or if we still can not find any */
746                 if (i == nodemap->num) {
747                         printf("-1 0.0.0.0\n");
748                 }
749         }
750
751         /* print the pruned list of nodes belonging to this natgw list */
752         for(i=0;i<nodemap->num;i++){
753                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
754                         continue;
755                 }
756                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
757                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
758                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
759                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
760                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
761                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
762                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
763         }
764
765         return 0;
766 }
767
768 /*
769   display the status of the scripts for monitoring (or other events)
770  */
771 static int control_one_scriptstatus(struct ctdb_context *ctdb,
772                                     enum ctdb_eventscript_call type)
773 {
774         struct ctdb_scripts_wire *script_status;
775         int ret, i;
776
777         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
778         if (ret != 0) {
779                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
780                 return ret;
781         }
782
783         if (script_status == NULL) {
784                 if (!options.machinereadable) {
785                         printf("%s cycle never run\n",
786                                ctdb_eventscript_call_names[type]);
787                 }
788                 return 0;
789         }
790
791         if (!options.machinereadable) {
792                 printf("%d scripts were executed last %s cycle\n",
793                        script_status->num_scripts,
794                        ctdb_eventscript_call_names[type]);
795         }
796         for (i=0; i<script_status->num_scripts; i++) {
797                 const char *status = NULL;
798
799                 switch (script_status->scripts[i].status) {
800                 case -ETIME:
801                         status = "TIMEDOUT";
802                         break;
803                 case -ENOEXEC:
804                         status = "DISABLED";
805                         break;
806                 case 0:
807                         status = "OK";
808                         break;
809                 default:
810                         if (script_status->scripts[i].status > 0)
811                                 status = "ERROR";
812                         break;
813                 }
814                 if (options.machinereadable) {
815                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
816                                ctdb_eventscript_call_names[type],
817                                script_status->scripts[i].name,
818                                script_status->scripts[i].status,
819                                status,
820                                (long)script_status->scripts[i].start.tv_sec,
821                                (long)script_status->scripts[i].start.tv_usec,
822                                (long)script_status->scripts[i].finished.tv_sec,
823                                (long)script_status->scripts[i].finished.tv_usec,
824                                script_status->scripts[i].output);
825                         continue;
826                 }
827                 if (status)
828                         printf("%-20s Status:%s    ",
829                                script_status->scripts[i].name, status);
830                 else
831                         /* Some other error, eg from stat. */
832                         printf("%-20s Status:CANNOT RUN (%s)",
833                                script_status->scripts[i].name,
834                                strerror(-script_status->scripts[i].status));
835
836                 if (script_status->scripts[i].status >= 0) {
837                         printf("Duration:%.3lf ",
838                         timeval_delta(&script_status->scripts[i].finished,
839                               &script_status->scripts[i].start));
840                 }
841                 if (script_status->scripts[i].status != -ENOEXEC) {
842                         printf("%s",
843                                ctime(&script_status->scripts[i].start.tv_sec));
844                         if (script_status->scripts[i].status != 0) {
845                                 printf("   OUTPUT:%s\n",
846                                        script_status->scripts[i].output);
847                         }
848                 } else {
849                         printf("\n");
850                 }
851         }
852         return 0;
853 }
854
855
856 static int control_scriptstatus(struct ctdb_context *ctdb,
857                                 int argc, const char **argv)
858 {
859         int ret;
860         enum ctdb_eventscript_call type, min, max;
861         const char *arg;
862
863         if (argc > 1) {
864                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
865                 return -1;
866         }
867
868         if (argc == 0)
869                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
870         else
871                 arg = argv[0];
872
873         for (type = 0; type < CTDB_EVENT_MAX; type++) {
874                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
875                         min = type;
876                         max = type+1;
877                         break;
878                 }
879         }
880         if (type == CTDB_EVENT_MAX) {
881                 if (strcmp(arg, "all") == 0) {
882                         min = 0;
883                         max = CTDB_EVENT_MAX;
884                 } else {
885                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
886                         return -1;
887                 }
888         }
889
890         if (options.machinereadable) {
891                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
892         }
893
894         for (type = min; type < max; type++) {
895                 ret = control_one_scriptstatus(ctdb, type);
896                 if (ret != 0) {
897                         return ret;
898                 }
899         }
900
901         return 0;
902 }
903
904 /*
905   enable an eventscript
906  */
907 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
908 {
909         int ret;
910
911         if (argc < 1) {
912                 usage();
913         }
914
915         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
916         if (ret != 0) {
917           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
918                 return ret;
919         }
920
921         return 0;
922 }
923
924 /*
925   disable an eventscript
926  */
927 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
928 {
929         int ret;
930
931         if (argc < 1) {
932                 usage();
933         }
934
935         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
936         if (ret != 0) {
937           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
938                 return ret;
939         }
940
941         return 0;
942 }
943
944 /*
945   display the pnn of the recovery master
946  */
947 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
948 {
949         int ret;
950         uint32_t recmaster;
951
952         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
953         if (ret != 0) {
954                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
955                 return ret;
956         }
957         printf("%d\n",recmaster);
958
959         return 0;
960 }
961
962 /*
963   get a list of all tickles for this pnn
964  */
965 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
966 {
967         struct ctdb_control_tcp_tickle_list *list;
968         ctdb_sock_addr addr;
969         int i, ret;
970
971         if (argc < 1) {
972                 usage();
973         }
974
975         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
976                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
977                 return -1;
978         }
979
980         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
981         if (ret == -1) {
982                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
983                 return -1;
984         }
985
986         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
987         printf("Num tickles:%u\n", list->tickles.num);
988         for (i=0;i<list->tickles.num;i++) {
989                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
990                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
991         }
992
993         talloc_free(list);
994         
995         return 0;
996 }
997
998
999
1000 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1001 {
1002         struct ctdb_all_public_ips *ips;
1003         struct ctdb_public_ip ip;
1004         int i, ret;
1005         uint32_t *nodes;
1006         uint32_t disable_time;
1007         TDB_DATA data;
1008         struct ctdb_node_map *nodemap=NULL;
1009         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1010
1011         disable_time = 30;
1012         data.dptr  = (uint8_t*)&disable_time;
1013         data.dsize = sizeof(disable_time);
1014         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1017                 return -1;
1018         }
1019
1020
1021
1022         /* read the public ip list from the node */
1023         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         for (i=0;i<ips->num;i++) {
1031                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1032                         break;
1033                 }
1034         }
1035         if (i==ips->num) {
1036                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1037                         pnn, ctdb_addr_to_str(addr)));
1038                 talloc_free(tmp_ctx);
1039                 return -1;
1040         }
1041
1042         ip.pnn  = pnn;
1043         ip.addr = *addr;
1044
1045         data.dptr  = (uint8_t *)&ip;
1046         data.dsize = sizeof(ip);
1047
1048         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1049         if (ret != 0) {
1050                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1051                 talloc_free(tmp_ctx);
1052                 return ret;
1053         }
1054
1055         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1056         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1057                                         nodes, 0,
1058                                         TIMELIMIT(),
1059                                         false, data,
1060                                         NULL, NULL,
1061                                         NULL);
1062         if (ret != 0) {
1063                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1064                 talloc_free(tmp_ctx);
1065                 return -1;
1066         }
1067
1068         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1071                 talloc_free(tmp_ctx);
1072                 return -1;
1073         }
1074
1075         talloc_free(tmp_ctx);
1076         return 0;
1077 }
1078
1079 /*
1080   move/failover an ip address to a specific node
1081  */
1082 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1083 {
1084         uint32_t pnn;
1085         ctdb_sock_addr addr;
1086
1087         if (argc < 2) {
1088                 usage();
1089                 return -1;
1090         }
1091
1092         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1093                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1094                 return -1;
1095         }
1096
1097
1098         if (sscanf(argv[1], "%u", &pnn) != 1) {
1099                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1100                 return -1;
1101         }
1102
1103         if (move_ip(ctdb, &addr, pnn) != 0) {
1104                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1105                 return -1;
1106         }
1107
1108         return 0;
1109 }
1110
1111 void getips_store_callback(void *param, void *data)
1112 {
1113         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1114         struct ctdb_all_public_ips *ips = param;
1115         int i;
1116
1117         i = ips->num++;
1118         ips->ips[i].pnn  = node_ip->pnn;
1119         ips->ips[i].addr = node_ip->addr;
1120 }
1121
1122 void getips_count_callback(void *param, void *data)
1123 {
1124         uint32_t *count = param;
1125
1126         (*count)++;
1127 }
1128
1129 #define IP_KEYLEN       4
1130 static uint32_t *ip_key(ctdb_sock_addr *ip)
1131 {
1132         static uint32_t key[IP_KEYLEN];
1133
1134         bzero(key, sizeof(key));
1135
1136         switch (ip->sa.sa_family) {
1137         case AF_INET:
1138                 key[0]  = ip->ip.sin_addr.s_addr;
1139                 break;
1140         case AF_INET6:
1141                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1142                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1143                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1144                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1145                 break;
1146         default:
1147                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1148                 return key;
1149         }
1150
1151         return key;
1152 }
1153
1154 static void *add_ip_callback(void *parm, void *data)
1155 {
1156         return parm;
1157 }
1158
1159 static int
1160 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1161 {
1162         struct ctdb_all_public_ips *tmp_ips;
1163         struct ctdb_node_map *nodemap=NULL;
1164         trbt_tree_t *ip_tree;
1165         int i, j, len, ret;
1166         uint32_t count;
1167
1168         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1169         if (ret != 0) {
1170                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1171                 return ret;
1172         }
1173
1174         ip_tree = trbt_create(tmp_ctx, 0);
1175
1176         for(i=0;i<nodemap->num;i++){
1177                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1178                         continue;
1179                 }
1180                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1181                         continue;
1182                 }
1183
1184                 /* read the public ip list from this node */
1185                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1186                 if (ret != 0) {
1187                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1188                         return -1;
1189                 }
1190         
1191                 for (j=0; j<tmp_ips->num;j++) {
1192                         struct ctdb_public_ip *node_ip;
1193
1194                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1195                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1196                         node_ip->addr = tmp_ips->ips[j].addr;
1197
1198                         trbt_insertarray32_callback(ip_tree,
1199                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1200                                 add_ip_callback,
1201                                 node_ip);
1202                 }
1203                 talloc_free(tmp_ips);
1204         }
1205
1206         /* traverse */
1207         count = 0;
1208         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1209
1210         len = offsetof(struct ctdb_all_public_ips, ips) + 
1211                 count*sizeof(struct ctdb_public_ip);
1212         tmp_ips = talloc_zero_size(tmp_ctx, len);
1213         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1214
1215         *ips = tmp_ips;
1216
1217         return 0;
1218 }
1219
1220
1221 /* 
1222  * scans all other nodes and returns a pnn for another node that can host this 
1223  * ip address or -1
1224  */
1225 static int
1226 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1227 {
1228         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1229         struct ctdb_all_public_ips *ips;
1230         struct ctdb_node_map *nodemap=NULL;
1231         int i, j, ret;
1232
1233         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1234         if (ret != 0) {
1235                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1236                 talloc_free(tmp_ctx);
1237                 return ret;
1238         }
1239
1240         for(i=0;i<nodemap->num;i++){
1241                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1242                         continue;
1243                 }
1244                 if (nodemap->nodes[i].pnn == options.pnn) {
1245                         continue;
1246                 }
1247
1248                 /* read the public ip list from this node */
1249                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1250                 if (ret != 0) {
1251                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1252                         return -1;
1253                 }
1254
1255                 for (j=0;j<ips->num;j++) {
1256                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1257                                 talloc_free(tmp_ctx);
1258                                 return nodemap->nodes[i].pnn;
1259                         }
1260                 }
1261                 talloc_free(ips);
1262         }
1263
1264         talloc_free(tmp_ctx);
1265         return -1;
1266 }
1267
1268 /*
1269   add a public ip address to a node
1270  */
1271 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1272 {
1273         int i, ret;
1274         int len;
1275         uint32_t pnn;
1276         unsigned mask;
1277         ctdb_sock_addr addr;
1278         struct ctdb_control_ip_iface *pub;
1279         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1280         struct ctdb_all_public_ips *ips;
1281
1282
1283         if (argc != 2) {
1284                 talloc_free(tmp_ctx);
1285                 usage();
1286         }
1287
1288         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1289                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1290                 talloc_free(tmp_ctx);
1291                 return -1;
1292         }
1293
1294         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1297                 talloc_free(tmp_ctx);
1298                 return ret;
1299         }
1300
1301
1302         /* check if some other node is already serving this ip, if not,
1303          * we will claim it
1304          */
1305         for (i=0;i<ips->num;i++) {
1306                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1307                         break;
1308                 }
1309         }
1310
1311         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1312         pub = talloc_size(tmp_ctx, len); 
1313         CTDB_NO_MEMORY(ctdb, pub);
1314
1315         pub->addr  = addr;
1316         pub->mask  = mask;
1317         pub->len   = strlen(argv[1])+1;
1318         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1319
1320         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1321         if (ret != 0) {
1322                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1323                 talloc_free(tmp_ctx);
1324                 return ret;
1325         }
1326
1327         if (i == ips->num) {
1328                 /* no one has this ip so we claim it */
1329                 pnn  = options.pnn;
1330         } else {
1331                 pnn  = ips->ips[i].pnn;
1332         }
1333
1334         if (move_ip(ctdb, &addr, pnn) != 0) {
1335                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1336                 return -1;
1337         }
1338
1339         talloc_free(tmp_ctx);
1340         return 0;
1341 }
1342
1343 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1344
1345 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1346 {
1347         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1348         struct ctdb_node_map *nodemap=NULL;
1349         struct ctdb_all_public_ips *ips;
1350         int ret, i, j;
1351
1352         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1353         if (ret != 0) {
1354                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1355                 return ret;
1356         }
1357
1358         /* remove it from the nodes that are not hosting the ip currently */
1359         for(i=0;i<nodemap->num;i++){
1360                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1361                         continue;
1362                 }
1363                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1364                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1365                         continue;
1366                 }
1367
1368                 for (j=0;j<ips->num;j++) {
1369                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1370                                 break;
1371                         }
1372                 }
1373                 if (j==ips->num) {
1374                         continue;
1375                 }
1376
1377                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1378                         continue;
1379                 }
1380
1381                 options.pnn = nodemap->nodes[i].pnn;
1382                 control_delip(ctdb, argc, argv);
1383         }
1384
1385
1386         /* remove it from every node (also the one hosting it) */
1387         for(i=0;i<nodemap->num;i++){
1388                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1389                         continue;
1390                 }
1391                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1392                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1393                         continue;
1394                 }
1395
1396                 for (j=0;j<ips->num;j++) {
1397                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1398                                 break;
1399                         }
1400                 }
1401                 if (j==ips->num) {
1402                         continue;
1403                 }
1404
1405                 options.pnn = nodemap->nodes[i].pnn;
1406                 control_delip(ctdb, argc, argv);
1407         }
1408
1409         talloc_free(tmp_ctx);
1410         return 0;
1411 }
1412         
1413 /*
1414   delete a public ip address from a node
1415  */
1416 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1417 {
1418         int i, ret;
1419         ctdb_sock_addr addr;
1420         struct ctdb_control_ip_iface pub;
1421         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1422         struct ctdb_all_public_ips *ips;
1423
1424         if (argc != 1) {
1425                 talloc_free(tmp_ctx);
1426                 usage();
1427         }
1428
1429         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1430                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1431                 return -1;
1432         }
1433
1434         if (options.pnn == CTDB_BROADCAST_ALL) {
1435                 return control_delip_all(ctdb, argc, argv, &addr);
1436         }
1437
1438         pub.addr  = addr;
1439         pub.mask  = 0;
1440         pub.len   = 0;
1441
1442         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1445                 talloc_free(tmp_ctx);
1446                 return ret;
1447         }
1448         
1449         for (i=0;i<ips->num;i++) {
1450                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1451                         break;
1452                 }
1453         }
1454
1455         if (i==ips->num) {
1456                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1457                         ctdb_addr_to_str(&addr)));
1458                 talloc_free(tmp_ctx);
1459                 return -1;
1460         }
1461
1462         if (ips->ips[i].pnn == options.pnn) {
1463                 ret = find_other_host_for_public_ip(ctdb, &addr);
1464                 if (ret != -1) {
1465                         if (move_ip(ctdb, &addr, ret) != 0) {
1466                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1467                                 return -1;
1468                         }
1469                 }
1470         }
1471
1472         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1473         if (ret != 0) {
1474                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1475                 talloc_free(tmp_ctx);
1476                 return ret;
1477         }
1478
1479         talloc_free(tmp_ctx);
1480         return 0;
1481 }
1482
1483 /*
1484   kill a tcp connection
1485  */
1486 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1487 {
1488         int ret;
1489         struct ctdb_control_killtcp killtcp;
1490
1491         if (argc < 2) {
1492                 usage();
1493         }
1494
1495         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1496                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1497                 return -1;
1498         }
1499
1500         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1501                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1502                 return -1;
1503         }
1504
1505         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1506         if (ret != 0) {
1507                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1508                 return ret;
1509         }
1510
1511         return 0;
1512 }
1513
1514
1515 /*
1516   send a gratious arp
1517  */
1518 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1519 {
1520         int ret;
1521         ctdb_sock_addr addr;
1522
1523         if (argc < 2) {
1524                 usage();
1525         }
1526
1527         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1528                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1529                 return -1;
1530         }
1531
1532         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1533         if (ret != 0) {
1534                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1535                 return ret;
1536         }
1537
1538         return 0;
1539 }
1540
1541 /*
1542   register a server id
1543  */
1544 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1545 {
1546         int ret;
1547         struct ctdb_server_id server_id;
1548
1549         if (argc < 3) {
1550                 usage();
1551         }
1552
1553         server_id.pnn       = strtoul(argv[0], NULL, 0);
1554         server_id.type      = strtoul(argv[1], NULL, 0);
1555         server_id.server_id = strtoul(argv[2], NULL, 0);
1556
1557         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1558         if (ret != 0) {
1559                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1560                 return ret;
1561         }
1562         return -1;
1563 }
1564
1565 /*
1566   unregister a server id
1567  */
1568 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1569 {
1570         int ret;
1571         struct ctdb_server_id server_id;
1572
1573         if (argc < 3) {
1574                 usage();
1575         }
1576
1577         server_id.pnn       = strtoul(argv[0], NULL, 0);
1578         server_id.type      = strtoul(argv[1], NULL, 0);
1579         server_id.server_id = strtoul(argv[2], NULL, 0);
1580
1581         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1582         if (ret != 0) {
1583                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1584                 return ret;
1585         }
1586         return -1;
1587 }
1588
1589 /*
1590   check if a server id exists
1591  */
1592 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1593 {
1594         uint32_t status;
1595         int ret;
1596         struct ctdb_server_id server_id;
1597
1598         if (argc < 3) {
1599                 usage();
1600         }
1601
1602         server_id.pnn       = strtoul(argv[0], NULL, 0);
1603         server_id.type      = strtoul(argv[1], NULL, 0);
1604         server_id.server_id = strtoul(argv[2], NULL, 0);
1605
1606         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1607         if (ret != 0) {
1608                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1609                 return ret;
1610         }
1611
1612         if (status) {
1613                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1614         } else {
1615                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1616         }
1617         return 0;
1618 }
1619
1620 /*
1621   get a list of all server ids that are registered on a node
1622  */
1623 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1624 {
1625         int i, ret;
1626         struct ctdb_server_id_list *server_ids;
1627
1628         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1629         if (ret != 0) {
1630                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1631                 return ret;
1632         }
1633
1634         for (i=0; i<server_ids->num; i++) {
1635                 printf("Server id %d:%d:%d\n", 
1636                         server_ids->server_ids[i].pnn, 
1637                         server_ids->server_ids[i].type, 
1638                         server_ids->server_ids[i].server_id); 
1639         }
1640
1641         return -1;
1642 }
1643
1644 /*
1645   send a tcp tickle ack
1646  */
1647 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1648 {
1649         int ret;
1650         ctdb_sock_addr  src, dst;
1651
1652         if (argc < 2) {
1653                 usage();
1654         }
1655
1656         if (!parse_ip_port(argv[0], &src)) {
1657                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1658                 return -1;
1659         }
1660
1661         if (!parse_ip_port(argv[1], &dst)) {
1662                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1663                 return -1;
1664         }
1665
1666         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1667         if (ret==0) {
1668                 return 0;
1669         }
1670         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1671
1672         return -1;
1673 }
1674
1675
1676 /*
1677   display public ip status
1678  */
1679 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1680 {
1681         int i, ret;
1682         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1683         struct ctdb_all_public_ips *ips;
1684
1685         if (options.pnn == CTDB_BROADCAST_ALL) {
1686                 /* read the list of public ips from all nodes */
1687                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1688         } else {
1689                 /* read the public ip list from this node */
1690                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1691         }
1692         if (ret != 0) {
1693                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1694                 talloc_free(tmp_ctx);
1695                 return ret;
1696         }
1697
1698         if (options.machinereadable){
1699                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1700         } else {
1701                 if (options.pnn == CTDB_BROADCAST_ALL) {
1702                         printf("Public IPs on ALL nodes\n");
1703                 } else {
1704                         printf("Public IPs on node %u\n", options.pnn);
1705                 }
1706         }
1707
1708         for (i=1;i<=ips->num;i++) {
1709                 struct ctdb_control_public_ip_info *info = NULL;
1710                 int32_t pnn;
1711                 char *aciface = NULL;
1712                 char *avifaces = NULL;
1713                 char *cifaces = NULL;
1714
1715                 if (options.pnn == CTDB_BROADCAST_ALL) {
1716                         pnn = ips->ips[ips->num-i].pnn;
1717                 } else {
1718                         pnn = options.pnn;
1719                 }
1720
1721                 if (pnn != -1) {
1722                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1723                                                    &ips->ips[ips->num-i].addr, &info);
1724                 } else {
1725                         ret = -1;
1726                 }
1727
1728                 if (ret == 0) {
1729                         int j;
1730                         for (j=0; j < info->num; j++) {
1731                                 if (cifaces == NULL) {
1732                                         cifaces = talloc_strdup(info,
1733                                                                 info->ifaces[j].name);
1734                                 } else {
1735                                         cifaces = talloc_asprintf_append(cifaces,
1736                                                                          ",%s",
1737                                                                          info->ifaces[j].name);
1738                                 }
1739
1740                                 if (info->active_idx == j) {
1741                                         aciface = info->ifaces[j].name;
1742                                 }
1743
1744                                 if (info->ifaces[j].link_state == 0) {
1745                                         continue;
1746                                 }
1747
1748                                 if (avifaces == NULL) {
1749                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1750                                 } else {
1751                                         avifaces = talloc_asprintf_append(avifaces,
1752                                                                           ",%s",
1753                                                                           info->ifaces[j].name);
1754                                 }
1755                         }
1756                 }
1757
1758                 if (options.machinereadable){
1759                         printf(":%s:%d:%s:%s:%s:\n",
1760                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1761                                ips->ips[ips->num-i].pnn,
1762                                aciface?aciface:"",
1763                                avifaces?avifaces:"",
1764                                cifaces?cifaces:"");
1765                 } else {
1766                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1767                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1768                                ips->ips[ips->num-i].pnn,
1769                                aciface?aciface:"",
1770                                avifaces?avifaces:"",
1771                                cifaces?cifaces:"");
1772                 }
1773                 talloc_free(info);
1774         }
1775
1776         talloc_free(tmp_ctx);
1777         return 0;
1778 }
1779
1780 /*
1781   public ip info
1782  */
1783 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1784 {
1785         int i, ret;
1786         ctdb_sock_addr addr;
1787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1788         struct ctdb_control_public_ip_info *info;
1789
1790         if (argc != 1) {
1791                 talloc_free(tmp_ctx);
1792                 usage();
1793         }
1794
1795         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1796                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1797                 return -1;
1798         }
1799
1800         /* read the public ip info from this node */
1801         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1802                                            tmp_ctx, &addr, &info);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1805                                   argv[0], options.pnn));
1806                 talloc_free(tmp_ctx);
1807                 return ret;
1808         }
1809
1810         printf("Public IP[%s] info on node %u\n",
1811                ctdb_addr_to_str(&info->ip.addr),
1812                options.pnn);
1813
1814         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1815                ctdb_addr_to_str(&info->ip.addr),
1816                info->ip.pnn, info->num);
1817
1818         for (i=0; i<info->num; i++) {
1819                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1820
1821                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1822                        i+1, info->ifaces[i].name,
1823                        info->ifaces[i].link_state?"up":"down",
1824                        (unsigned int)info->ifaces[i].references,
1825                        (i==info->active_idx)?" (active)":"");
1826         }
1827
1828         talloc_free(tmp_ctx);
1829         return 0;
1830 }
1831
1832 /*
1833   display interfaces status
1834  */
1835 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1836 {
1837         int i, ret;
1838         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1839         struct ctdb_control_get_ifaces *ifaces;
1840
1841         /* read the public ip list from this node */
1842         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1843                                    tmp_ctx, &ifaces);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1846                                   options.pnn));
1847                 talloc_free(tmp_ctx);
1848                 return ret;
1849         }
1850
1851         if (options.machinereadable){
1852                 printf(":Name:LinkStatus:References:\n");
1853         } else {
1854                 printf("Interfaces on node %u\n", options.pnn);
1855         }
1856
1857         for (i=0; i<ifaces->num; i++) {
1858                 if (options.machinereadable){
1859                         printf(":%s:%s:%u\n",
1860                                ifaces->ifaces[i].name,
1861                                ifaces->ifaces[i].link_state?"1":"0",
1862                                (unsigned int)ifaces->ifaces[i].references);
1863                 } else {
1864                         printf("name:%s link:%s references:%u\n",
1865                                ifaces->ifaces[i].name,
1866                                ifaces->ifaces[i].link_state?"up":"down",
1867                                (unsigned int)ifaces->ifaces[i].references);
1868                 }
1869         }
1870
1871         talloc_free(tmp_ctx);
1872         return 0;
1873 }
1874
1875
1876 /*
1877   set link status of an interface
1878  */
1879 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1880 {
1881         int ret;
1882         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1883         struct ctdb_control_iface_info info;
1884
1885         ZERO_STRUCT(info);
1886
1887         if (argc != 2) {
1888                 usage();
1889         }
1890
1891         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1892                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1893                                   argv[0]));
1894                 talloc_free(tmp_ctx);
1895                 return -1;
1896         }
1897         strcpy(info.name, argv[0]);
1898
1899         if (strcmp(argv[1], "up") == 0) {
1900                 info.link_state = 1;
1901         } else if (strcmp(argv[1], "down") == 0) {
1902                 info.link_state = 0;
1903         } else {
1904                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1905                                   argv[1]));
1906                 talloc_free(tmp_ctx);
1907                 return -1;
1908         }
1909
1910         /* read the public ip list from this node */
1911         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1912                                    tmp_ctx, &info);
1913         if (ret != 0) {
1914                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1915                                   argv[0], options.pnn));
1916                 talloc_free(tmp_ctx);
1917                 return ret;
1918         }
1919
1920         talloc_free(tmp_ctx);
1921         return 0;
1922 }
1923
1924 /*
1925   display pid of a ctdb daemon
1926  */
1927 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1928 {
1929         uint32_t pid;
1930         int ret;
1931
1932         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1933         if (ret != 0) {
1934                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1935                 return ret;
1936         }
1937         printf("Pid:%d\n", pid);
1938
1939         return 0;
1940 }
1941
1942 /*
1943   handler for receiving the response to ipreallocate
1944 */
1945 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1946                              TDB_DATA data, void *private_data)
1947 {
1948         exit(0);
1949 }
1950
1951 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1952 {
1953         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1954
1955         event_add_timed(ctdb->ev, ctdb, 
1956                                 timeval_current_ofs(1, 0),
1957                                 ctdb_every_second, ctdb);
1958 }
1959
1960 /*
1961   ask the recovery daemon on the recovery master to perform a ip reallocation
1962  */
1963 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1964 {
1965         int i, ret;
1966         TDB_DATA data;
1967         struct takeover_run_reply rd;
1968         uint32_t recmaster;
1969         struct ctdb_node_map *nodemap=NULL;
1970         int retries=0;
1971         struct timeval tv = timeval_current();
1972
1973         /* we need some events to trigger so we can timeout and restart
1974            the loop
1975         */
1976         event_add_timed(ctdb->ev, ctdb, 
1977                                 timeval_current_ofs(1, 0),
1978                                 ctdb_every_second, ctdb);
1979
1980         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1981         if (rd.pnn == -1) {
1982                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1983                 return -1;
1984         }
1985         rd.srvid = getpid();
1986
1987         /* register a message port for receiveing the reply so that we
1988            can receive the reply
1989         */
1990         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1991
1992         data.dptr = (uint8_t *)&rd;
1993         data.dsize = sizeof(rd);
1994
1995 again:
1996         if (retries>5) {
1997                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1998                 exit(10);
1999         }
2000
2001         /* check that there are valid nodes available */
2002         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2003                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2004                 exit(10);
2005         }
2006         for (i=0; i<nodemap->num;i++) {
2007                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2008                         break;
2009                 }
2010         }
2011         if (i==nodemap->num) {
2012                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2013                 return 0;
2014         }
2015
2016
2017         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2018         if (ret != 0) {
2019                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2020                 return ret;
2021         }
2022
2023         /* verify the node exists */
2024         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2025                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2026                 exit(10);
2027         }
2028
2029
2030         /* check tha there are nodes available that can act as a recmaster */
2031         for (i=0; i<nodemap->num; i++) {
2032                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2033                         continue;
2034                 }
2035                 break;
2036         }
2037         if (i == nodemap->num) {
2038                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2039                 return 0;
2040         }
2041
2042         /* verify the recovery master is not STOPPED, nor BANNED */
2043         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2044                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2045                 retries++;
2046                 sleep(1);
2047                 goto again;
2048         } 
2049
2050         
2051         /* verify the recovery master is not STOPPED, nor BANNED */
2052         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2053                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2054                 retries++;
2055                 sleep(1);
2056                 goto again;
2057         } 
2058
2059         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2060         if (ret != 0) {
2061                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2062                 return -1;
2063         }
2064
2065         tv = timeval_current();
2066         /* this loop will terminate when we have received the reply */
2067         while (timeval_elapsed(&tv) < 3.0) {    
2068                 event_loop_once(ctdb->ev);
2069         }
2070
2071         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2072         retries++;
2073         sleep(1);
2074         goto again;
2075
2076         return 0;
2077 }
2078
2079
2080 /*
2081   disable a remote node
2082  */
2083 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2084 {
2085         int ret;
2086         struct ctdb_node_map *nodemap=NULL;
2087
2088         do {
2089                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2090                 if (ret != 0) {
2091                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2092                         return ret;
2093                 }
2094
2095                 sleep(1);
2096
2097                 /* read the nodemap and verify the change took effect */
2098                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2099                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2100                         exit(10);
2101                 }
2102
2103         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2104         ret = control_ipreallocate(ctdb, argc, argv);
2105         if (ret != 0) {
2106                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2107                 return ret;
2108         }
2109
2110         return 0;
2111 }
2112
2113 /*
2114   enable a disabled remote node
2115  */
2116 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2117 {
2118         int ret;
2119
2120         struct ctdb_node_map *nodemap=NULL;
2121
2122         do {
2123                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2124                 if (ret != 0) {
2125                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2126                         return ret;
2127                 }
2128
2129                 sleep(1);
2130
2131                 /* read the nodemap and verify the change took effect */
2132                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2133                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2134                         exit(10);
2135                 }
2136
2137         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2138         ret = control_ipreallocate(ctdb, argc, argv);
2139         if (ret != 0) {
2140                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2141                 return ret;
2142         }
2143
2144         return 0;
2145 }
2146
2147 /*
2148   stop a remote node
2149  */
2150 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2151 {
2152         int ret;
2153         struct ctdb_node_map *nodemap=NULL;
2154
2155         do {
2156                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2157                 if (ret != 0) {
2158                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2159                 }
2160         
2161                 sleep(1);
2162
2163                 /* read the nodemap and verify the change took effect */
2164                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2165                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2166                         exit(10);
2167                 }
2168
2169         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2170         ret = control_ipreallocate(ctdb, argc, argv);
2171         if (ret != 0) {
2172                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2173                 return ret;
2174         }
2175
2176         return 0;
2177 }
2178
2179 /*
2180   restart a stopped remote node
2181  */
2182 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2183 {
2184         int ret;
2185
2186         struct ctdb_node_map *nodemap=NULL;
2187
2188         do {
2189                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2190                 if (ret != 0) {
2191                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2192                         return ret;
2193                 }
2194         
2195                 sleep(1);
2196
2197                 /* read the nodemap and verify the change took effect */
2198                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2199                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2200                         exit(10);
2201                 }
2202
2203         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2204         ret = control_ipreallocate(ctdb, argc, argv);
2205         if (ret != 0) {
2206                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2207                 return ret;
2208         }
2209
2210         return 0;
2211 }
2212
2213 static uint32_t get_generation(struct ctdb_context *ctdb)
2214 {
2215         struct ctdb_vnn_map *vnnmap=NULL;
2216         int ret;
2217
2218         /* wait until the recmaster is not in recovery mode */
2219         while (1) {
2220                 uint32_t recmode, recmaster;
2221                 
2222                 if (vnnmap != NULL) {
2223                         talloc_free(vnnmap);
2224                         vnnmap = NULL;
2225                 }
2226
2227                 /* get the recmaster */
2228                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2229                 if (ret != 0) {
2230                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2231                         exit(10);
2232                 }
2233
2234                 /* get recovery mode */
2235                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2236                 if (ret != 0) {
2237                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2238                         exit(10);
2239                 }
2240
2241                 /* get the current generation number */
2242                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2243                 if (ret != 0) {
2244                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2245                         exit(10);
2246                 }
2247
2248                 if ((recmode == CTDB_RECOVERY_NORMAL)
2249                 &&  (vnnmap->generation != 1)){
2250                         return vnnmap->generation;
2251                 }
2252                 sleep(1);
2253         }
2254 }
2255
2256 /*
2257   ban a node from the cluster
2258  */
2259 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2260 {
2261         int ret;
2262         struct ctdb_node_map *nodemap=NULL;
2263         struct ctdb_ban_time bantime;
2264
2265         if (argc < 1) {
2266                 usage();
2267         }
2268         
2269         /* verify the node exists */
2270         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2271         if (ret != 0) {
2272                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2273                 return ret;
2274         }
2275
2276         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2277                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2278                 return -1;
2279         }
2280
2281         bantime.pnn  = options.pnn;
2282         bantime.time = strtoul(argv[0], NULL, 0);
2283
2284         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2287                 return -1;
2288         }       
2289
2290         ret = control_ipreallocate(ctdb, argc, argv);
2291         if (ret != 0) {
2292                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2293                 return ret;
2294         }
2295
2296         return 0;
2297 }
2298
2299
2300 /*
2301   unban a node from the cluster
2302  */
2303 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2304 {
2305         int ret;
2306         struct ctdb_node_map *nodemap=NULL;
2307         struct ctdb_ban_time bantime;
2308
2309         /* verify the node exists */
2310         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2313                 return ret;
2314         }
2315
2316         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2317                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2318                 return -1;
2319         }
2320
2321         bantime.pnn  = options.pnn;
2322         bantime.time = 0;
2323
2324         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2325         if (ret != 0) {
2326                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2327                 return -1;
2328         }       
2329
2330         ret = control_ipreallocate(ctdb, argc, argv);
2331         if (ret != 0) {
2332                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2333                 return ret;
2334         }
2335
2336         return 0;
2337 }
2338
2339
2340 /*
2341   show ban information for a node
2342  */
2343 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2344 {
2345         int ret;
2346         struct ctdb_node_map *nodemap=NULL;
2347         struct ctdb_ban_time *bantime;
2348
2349         /* verify the node exists */
2350         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2351         if (ret != 0) {
2352                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2353                 return ret;
2354         }
2355
2356         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2357         if (ret != 0) {
2358                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2359                 return -1;
2360         }       
2361
2362         if (bantime->time == 0) {
2363                 printf("Node %u is not banned\n", bantime->pnn);
2364         } else {
2365                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2366         }
2367
2368         return 0;
2369 }
2370
2371 /*
2372   shutdown a daemon
2373  */
2374 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2375 {
2376         int ret;
2377
2378         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2379         if (ret != 0) {
2380                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2381                 return ret;
2382         }
2383
2384         return 0;
2385 }
2386
2387 /*
2388   trigger a recovery
2389  */
2390 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2391 {
2392         int ret;
2393         uint32_t generation, next_generation;
2394
2395         /* record the current generation number */
2396         generation = get_generation(ctdb);
2397
2398         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2399         if (ret != 0) {
2400                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2401                 return ret;
2402         }
2403
2404         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2405         if (ret != 0) {
2406                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2407                 return ret;
2408         }
2409
2410         /* wait until we are in a new generation */
2411         while (1) {
2412                 next_generation = get_generation(ctdb);
2413                 if (next_generation != generation) {
2414                         return 0;
2415                 }
2416                 sleep(1);
2417         }
2418
2419         return 0;
2420 }
2421
2422
2423 /*
2424   display monitoring mode of a remote node
2425  */
2426 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2427 {
2428         uint32_t monmode;
2429         int ret;
2430
2431         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2432         if (ret != 0) {
2433                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2434                 return ret;
2435         }
2436         if (!options.machinereadable){
2437                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2438         } else {
2439                 printf(":mode:\n");
2440                 printf(":%d:\n",monmode);
2441         }
2442         return 0;
2443 }
2444
2445
2446 /*
2447   display capabilities of a remote node
2448  */
2449 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2450 {
2451         uint32_t capabilities;
2452         int ret;
2453
2454         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2455         if (ret != 0) {
2456                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2457                 return ret;
2458         }
2459         
2460         if (!options.machinereadable){
2461                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2462                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2463                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2464                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2465         } else {
2466                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2467                 printf(":%d:%d:%d:%d:\n",
2468                         !!(capabilities&CTDB_CAP_RECMASTER),
2469                         !!(capabilities&CTDB_CAP_LMASTER),
2470                         !!(capabilities&CTDB_CAP_LVS),
2471                         !!(capabilities&CTDB_CAP_NATGW));
2472         }
2473         return 0;
2474 }
2475
2476 /*
2477   display lvs configuration
2478  */
2479 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2480 {
2481         uint32_t *capabilities;
2482         struct ctdb_node_map *nodemap=NULL;
2483         int i, ret;
2484         int healthy_count = 0;
2485
2486         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2487         if (ret != 0) {
2488                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2489                 return ret;
2490         }
2491
2492         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2493         CTDB_NO_MEMORY(ctdb, capabilities);
2494         
2495         /* collect capabilities for all connected nodes */
2496         for (i=0; i<nodemap->num; i++) {
2497                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2498                         continue;
2499                 }
2500                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2501                         continue;
2502                 }
2503         
2504                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2505                 if (ret != 0) {
2506                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2507                         return ret;
2508                 }
2509
2510                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2511                         continue;
2512                 }
2513
2514                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2515                         healthy_count++;
2516                 }
2517         }
2518
2519         /* Print all LVS nodes */
2520         for (i=0; i<nodemap->num; i++) {
2521                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2522                         continue;
2523                 }
2524                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2525                         continue;
2526                 }
2527                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2528                         continue;
2529                 }
2530
2531                 if (healthy_count != 0) {
2532                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2533                                 continue;
2534                         }
2535                 }
2536
2537                 printf("%d:%s\n", i, 
2538                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2539         }
2540
2541         return 0;
2542 }
2543
2544 /*
2545   display who is the lvs master
2546  */
2547 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2548 {
2549         uint32_t *capabilities;
2550         struct ctdb_node_map *nodemap=NULL;
2551         int i, ret;
2552         int healthy_count = 0;
2553
2554         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2555         if (ret != 0) {
2556                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2557                 return ret;
2558         }
2559
2560         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2561         CTDB_NO_MEMORY(ctdb, capabilities);
2562         
2563         /* collect capabilities for all connected nodes */
2564         for (i=0; i<nodemap->num; i++) {
2565                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2566                         continue;
2567                 }
2568                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2569                         continue;
2570                 }
2571         
2572                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2573                 if (ret != 0) {
2574                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2575                         return ret;
2576                 }
2577
2578                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2579                         continue;
2580                 }
2581
2582                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2583                         healthy_count++;
2584                 }
2585         }
2586
2587         /* find and show the lvsmaster */
2588         for (i=0; i<nodemap->num; i++) {
2589                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2590                         continue;
2591                 }
2592                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2593                         continue;
2594                 }
2595                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2596                         continue;
2597                 }
2598
2599                 if (healthy_count != 0) {
2600                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2601                                 continue;
2602                         }
2603                 }
2604
2605                 if (options.machinereadable){
2606                         printf("%d\n", i);
2607                 } else {
2608                         printf("Node %d is LVS master\n", i);
2609                 }
2610                 return 0;
2611         }
2612
2613         printf("There is no LVS master\n");
2614         return -1;
2615 }
2616
2617 /*
2618   disable monitoring on a  node
2619  */
2620 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2621 {
2622         
2623         int ret;
2624
2625         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2626         if (ret != 0) {
2627                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2628                 return ret;
2629         }
2630         printf("Monitoring mode:%s\n","DISABLED");
2631
2632         return 0;
2633 }
2634
2635 /*
2636   enable monitoring on a  node
2637  */
2638 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2639 {
2640         
2641         int ret;
2642
2643         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2644         if (ret != 0) {
2645                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2646                 return ret;
2647         }
2648         printf("Monitoring mode:%s\n","ACTIVE");
2649
2650         return 0;
2651 }
2652
2653 /*
2654   display remote list of keys/data for a db
2655  */
2656 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2657 {
2658         const char *db_name;
2659         struct ctdb_db_context *ctdb_db;
2660         int ret;
2661
2662         if (argc < 1) {
2663                 usage();
2664         }
2665
2666         db_name = argv[0];
2667
2668
2669         if (db_exists(ctdb, db_name)) {
2670                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2671                 return -1;
2672         }
2673
2674         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2675
2676         if (ctdb_db == NULL) {
2677                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2678                 return -1;
2679         }
2680
2681         /* traverse and dump the cluster tdb */
2682         ret = ctdb_dump_db(ctdb_db, stdout);
2683         if (ret == -1) {
2684                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2685                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2686                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2687                                   db_name));
2688                 return -1;
2689         }
2690         talloc_free(ctdb_db);
2691
2692         printf("Dumped %d records\n", ret);
2693         return 0;
2694 }
2695
2696
2697 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2698                              TDB_DATA data, void *private_data)
2699 {
2700         DEBUG(DEBUG_ERR,("Log data received\n"));
2701         if (data.dsize > 0) {
2702                 printf("%s", data.dptr);
2703         }
2704
2705         exit(0);
2706 }
2707
2708 /*
2709   display a list of log messages from the in memory ringbuffer
2710  */
2711 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2712 {
2713         int ret;
2714         int32_t res;
2715         struct ctdb_get_log_addr log_addr;
2716         TDB_DATA data;
2717         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2718         char *errmsg;
2719         struct timeval tv;
2720
2721         if (argc != 1) {
2722                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2723                 talloc_free(tmp_ctx);
2724                 return -1;
2725         }
2726
2727         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2728         log_addr.srvid = getpid();
2729         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2730                 log_addr.level = get_debug_by_desc(argv[0]);
2731         } else {
2732                 log_addr.level = strtol(argv[0], NULL, 0);
2733         }
2734
2735
2736         data.dptr = (unsigned char *)&log_addr;
2737         data.dsize = sizeof(log_addr);
2738
2739         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2740
2741         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2742         sleep(1);
2743
2744         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2745
2746         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2747                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2748         if (ret != 0 || res != 0) {
2749                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2750                 talloc_free(tmp_ctx);
2751                 return -1;
2752         }
2753
2754
2755         tv = timeval_current();
2756         /* this loop will terminate when we have received the reply */
2757         while (timeval_elapsed(&tv) < 3.0) {    
2758                 event_loop_once(ctdb->ev);
2759         }
2760
2761         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2762
2763         talloc_free(tmp_ctx);
2764         return 0;
2765 }
2766
2767 /*
2768   clear the in memory log area
2769  */
2770 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2771 {
2772         int ret;
2773         int32_t res;
2774         char *errmsg;
2775         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2776
2777         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2778                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2779         if (ret != 0 || res != 0) {
2780                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2781                 talloc_free(tmp_ctx);
2782                 return -1;
2783         }
2784
2785         talloc_free(tmp_ctx);
2786         return 0;
2787 }
2788
2789
2790
2791 /*
2792   display a list of the databases on a remote ctdb
2793  */
2794 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2795 {
2796         int i, ret;
2797         struct ctdb_dbid_map *dbmap=NULL;
2798
2799         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2800         if (ret != 0) {
2801                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2802                 return ret;
2803         }
2804
2805         if(options.machinereadable){
2806                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2807                 for(i=0;i<dbmap->num;i++){
2808                         const char *path;
2809                         const char *name;
2810                         const char *health;
2811                         bool persistent;
2812
2813                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2814                                             dbmap->dbs[i].dbid, ctdb, &path);
2815                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2816                                             dbmap->dbs[i].dbid, ctdb, &name);
2817                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2818                                               dbmap->dbs[i].dbid, ctdb, &health);
2819                         persistent = dbmap->dbs[i].persistent;
2820                         printf(":0x%08X:%s:%s:%d:%d:\n",
2821                                dbmap->dbs[i].dbid, name, path,
2822                                !!(persistent), !!(health));
2823                 }
2824                 return 0;
2825         }
2826
2827         printf("Number of databases:%d\n", dbmap->num);
2828         for(i=0;i<dbmap->num;i++){
2829                 const char *path;
2830                 const char *name;
2831                 const char *health;
2832                 bool persistent;
2833
2834                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2835                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2836                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2837                 persistent = dbmap->dbs[i].persistent;
2838                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2839                        dbmap->dbs[i].dbid, name, path,
2840                        persistent?" PERSISTENT":"",
2841                        health?" UNHEALTHY":"");
2842         }
2843
2844         return 0;
2845 }
2846
2847 /*
2848   display the status of a database on a remote ctdb
2849  */
2850 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2851 {
2852         int i, ret;
2853         struct ctdb_dbid_map *dbmap=NULL;
2854         const char *db_name;
2855
2856         if (argc < 1) {
2857                 usage();
2858         }
2859
2860         db_name = argv[0];
2861
2862         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2863         if (ret != 0) {
2864                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2865                 return ret;
2866         }
2867
2868         for(i=0;i<dbmap->num;i++){
2869                 const char *path;
2870                 const char *name;
2871                 const char *health;
2872                 bool persistent;
2873
2874                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2875                 if (strcmp(name, db_name) != 0) {
2876                         continue;
2877                 }
2878
2879                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2880                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2881                 persistent = dbmap->dbs[i].persistent;
2882                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2883                        dbmap->dbs[i].dbid, name, path,
2884                        persistent?"yes":"no",
2885                        health?health:"OK");
2886                 return 0;
2887         }
2888
2889         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2890         return 0;
2891 }
2892
2893 /*
2894   check if the local node is recmaster or not
2895   it will return 1 if this node is the recmaster and 0 if it is not
2896   or if the local ctdb daemon could not be contacted
2897  */
2898 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2899 {
2900         uint32_t mypnn, recmaster;
2901         int ret;
2902
2903         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2904         if (mypnn == -1) {
2905                 printf("Failed to get pnn of node\n");
2906                 return 1;
2907         }
2908
2909         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2910         if (ret != 0) {
2911                 printf("Failed to get the recmaster\n");
2912                 return 1;
2913         }
2914
2915         if (recmaster != mypnn) {
2916                 printf("this node is not the recmaster\n");
2917                 return 1;
2918         }
2919
2920         printf("this node is the recmaster\n");
2921         return 0;
2922 }
2923
2924 /*
2925   ping a node
2926  */
2927 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2928 {
2929         int ret;
2930         struct timeval tv = timeval_current();
2931         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2932         if (ret == -1) {
2933                 printf("Unable to get ping response from node %u\n", options.pnn);
2934                 return -1;
2935         } else {
2936                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2937                        options.pnn, timeval_elapsed(&tv), ret);
2938         }
2939         return 0;
2940 }
2941
2942
2943 /*
2944   get a tunable
2945  */
2946 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2947 {
2948         const char *name;
2949         uint32_t value;
2950         int ret;
2951
2952         if (argc < 1) {
2953                 usage();
2954         }
2955
2956         name = argv[0];
2957         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2958         if (ret == -1) {
2959                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2960                 return -1;
2961         }
2962
2963         printf("%-19s = %u\n", name, value);
2964         return 0;
2965 }
2966
2967 /*
2968   set a tunable
2969  */
2970 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2971 {
2972         const char *name;
2973         uint32_t value;
2974         int ret;
2975
2976         if (argc < 2) {
2977                 usage();
2978         }
2979
2980         name = argv[0];
2981         value = strtoul(argv[1], NULL, 0);
2982
2983         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2984         if (ret == -1) {
2985                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2986                 return -1;
2987         }
2988         return 0;
2989 }
2990
2991 /*
2992   list all tunables
2993  */
2994 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2995 {
2996         uint32_t count;
2997         const char **list;
2998         int ret, i;
2999
3000         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3001         if (ret == -1) {
3002                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3003                 return -1;
3004         }
3005
3006         for (i=0;i<count;i++) {
3007                 control_getvar(ctdb, 1, &list[i]);
3008         }
3009
3010         talloc_free(list);
3011         
3012         return 0;
3013 }
3014
3015 /*
3016   display debug level on a node
3017  */
3018 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3019 {
3020         int ret;
3021         int32_t level;
3022
3023         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3024         if (ret != 0) {
3025                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3026                 return ret;
3027         } else {
3028                 if (options.machinereadable){
3029                         printf(":Name:Level:\n");
3030                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3031                 } else {
3032                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3033                 }
3034         }
3035         return 0;
3036 }
3037
3038 /*
3039   display reclock file of a node
3040  */
3041 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3042 {
3043         int ret;
3044         const char *reclock;
3045
3046         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3047         if (ret != 0) {
3048                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3049                 return ret;
3050         } else {
3051                 if (options.machinereadable){
3052                         if (reclock != NULL) {
3053                                 printf("%s", reclock);
3054                         }
3055                 } else {
3056                         if (reclock == NULL) {
3057                                 printf("No reclock file used.\n");
3058                         } else {
3059                                 printf("Reclock file:%s\n", reclock);
3060                         }
3061                 }
3062         }
3063         return 0;
3064 }
3065
3066 /*
3067   set the reclock file of a node
3068  */
3069 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3070 {
3071         int ret;
3072         const char *reclock;
3073
3074         if (argc == 0) {
3075                 reclock = NULL;
3076         } else if (argc == 1) {
3077                 reclock = argv[0];
3078         } else {
3079                 usage();
3080         }
3081
3082         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3083         if (ret != 0) {
3084                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3085                 return ret;
3086         }
3087         return 0;
3088 }
3089
3090 /*
3091   set the natgw state on/off
3092  */
3093 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3094 {
3095         int ret;
3096         uint32_t natgwstate;
3097
3098         if (argc == 0) {
3099                 usage();
3100         }
3101
3102         if (!strcmp(argv[0], "on")) {
3103                 natgwstate = 1;
3104         } else if (!strcmp(argv[0], "off")) {
3105                 natgwstate = 0;
3106         } else {
3107                 usage();
3108         }
3109
3110         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3111         if (ret != 0) {
3112                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3113                 return ret;
3114         }
3115
3116         return 0;
3117 }
3118
3119 /*
3120   set the lmaster role on/off
3121  */
3122 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3123 {
3124         int ret;
3125         uint32_t lmasterrole;
3126
3127         if (argc == 0) {
3128                 usage();
3129         }
3130
3131         if (!strcmp(argv[0], "on")) {
3132                 lmasterrole = 1;
3133         } else if (!strcmp(argv[0], "off")) {
3134                 lmasterrole = 0;
3135         } else {
3136                 usage();
3137         }
3138
3139         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3140         if (ret != 0) {
3141                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3142                 return ret;
3143         }
3144
3145         return 0;
3146 }
3147
3148 /*
3149   set the recmaster role on/off
3150  */
3151 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3152 {
3153         int ret;
3154         uint32_t recmasterrole;
3155
3156         if (argc == 0) {
3157                 usage();
3158         }
3159
3160         if (!strcmp(argv[0], "on")) {
3161                 recmasterrole = 1;
3162         } else if (!strcmp(argv[0], "off")) {
3163                 recmasterrole = 0;
3164         } else {
3165                 usage();
3166         }
3167
3168         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3169         if (ret != 0) {
3170                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3171                 return ret;
3172         }
3173
3174         return 0;
3175 }
3176
3177 /*
3178   set debug level on a node or all nodes
3179  */
3180 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3181 {
3182         int i, ret;
3183         int32_t level;
3184
3185         if (argc == 0) {
3186                 printf("You must specify the debug level. Valid levels are:\n");
3187                 for (i=0; debug_levels[i].description != NULL; i++) {
3188                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3189                 }
3190
3191                 return 0;
3192         }
3193
3194         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3195                 level = get_debug_by_desc(argv[0]);
3196         } else {
3197                 level = strtol(argv[0], NULL, 0);
3198         }
3199
3200         for (i=0; debug_levels[i].description != NULL; i++) {
3201                 if (level == debug_levels[i].level) {
3202                         break;
3203                 }
3204         }
3205         if (debug_levels[i].description == NULL) {
3206                 printf("Invalid debug level, must be one of\n");
3207                 for (i=0; debug_levels[i].description != NULL; i++) {
3208                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3209                 }
3210                 return -1;
3211         }
3212
3213         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3214         if (ret != 0) {
3215                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3216         }
3217         return 0;
3218 }
3219
3220
3221 /*
3222   freeze a node
3223  */
3224 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3225 {
3226         int ret;
3227         uint32_t priority;
3228         
3229         if (argc == 1) {
3230                 priority = strtol(argv[0], NULL, 0);
3231         } else {
3232                 priority = 0;
3233         }
3234         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3235
3236         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3237         if (ret != 0) {
3238                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3239         }               
3240         return 0;
3241 }
3242
3243 /*
3244   thaw a node
3245  */
3246 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3247 {
3248         int ret;
3249         uint32_t priority;
3250         
3251         if (argc == 1) {
3252                 priority = strtol(argv[0], NULL, 0);
3253         } else {
3254                 priority = 0;
3255         }
3256         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3257
3258         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3259         if (ret != 0) {
3260                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3261         }               
3262         return 0;
3263 }
3264
3265
3266 /*
3267   attach to a database
3268  */
3269 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3270 {
3271         const char *db_name;
3272         struct ctdb_db_context *ctdb_db;
3273
3274         if (argc < 1) {
3275                 usage();
3276         }
3277         db_name = argv[0];
3278
3279         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3280         if (ctdb_db == NULL) {
3281                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3282                 return -1;
3283         }
3284
3285         return 0;
3286 }
3287
3288 /*
3289   set db priority
3290  */
3291 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3292 {
3293         struct ctdb_db_priority db_prio;
3294         int ret;
3295
3296         if (argc < 2) {
3297                 usage();
3298         }
3299
3300         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3301         db_prio.priority = strtoul(argv[1], NULL, 0);
3302
3303         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3304         if (ret != 0) {
3305                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3306                 return -1;
3307         }
3308
3309         return 0;
3310 }
3311
3312 /*
3313   get db priority
3314  */
3315 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3316 {
3317         uint32_t db_id, priority;
3318         int ret;
3319
3320         if (argc < 1) {
3321                 usage();
3322         }
3323
3324         db_id = strtoul(argv[0], NULL, 0);
3325
3326         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3327         if (ret != 0) {
3328                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3329                 return -1;
3330         }
3331
3332         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3333
3334         return 0;
3335 }
3336
3337 /*
3338   run an eventscript on a node
3339  */
3340 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3341 {
3342         TDB_DATA data;
3343         int ret;
3344         int32_t res;
3345         char *errmsg;
3346         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3347
3348         if (argc != 1) {
3349                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3350                 return -1;
3351         }
3352
3353         data.dptr = (unsigned char *)discard_const(argv[0]);
3354         data.dsize = strlen((char *)data.dptr) + 1;
3355
3356         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3357
3358         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3359                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3360         if (ret != 0 || res != 0) {
3361                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3362                 talloc_free(tmp_ctx);
3363                 return -1;
3364         }
3365         talloc_free(tmp_ctx);
3366         return 0;
3367 }
3368
3369 #define DB_VERSION 1
3370 #define MAX_DB_NAME 64
3371 struct db_file_header {
3372         unsigned long version;
3373         time_t timestamp;
3374         unsigned long persistent;
3375         unsigned long size;
3376         const char name[MAX_DB_NAME];
3377 };
3378
3379 struct backup_data {
3380         struct ctdb_marshall_buffer *records;
3381         uint32_t len;
3382         uint32_t total;
3383         bool traverse_error;
3384 };
3385
3386 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3387 {
3388         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3389         struct ctdb_rec_data *rec;
3390
3391         /* add the record */
3392         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3393         if (rec == NULL) {
3394                 bd->traverse_error = true;
3395                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3396                 return -1;
3397         }
3398         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3399         if (bd->records == NULL) {
3400                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3401                 bd->traverse_error = true;
3402                 return -1;
3403         }
3404         bd->records->count++;
3405         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3406         bd->len += rec->length;
3407         talloc_free(rec);
3408
3409         bd->total++;
3410         return 0;
3411 }
3412
3413 /*
3414  * backup a database to a file 
3415  */
3416 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3417 {
3418         int i, ret;
3419         struct ctdb_dbid_map *dbmap=NULL;
3420         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3421         struct db_file_header dbhdr;
3422         struct ctdb_db_context *ctdb_db;
3423         struct backup_data *bd;
3424         int fh = -1;
3425         int status = -1;
3426         const char *reason = NULL;
3427
3428         if (argc != 2) {
3429                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3430                 return -1;
3431         }
3432
3433         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3434         if (ret != 0) {
3435                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3436                 return ret;
3437         }
3438
3439         for(i=0;i<dbmap->num;i++){
3440                 const char *name;
3441
3442                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3443                 if(!strcmp(argv[0], name)){
3444                         talloc_free(discard_const(name));
3445                         break;
3446                 }
3447                 talloc_free(discard_const(name));
3448         }
3449         if (i == dbmap->num) {
3450                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3451                 talloc_free(tmp_ctx);
3452                 return -1;
3453         }
3454
3455         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3456                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3457         if (ret != 0) {
3458                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3459                                  argv[0]));
3460                 talloc_free(tmp_ctx);
3461                 return -1;
3462         }
3463         if (reason) {
3464                 uint32_t allow_unhealthy = 0;
3465
3466                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3467                                       "AllowUnhealthyDBRead",
3468                                       &allow_unhealthy);
3469
3470                 if (allow_unhealthy != 1) {
3471                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3472                                          argv[0], reason));
3473
3474                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3475                                          allow_unhealthy));
3476                         talloc_free(tmp_ctx);
3477                         return -1;
3478                 }
3479
3480                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3481                                      argv[0], argv[0]));
3482                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3483                                      "tunnable AllowUnhealthyDBRead = %u\n",
3484                                      allow_unhealthy));
3485         }
3486
3487         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3488         if (ctdb_db == NULL) {
3489                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3490                 talloc_free(tmp_ctx);
3491                 return -1;
3492         }
3493
3494
3495         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3496         if (ret == -1) {
3497                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3498                 talloc_free(tmp_ctx);
3499                 return -1;
3500         }
3501
3502
3503         bd = talloc_zero(tmp_ctx, struct backup_data);
3504         if (bd == NULL) {
3505                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3506                 talloc_free(tmp_ctx);
3507                 return -1;
3508         }
3509
3510         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3511         if (bd->records == NULL) {
3512                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3513                 talloc_free(tmp_ctx);
3514                 return -1;
3515         }
3516
3517         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3518         bd->records->db_id = ctdb_db->db_id;
3519         /* traverse the database collecting all records */
3520         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3521             bd->traverse_error) {
3522                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3523                 talloc_free(tmp_ctx);
3524                 return -1;              
3525         }
3526
3527         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3528
3529
3530         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3531         if (fh == -1) {
3532                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3533                 talloc_free(tmp_ctx);
3534                 return -1;
3535         }
3536
3537         dbhdr.version = DB_VERSION;
3538         dbhdr.timestamp = time(NULL);
3539         dbhdr.persistent = dbmap->dbs[i].persistent;
3540         dbhdr.size = bd->len;
3541         if (strlen(argv[0]) >= MAX_DB_NAME) {
3542                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3543                 goto done;
3544         }
3545         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3546         ret = write(fh, &dbhdr, sizeof(dbhdr));
3547         if (ret == -1) {
3548                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3549                 goto done;
3550         }
3551         ret = write(fh, bd->records, bd->len);
3552         if (ret == -1) {
3553                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3554                 goto done;
3555         }
3556
3557         status = 0;
3558 done:
3559         if (fh != -1) {
3560                 ret = close(fh);
3561                 if (ret == -1) {
3562                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3563                 }
3564         }
3565         talloc_free(tmp_ctx);
3566         return status;
3567 }
3568
3569 /*
3570  * restore a database from a file 
3571  */
3572 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3573 {
3574         int ret;
3575         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3576         TDB_DATA outdata;
3577         TDB_DATA data;
3578         struct db_file_header dbhdr;
3579         struct ctdb_db_context *ctdb_db;
3580         struct ctdb_node_map *nodemap=NULL;
3581         struct ctdb_vnn_map *vnnmap=NULL;
3582         int i, fh;
3583         struct ctdb_control_wipe_database w;
3584         uint32_t *nodes;
3585         uint32_t generation;
3586         struct tm *tm;
3587         char tbuf[100];
3588
3589         if (argc != 1) {
3590                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3591                 return -1;
3592         }
3593
3594         fh = open(argv[0], O_RDONLY);
3595         if (fh == -1) {
3596                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3597                 talloc_free(tmp_ctx);
3598                 return -1;
3599         }
3600
3601         read(fh, &dbhdr, sizeof(dbhdr));
3602         if (dbhdr.version != DB_VERSION) {
3603                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3604                 talloc_free(tmp_ctx);
3605                 return -1;
3606         }
3607
3608         outdata.dsize = dbhdr.size;
3609         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3610         if (outdata.dptr == NULL) {
3611                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3612                 close(fh);
3613                 talloc_free(tmp_ctx);
3614                 return -1;
3615         }               
3616         read(fh, outdata.dptr, outdata.dsize);
3617         close(fh);
3618
3619         tm = localtime(&dbhdr.timestamp);
3620         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3621         printf("Restoring database '%s' from backup @ %s\n",
3622                 dbhdr.name, tbuf);
3623
3624
3625         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3626         if (ctdb_db == NULL) {
3627                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3628                 talloc_free(tmp_ctx);
3629                 return -1;
3630         }
3631
3632         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3633         if (ret != 0) {
3634                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3635                 talloc_free(tmp_ctx);
3636                 return ret;
3637         }
3638
3639
3640         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3641         if (ret != 0) {
3642                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3643                 talloc_free(tmp_ctx);
3644                 return ret;
3645         }
3646
3647         /* freeze all nodes */
3648         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3649         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3650                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3651                                         nodes, i,
3652                                         TIMELIMIT(),
3653                                         false, tdb_null,
3654                                         NULL, NULL,
3655                                         NULL) != 0) {
3656                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3657                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3658                         talloc_free(tmp_ctx);
3659                         return -1;
3660                 }
3661         }
3662
3663         generation = vnnmap->generation;
3664         data.dptr = (void *)&generation;
3665         data.dsize = sizeof(generation);
3666
3667         /* start a cluster wide transaction */
3668         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3669         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3670                                         nodes, 0,
3671                                         TIMELIMIT(), false, data,
3672                                         NULL, NULL,
3673                                         NULL) != 0) {
3674                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3675                 return -1;
3676         }
3677
3678
3679         w.db_id = ctdb_db->db_id;
3680         w.transaction_id = generation;
3681
3682         data.dptr = (void *)&w;
3683         data.dsize = sizeof(w);
3684
3685         /* wipe all the remote databases. */
3686         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3687         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3688                                         nodes, 0,
3689                                         TIMELIMIT(), false, data,
3690                                         NULL, NULL,
3691                                         NULL) != 0) {
3692                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3693                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3694                 talloc_free(tmp_ctx);
3695                 return -1;
3696         }
3697         
3698         /* push the database */
3699         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3700         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3701                                         nodes, 0,
3702                                         TIMELIMIT(), false, outdata,
3703                                         NULL, NULL,
3704                                         NULL) != 0) {
3705                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3706                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3707                 talloc_free(tmp_ctx);
3708                 return -1;
3709         }
3710
3711         data.dptr = (void *)&ctdb_db->db_id;
3712         data.dsize = sizeof(ctdb_db->db_id);
3713
3714         /* mark the database as healthy */
3715         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3716         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3717                                         nodes, 0,
3718                                         TIMELIMIT(), false, data,
3719                                         NULL, NULL,
3720                                         NULL) != 0) {
3721                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3722                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3723                 talloc_free(tmp_ctx);
3724                 return -1;
3725         }
3726
3727         data.dptr = (void *)&generation;
3728         data.dsize = sizeof(generation);
3729
3730         /* commit all the changes */
3731         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3732                                         nodes, 0,
3733                                         TIMELIMIT(), false, data,
3734                                         NULL, NULL,
3735                                         NULL) != 0) {
3736                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3737                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3738                 talloc_free(tmp_ctx);
3739                 return -1;
3740         }
3741
3742
3743         /* thaw all nodes */
3744         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3745         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3746                                         nodes, 0,
3747                                         TIMELIMIT(),
3748                                         false, tdb_null,
3749                                         NULL, NULL,
3750                                         NULL) != 0) {
3751                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3752                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3753                 talloc_free(tmp_ctx);
3754                 return -1;
3755         }
3756
3757
3758         talloc_free(tmp_ctx);
3759         return 0;
3760 }
3761
3762 /*
3763  * dump a database backup from a file
3764  */
3765 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3766 {
3767         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3768         TDB_DATA outdata;
3769         struct db_file_header dbhdr;
3770         int i, fh;
3771         struct tm *tm;
3772         char tbuf[100];
3773         struct ctdb_rec_data *rec = NULL;
3774         struct ctdb_marshall_buffer *m;
3775
3776         if (argc != 1) {
3777                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3778                 return -1;
3779         }
3780
3781         fh = open(argv[0], O_RDONLY);
3782         if (fh == -1) {
3783                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3784                 talloc_free(tmp_ctx);
3785                 return -1;
3786         }
3787
3788         read(fh, &dbhdr, sizeof(dbhdr));
3789         if (dbhdr.version != DB_VERSION) {
3790                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3791                 talloc_free(tmp_ctx);
3792                 return -1;
3793         }
3794
3795         outdata.dsize = dbhdr.size;
3796         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3797         if (outdata.dptr == NULL) {
3798                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3799                 close(fh);
3800                 talloc_free(tmp_ctx);
3801                 return -1;
3802         }
3803         read(fh, outdata.dptr, outdata.dsize);
3804         close(fh);
3805         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3806
3807         tm = localtime(&dbhdr.timestamp);
3808         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3809         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3810                 dbhdr.name, m->db_id, tbuf);
3811
3812         for (i=0; i < m->count; i++) {
3813                 uint32_t reqid = 0;
3814                 TDB_DATA key, data;
3815
3816                 /* we do not want the header splitted, so we pass NULL*/
3817                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3818                                               NULL, &key, &data);
3819
3820                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3821         }
3822
3823         printf("Dumped %d records\n", i);
3824         talloc_free(tmp_ctx);
3825         return 0;
3826 }
3827
3828 /*
3829  * wipe a database from a file
3830  */
3831 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3832                           const char **argv)
3833 {
3834         int ret;
3835         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3836         TDB_DATA data;
3837         struct ctdb_db_context *ctdb_db;
3838         struct ctdb_node_map *nodemap = NULL;
3839         struct ctdb_vnn_map *vnnmap = NULL;
3840         int i;
3841         struct ctdb_control_wipe_database w;
3842         uint32_t *nodes;
3843         uint32_t generation;
3844         struct ctdb_dbid_map *dbmap = NULL;
3845
3846         if (argc != 1) {
3847                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3848                 return -1;
3849         }
3850
3851         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3852                                  &dbmap);
3853         if (ret != 0) {
3854                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3855                                   options.pnn));
3856                 return ret;
3857         }
3858
3859         for(i=0;i<dbmap->num;i++){
3860                 const char *name;
3861
3862                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3863                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3864                 if(!strcmp(argv[0], name)){
3865                         talloc_free(discard_const(name));
3866                         break;
3867                 }
3868                 talloc_free(discard_const(name));
3869         }
3870         if (i == dbmap->num) {
3871                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3872                                   argv[0]));
3873                 talloc_free(tmp_ctx);
3874                 return -1;
3875         }
3876
3877         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3878         if (ctdb_db == NULL) {
3879                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3880                                   argv[0]));
3881                 talloc_free(tmp_ctx);
3882                 return -1;
3883         }
3884
3885         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3886                                    &nodemap);
3887         if (ret != 0) {
3888                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3889                                   options.pnn));
3890                 talloc_free(tmp_ctx);
3891                 return ret;
3892         }
3893
3894         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3895                                   &vnnmap);
3896         if (ret != 0) {
3897                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3898                                   options.pnn));
3899                 talloc_free(tmp_ctx);
3900                 return ret;
3901         }
3902
3903         /* freeze all nodes */
3904         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3905         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3906                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3907                                                 nodes, i,
3908                                                 TIMELIMIT(),
3909                                                 false, tdb_null,
3910                                                 NULL, NULL,
3911                                                 NULL);
3912                 if (ret != 0) {
3913                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3914                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3915                                              CTDB_RECOVERY_ACTIVE);
3916                         talloc_free(tmp_ctx);
3917                         return -1;
3918                 }
3919         }
3920
3921         generation = vnnmap->generation;
3922         data.dptr = (void *)&generation;
3923         data.dsize = sizeof(generation);
3924
3925         /* start a cluster wide transaction */
3926         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3927         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3928                                         nodes, 0,
3929                                         TIMELIMIT(), false, data,
3930                                         NULL, NULL,
3931                                         NULL);
3932         if (ret!= 0) {
3933                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3934                                   "transactions.\n"));
3935                 return -1;
3936         }
3937
3938         w.db_id = ctdb_db->db_id;
3939         w.transaction_id = generation;
3940
3941         data.dptr = (void *)&w;
3942         data.dsize = sizeof(w);
3943
3944         /* wipe all the remote databases. */
3945         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3946         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3947                                         nodes, 0,
3948                                         TIMELIMIT(), false, data,
3949                                         NULL, NULL,
3950                                         NULL) != 0) {
3951                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3952                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3953                 talloc_free(tmp_ctx);
3954                 return -1;
3955         }
3956
3957         data.dptr = (void *)&ctdb_db->db_id;
3958         data.dsize = sizeof(ctdb_db->db_id);
3959
3960         /* mark the database as healthy */
3961         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3962         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3963                                         nodes, 0,
3964                                         TIMELIMIT(), false, data,
3965                                         NULL, NULL,
3966                                         NULL) != 0) {
3967                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3968                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3969                 talloc_free(tmp_ctx);
3970                 return -1;
3971         }
3972
3973         data.dptr = (void *)&generation;
3974         data.dsize = sizeof(generation);
3975
3976         /* commit all the changes */
3977         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3978                                         nodes, 0,
3979                                         TIMELIMIT(), false, data,
3980                                         NULL, NULL,
3981                                         NULL) != 0) {
3982                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3983                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3984                 talloc_free(tmp_ctx);
3985                 return -1;
3986         }
3987
3988         /* thaw all nodes */
3989         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3990         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3991                                         nodes, 0,
3992                                         TIMELIMIT(),
3993                                         false, tdb_null,
3994                                         NULL, NULL,
3995                                         NULL) != 0) {
3996                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3997                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3998                 talloc_free(tmp_ctx);
3999                 return -1;
4000         }
4001
4002         talloc_free(tmp_ctx);
4003         return 0;
4004 }
4005
4006 /*
4007  * set flags of a node in the nodemap
4008  */
4009 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4010 {
4011         int ret;
4012         int32_t status;
4013         int node;
4014         int flags;
4015         TDB_DATA data;
4016         struct ctdb_node_flag_change c;
4017
4018         if (argc != 2) {
4019                 usage();
4020                 return -1;
4021         }
4022
4023         if (sscanf(argv[0], "%d", &node) != 1) {
4024                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4025                 usage();
4026                 return -1;
4027         }
4028         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4029                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4030                 usage();
4031                 return -1;
4032         }
4033
4034         c.pnn       = node;
4035         c.old_flags = 0;
4036         c.new_flags = flags;
4037
4038         data.dsize = sizeof(c);
4039         data.dptr = (unsigned char *)&c;
4040
4041         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4042                            data, NULL, NULL, &status, NULL, NULL);
4043         if (ret != 0 || status != 0) {
4044                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4045                 return -1;
4046         }
4047         return 0;
4048 }
4049
4050 /*
4051   dump memory usage
4052  */
4053 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4054 {
4055         TDB_DATA data;
4056         int ret;
4057         int32_t res;
4058         char *errmsg;
4059         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4060         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4061                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4062         if (ret != 0 || res != 0) {
4063                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4064                 talloc_free(tmp_ctx);
4065                 return -1;
4066         }
4067         write(1, data.dptr, data.dsize);
4068         talloc_free(tmp_ctx);
4069         return 0;
4070 }
4071
4072 /*
4073   handler for memory dumps
4074 */
4075 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4076                              TDB_DATA data, void *private_data)
4077 {
4078         write(1, data.dptr, data.dsize);
4079         exit(0);
4080 }
4081
4082 /*
4083   dump memory usage on the recovery daemon
4084  */
4085 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4086 {
4087         int ret;
4088         TDB_DATA data;
4089         struct rd_memdump_reply rd;
4090
4091         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4092         if (rd.pnn == -1) {
4093                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4094                 return -1;
4095         }
4096         rd.srvid = getpid();
4097
4098         /* register a message port for receiveing the reply so that we
4099            can receive the reply
4100         */
4101         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4102
4103
4104         data.dptr = (uint8_t *)&rd;
4105         data.dsize = sizeof(rd);
4106
4107         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4108         if (ret != 0) {
4109                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4110                 return -1;
4111         }
4112
4113         /* this loop will terminate when we have received the reply */
4114         while (1) {     
4115                 event_loop_once(ctdb->ev);
4116         }
4117
4118         return 0;
4119 }
4120
4121 /*
4122   send a message to a srvid
4123  */
4124 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4125 {
4126         unsigned long srvid;
4127         int ret;
4128         TDB_DATA data;
4129
4130         if (argc < 2) {
4131                 usage();
4132         }
4133
4134         srvid      = strtoul(argv[0], NULL, 0);
4135
4136         data.dptr = (uint8_t *)discard_const(argv[1]);
4137         data.dsize= strlen(argv[1]);
4138
4139         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4140         if (ret != 0) {
4141                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4142                 return -1;
4143         }
4144
4145         return 0;
4146 }
4147
4148 /*
4149   handler for msglisten
4150 */
4151 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4152                              TDB_DATA data, void *private_data)
4153 {
4154         int i;
4155
4156         printf("Message received: ");
4157         for (i=0;i<data.dsize;i++) {
4158                 printf("%c", data.dptr[i]);
4159         }
4160         printf("\n");
4161 }
4162
4163 /*
4164   listen for messages on a messageport
4165  */
4166 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4167 {
4168         uint64_t srvid;
4169
4170         srvid = getpid();
4171
4172         /* register a message port and listen for messages
4173         */
4174         ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4175         printf("Listening for messages on srvid:%d\n", (int)srvid);
4176
4177         while (1) {     
4178                 event_loop_once(ctdb->ev);
4179         }
4180
4181         return 0;
4182 }
4183
4184 /*
4185   list all nodes in the cluster
4186   if the daemon is running, we read the data from the daemon.
4187   if the daemon is not running we parse the nodes file directly
4188  */
4189 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4190 {
4191         int i, ret;
4192         struct ctdb_node_map *nodemap=NULL;
4193
4194         if (ctdb != NULL) {
4195                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4196                 if (ret != 0) {
4197                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4198                         return ret;
4199                 }
4200
4201                 for(i=0;i<nodemap->num;i++){
4202                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4203                                 continue;
4204                         }
4205                         if (options.machinereadable){
4206                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4207                         } else {
4208                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4209                         }
4210                 }
4211         } else {
4212                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4213                 struct pnn_node *pnn_nodes;
4214                 struct pnn_node *pnn_node;
4215         
4216                 pnn_nodes = read_nodes_file(mem_ctx);
4217                 if (pnn_nodes == NULL) {
4218                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4219                         talloc_free(mem_ctx);
4220                         return -1;
4221                 }
4222
4223                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4224                         ctdb_sock_addr addr;
4225
4226                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4227                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4228                                 talloc_free(mem_ctx);
4229                                 return -1;
4230                         }
4231
4232                         if (options.machinereadable){
4233                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4234                         } else {
4235                                 printf("%s\n", pnn_node->addr);
4236                         }
4237                 }
4238                 talloc_free(mem_ctx);
4239         }
4240
4241         return 0;
4242 }
4243
4244 /*
4245   reload the nodes file on the local node
4246  */
4247 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4248 {
4249         int i, ret;
4250         int mypnn;
4251         struct ctdb_node_map *nodemap=NULL;
4252
4253         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4254         if (mypnn == -1) {
4255                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4256                 return -1;
4257         }
4258
4259         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4260         if (ret != 0) {
4261                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4262                 return ret;
4263         }
4264
4265         /* reload the nodes file on all remote nodes */
4266         for (i=0;i<nodemap->num;i++) {
4267                 if (nodemap->nodes[i].pnn == mypnn) {
4268                         continue;
4269                 }
4270                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4271                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4272                         nodemap->nodes[i].pnn);
4273                 if (ret != 0) {
4274                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4275                 }
4276         }
4277
4278         /* reload the nodes file on the local node */
4279         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4280         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4281         if (ret != 0) {
4282                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4283         }
4284
4285         /* initiate a recovery */
4286         control_recover(ctdb, argc, argv);
4287
4288         return 0;
4289 }
4290
4291
4292 static const struct {
4293         const char *name;
4294         int (*fn)(struct ctdb_context *, int, const char **);
4295         bool auto_all;
4296         bool without_daemon; /* can be run without daemon running ? */
4297         const char *msg;
4298         const char *args;
4299 } ctdb_commands[] = {
4300 #ifdef CTDB_VERS
4301         { "version",         control_version,           true,   false,  "show version of ctdb" },
4302 #endif
4303         { "status",          control_status,            true,   false,  "show node status" },
4304         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4305         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4306         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4307         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4308         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4309         { "statistics",      control_statistics,        false,  false, "show statistics" },
4310         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4311         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4312         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4313         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4314         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4315         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4316         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4317         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4318         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4319         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4320         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4321         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4322         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4323         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4324         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4325         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4326         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4327         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4328         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4329         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4330         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4331         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4332         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4333         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4334         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4335         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4336         { "stop",            control_stop,              true,   false,  "stop a node" },
4337         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4338         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4339         { "unban",           control_unban,             true,   false,  "unban a node" },
4340         { "showban",         control_showban,           true,   false,  "show ban information"},
4341         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4342         { "recover",         control_recover,           true,   false,  "force recovery" },
4343         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4344         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4345         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4346         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4347         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4348         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4349         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4350         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4351
4352         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4353         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4354         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4355         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4356         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4357         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4358         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4359         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4360         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4361         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4362         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4363         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4364         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4365         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4366         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4367         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4368         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4369         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4370         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4371         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4372         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4373         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4374         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4375         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4376         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4377         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4378         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4379         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4380         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4381         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4382         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4383         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4384 };
4385
4386 /*
4387   show usage message
4388  */
4389 static void usage(void)
4390 {
4391         int i;
4392         printf(
4393 "Usage: ctdb [options] <control>\n" \
4394 "Options:\n" \
4395 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4396 "   -Y                 generate machinereadable output\n"
4397 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4398         printf("Controls:\n");
4399         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4400                 printf("  %-15s %-27s  %s\n", 
4401                        ctdb_commands[i].name, 
4402                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4403                        ctdb_commands[i].msg);
4404         }
4405         exit(1);
4406 }
4407
4408
4409 static void ctdb_alarm(int sig)
4410 {
4411         printf("Maximum runtime exceeded - exiting\n");
4412         _exit(ERR_TIMEOUT);
4413 }
4414
4415 /*
4416   main program
4417 */
4418 int main(int argc, const char *argv[])
4419 {
4420         struct ctdb_context *ctdb;
4421         char *nodestring = NULL;
4422         struct poptOption popt_options[] = {
4423                 POPT_AUTOHELP
4424                 POPT_CTDB_CMDLINE
4425                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4426                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4427                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4428                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4429                 POPT_TABLEEND
4430         };
4431         int opt;
4432         const char **extra_argv;
4433         int extra_argc = 0;
4434         int ret=-1, i;
4435         poptContext pc;
4436         struct event_context *ev;
4437         const char *control;
4438
4439         setlinebuf(stdout);
4440         
4441         /* set some defaults */
4442         options.maxruntime = 0;
4443         options.timelimit = 3;
4444         options.pnn = CTDB_CURRENT_NODE;
4445
4446         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4447
4448         while ((opt = poptGetNextOpt(pc)) != -1) {
4449                 switch (opt) {
4450                 default:
4451                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4452                                 poptBadOption(pc, 0), poptStrerror(opt)));
4453                         exit(1);
4454                 }
4455         }
4456
4457         /* setup the remaining options for the main program to use */
4458         extra_argv = poptGetArgs(pc);
4459         if (extra_argv) {
4460                 extra_argv++;
4461                 while (extra_argv[extra_argc]) extra_argc++;
4462         }
4463
4464         if (extra_argc < 1) {
4465                 usage();
4466         }
4467
4468         if (options.maxruntime == 0) {
4469                 const char *ctdb_timeout;
4470                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4471                 if (ctdb_timeout != NULL) {
4472                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4473                 } else {
4474                         /* default timeout is 120 seconds */
4475                         options.maxruntime = 120;
4476                 }
4477         }
4478
4479         signal(SIGALRM, ctdb_alarm);
4480         alarm(options.maxruntime);
4481
4482         /* setup the node number to contact */
4483         if (nodestring != NULL) {
4484                 if (strcmp(nodestring, "all") == 0) {
4485                         options.pnn = CTDB_BROADCAST_ALL;
4486                 } else {
4487                         options.pnn = strtoul(nodestring, NULL, 0);
4488                 }
4489         }
4490
4491         control = extra_argv[0];
4492
4493         ev = event_context_init(NULL);
4494
4495         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4496                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4497                         int j;
4498
4499                         if (ctdb_commands[i].without_daemon == true) {
4500                                 close(2);
4501                         }
4502
4503                         /* initialise ctdb */
4504                         ctdb = ctdb_cmdline_client(ev);
4505
4506                         if (ctdb_commands[i].without_daemon == false) {
4507                                 if (ctdb == NULL) {
4508                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4509                                         exit(1);
4510                                 }
4511
4512                                 /* verify the node exists */
4513                                 verify_node(ctdb);
4514
4515                                 if (options.pnn == CTDB_CURRENT_NODE) {
4516                                         int pnn;
4517                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4518                                         if (pnn == -1) {
4519                                                 return -1;
4520                                         }
4521                                         options.pnn = pnn;
4522                                 }
4523                         }
4524
4525                         if (ctdb_commands[i].auto_all && 
4526                             options.pnn == CTDB_BROADCAST_ALL) {
4527                                 uint32_t *nodes;
4528                                 uint32_t num_nodes;
4529                                 ret = 0;
4530
4531                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4532                                 CTDB_NO_MEMORY(ctdb, nodes);
4533         
4534                                 for (j=0;j<num_nodes;j++) {
4535                                         options.pnn = nodes[j];
4536                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4537                                 }
4538                                 talloc_free(nodes);
4539                         } else {
4540                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4541                         }
4542                         break;
4543                 }
4544         }
4545
4546         if (i == ARRAY_SIZE(ctdb_commands)) {
4547                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4548                 exit(1);
4549         }
4550
4551         return ret;
4552 }