2939f88c57884dc76b9c156865c4c3eccb7921c2
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(num_recoveries),
170                 STATISTICS_FIELD(client_packets_sent),
171                 STATISTICS_FIELD(client_packets_recv),
172                 STATISTICS_FIELD(node_packets_sent),
173                 STATISTICS_FIELD(node_packets_recv),
174                 STATISTICS_FIELD(keepalive_packets_sent),
175                 STATISTICS_FIELD(keepalive_packets_recv),
176                 STATISTICS_FIELD(node.req_call),
177                 STATISTICS_FIELD(node.reply_call),
178                 STATISTICS_FIELD(node.req_dmaster),
179                 STATISTICS_FIELD(node.reply_dmaster),
180                 STATISTICS_FIELD(node.reply_error),
181                 STATISTICS_FIELD(node.req_message),
182                 STATISTICS_FIELD(node.req_control),
183                 STATISTICS_FIELD(node.reply_control),
184                 STATISTICS_FIELD(client.req_call),
185                 STATISTICS_FIELD(client.req_message),
186                 STATISTICS_FIELD(client.req_control),
187                 STATISTICS_FIELD(timeouts.call),
188                 STATISTICS_FIELD(timeouts.control),
189                 STATISTICS_FIELD(timeouts.traverse),
190                 STATISTICS_FIELD(total_calls),
191                 STATISTICS_FIELD(pending_calls),
192                 STATISTICS_FIELD(lockwait_calls),
193                 STATISTICS_FIELD(pending_lockwait_calls),
194                 STATISTICS_FIELD(childwrite_calls),
195                 STATISTICS_FIELD(pending_childwrite_calls),
196                 STATISTICS_FIELD(memory_used),
197                 STATISTICS_FIELD(max_hop_count),
198         };
199         printf("CTDB version %u\n", CTDB_VERSION);
200         for (i=0;i<ARRAY_SIZE(fields);i++) {
201                 if (strchr(fields[i].name, '.')) {
202                         preflen = strcspn(fields[i].name, ".")+1;
203                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
204                                 prefix = fields[i].name;
205                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
206                         }
207                 } else {
208                         preflen = 0;
209                 }
210                 printf(" %*s%-22s%*s%10u\n", 
211                        preflen?4:0, "",
212                        fields[i].name+preflen, 
213                        preflen?0:4, "",
214                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
215         }
216         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
217         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
218
219         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
220         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
221         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
222         talloc_free(tmp_ctx);
223 }
224
225 /*
226   display remote ctdb statistics combined from all nodes
227  */
228 static int control_statistics_all(struct ctdb_context *ctdb)
229 {
230         int ret, i;
231         struct ctdb_statistics statistics;
232         uint32_t *nodes;
233         uint32_t num_nodes;
234
235         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
236         CTDB_NO_MEMORY(ctdb, nodes);
237         
238         ZERO_STRUCT(statistics);
239
240         for (i=0;i<num_nodes;i++) {
241                 struct ctdb_statistics s1;
242                 int j;
243                 uint32_t *v1 = (uint32_t *)&s1;
244                 uint32_t *v2 = (uint32_t *)&statistics;
245                 uint32_t num_ints = 
246                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
247                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
248                 if (ret != 0) {
249                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
250                         return ret;
251                 }
252                 for (j=0;j<num_ints;j++) {
253                         v2[j] += v1[j];
254                 }
255                 statistics.max_hop_count = 
256                         MAX(statistics.max_hop_count, s1.max_hop_count);
257                 statistics.max_call_latency = 
258                         MAX(statistics.max_call_latency, s1.max_call_latency);
259                 statistics.max_lockwait_latency = 
260                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
261         }
262         talloc_free(nodes);
263         printf("Gathered statistics for %u nodes\n", num_nodes);
264         show_statistics(&statistics);
265         return 0;
266 }
267
268 /*
269   display remote ctdb statistics
270  */
271 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
272 {
273         int ret;
274         struct ctdb_statistics statistics;
275
276         if (options.pnn == CTDB_BROADCAST_ALL) {
277                 return control_statistics_all(ctdb);
278         }
279
280         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
281         if (ret != 0) {
282                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
283                 return ret;
284         }
285         show_statistics(&statistics);
286         return 0;
287 }
288
289
290 /*
291   reset remote ctdb statistics
292  */
293 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
294 {
295         int ret;
296
297         ret = ctdb_statistics_reset(ctdb, options.pnn);
298         if (ret != 0) {
299                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
300                 return ret;
301         }
302         return 0;
303 }
304
305
306 /*
307   display uptime of remote node
308  */
309 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
310 {
311         int ret;
312         struct ctdb_uptime *uptime = NULL;
313         int tmp, days, hours, minutes, seconds;
314
315         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
316         if (ret != 0) {
317                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
318                 return ret;
319         }
320
321         if (options.machinereadable){
322                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
323                 printf(":%u:%u:%u:%lf\n",
324                         (unsigned int)uptime->current_time.tv_sec,
325                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
326                         (unsigned int)uptime->last_recovery_finished.tv_sec,
327                         timeval_delta(&uptime->last_recovery_finished,
328                                       &uptime->last_recovery_started)
329                 );
330                 return 0;
331         }
332
333         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
334
335         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
336         seconds = tmp%60;
337         tmp    /= 60;
338         minutes = tmp%60;
339         tmp    /= 60;
340         hours   = tmp%24;
341         tmp    /= 24;
342         days    = tmp;
343         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
344
345         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
346         seconds = tmp%60;
347         tmp    /= 60;
348         minutes = tmp%60;
349         tmp    /= 60;
350         hours   = tmp%24;
351         tmp    /= 24;
352         days    = tmp;
353         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
354         
355         printf("Duration of last recovery/failover: %lf seconds\n",
356                 timeval_delta(&uptime->last_recovery_finished,
357                               &uptime->last_recovery_started));
358
359         return 0;
360 }
361
362 /*
363   show the PNN of the current node
364  */
365 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
366 {
367         int mypnn;
368
369         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
370         if (mypnn == -1) {
371                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
372                 return -1;
373         }
374
375         printf("PNN:%d\n", mypnn);
376         return 0;
377 }
378
379
380 struct pnn_node {
381         struct pnn_node *next;
382         const char *addr;
383         int pnn;
384 };
385
386 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
387 {
388         const char *nodes_list;
389         int nlines;
390         char **lines;
391         int i, pnn;
392         struct pnn_node *pnn_nodes = NULL;
393         struct pnn_node *pnn_node;
394         struct pnn_node *tmp_node;
395
396         /* read the nodes file */
397         nodes_list = getenv("CTDB_NODES");
398         if (nodes_list == NULL) {
399                 nodes_list = "/etc/ctdb/nodes";
400         }
401         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
402         if (lines == NULL) {
403                 return NULL;
404         }
405         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
406                 nlines--;
407         }
408         for (i=0, pnn=0; i<nlines; i++) {
409                 char *node;
410
411                 node = lines[i];
412                 /* strip leading spaces */
413                 while((*node == ' ') || (*node == '\t')) {
414                         node++;
415                 }
416                 if (*node == '#') {
417                         pnn++;
418                         continue;
419                 }
420                 if (strcmp(node, "") == 0) {
421                         continue;
422                 }
423                 pnn_node = talloc(mem_ctx, struct pnn_node);
424                 pnn_node->pnn = pnn++;
425                 pnn_node->addr = talloc_strdup(pnn_node, node);
426                 pnn_node->next = pnn_nodes;
427                 pnn_nodes = pnn_node;
428         }
429
430         /* swap them around so we return them in incrementing order */
431         pnn_node = pnn_nodes;
432         pnn_nodes = NULL;
433         while (pnn_node) {
434                 tmp_node = pnn_node;
435                 pnn_node = pnn_node->next;
436
437                 tmp_node->next = pnn_nodes;
438                 pnn_nodes = tmp_node;
439         }
440
441         return pnn_nodes;
442 }
443
444 /*
445   show the PNN of the current node
446   discover the pnn by loading the nodes file and try to bind to all
447   addresses one at a time until the ip address is found.
448  */
449 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
450 {
451         TALLOC_CTX *mem_ctx = talloc_new(NULL);
452         struct pnn_node *pnn_nodes;
453         struct pnn_node *pnn_node;
454
455         pnn_nodes = read_nodes_file(mem_ctx);
456         if (pnn_nodes == NULL) {
457                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
458                 talloc_free(mem_ctx);
459                 return -1;
460         }
461
462         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
463                 ctdb_sock_addr addr;
464
465                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
466                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
467                         talloc_free(mem_ctx);
468                         return -1;
469                 }
470
471                 if (ctdb_sys_have_ip(&addr)) {
472                         printf("PNN:%d\n", pnn_node->pnn);
473                         talloc_free(mem_ctx);
474                         return 0;
475                 }
476         }
477
478         printf("Failed to detect which PNN this node is\n");
479         talloc_free(mem_ctx);
480         return -1;
481 }
482
483 /*
484   display remote ctdb status
485  */
486 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
487 {
488         int i, ret;
489         struct ctdb_vnn_map *vnnmap=NULL;
490         struct ctdb_node_map *nodemap=NULL;
491         uint32_t recmode, recmaster;
492         int mypnn;
493
494         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
495         if (mypnn == -1) {
496                 return -1;
497         }
498
499         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
500         if (ret != 0) {
501                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
502                 return ret;
503         }
504
505         if(options.machinereadable){
506                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:\n");
507                 for(i=0;i<nodemap->num;i++){
508                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
509                                 continue;
510                         }
511                         printf(":%d:%s:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
512                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
517                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
518                 }
519                 return 0;
520         }
521
522         printf("Number of nodes:%d\n", nodemap->num);
523         for(i=0;i<nodemap->num;i++){
524                 static const struct {
525                         uint32_t flag;
526                         const char *name;
527                 } flag_names[] = {
528                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
529                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
530                         { NODE_FLAGS_BANNED,                "BANNED" },
531                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
532                         { NODE_FLAGS_DELETED,               "DELETED" },
533                         { NODE_FLAGS_STOPPED,               "STOPPED" },
534                 };
535                 char *flags_str = NULL;
536                 int j;
537
538                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
539                         continue;
540                 }
541                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
542                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
543                                 if (flags_str == NULL) {
544                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
545                                 } else {
546                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
547                                                                            flag_names[j].name);
548                                 }
549                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
550                         }
551                 }
552                 if (flags_str == NULL) {
553                         flags_str = talloc_strdup(ctdb, "OK");
554                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
555                 }
556                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
557                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
558                        flags_str,
559                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
560                 talloc_free(flags_str);
561         }
562
563         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
564         if (ret != 0) {
565                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
566                 return ret;
567         }
568         if (vnnmap->generation == INVALID_GENERATION) {
569                 printf("Generation:INVALID\n");
570         } else {
571                 printf("Generation:%d\n",vnnmap->generation);
572         }
573         printf("Size:%d\n",vnnmap->size);
574         for(i=0;i<vnnmap->size;i++){
575                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
576         }
577
578         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
579         if (ret != 0) {
580                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
581                 return ret;
582         }
583         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
584
585         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
586         if (ret != 0) {
587                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
588                 return ret;
589         }
590         printf("Recovery master:%d\n",recmaster);
591
592         return 0;
593 }
594
595
596 struct natgw_node {
597         struct natgw_node *next;
598         const char *addr;
599 };
600
601 /*
602   display the list of nodes belonging to this natgw configuration
603  */
604 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
605 {
606         int i, ret;
607         const char *natgw_list;
608         int nlines;
609         char **lines;
610         struct natgw_node *natgw_nodes = NULL;
611         struct natgw_node *natgw_node;
612         struct ctdb_node_map *nodemap=NULL;
613
614
615         /* read the natgw nodes file into a linked list */
616         natgw_list = getenv("NATGW_NODES");
617         if (natgw_list == NULL) {
618                 natgw_list = "/etc/ctdb/natgw_nodes";
619         }
620         lines = file_lines_load(natgw_list, &nlines, ctdb);
621         if (lines == NULL) {
622                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
623                 return -1;
624         }
625         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
626                 nlines--;
627         }
628         for (i=0;i<nlines;i++) {
629                 char *node;
630
631                 node = lines[i];
632                 /* strip leading spaces */
633                 while((*node == ' ') || (*node == '\t')) {
634                         node++;
635                 }
636                 if (*node == '#') {
637                         continue;
638                 }
639                 if (strcmp(node, "") == 0) {
640                         continue;
641                 }
642                 natgw_node = talloc(ctdb, struct natgw_node);
643                 natgw_node->addr = talloc_strdup(natgw_node, node);
644                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
645                 natgw_node->next = natgw_nodes;
646                 natgw_nodes = natgw_node;
647         }
648
649         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
650         if (ret != 0) {
651                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
652                 return ret;
653         }
654
655         i=0;
656         while(i<nodemap->num) {
657                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
658                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
659                                 break;
660                         }
661                 }
662
663                 /* this node was not in the natgw so we just remove it from
664                  * the list
665                  */
666                 if ((natgw_node == NULL) 
667                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
668                         int j;
669
670                         for (j=i+1; j<nodemap->num; j++) {
671                                 nodemap->nodes[j-1] = nodemap->nodes[j];
672                         }
673                         nodemap->num--;
674                         continue;
675                 }
676
677                 i++;
678         }               
679
680         /* pick a node to be natgwmaster
681          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
682          */
683         for(i=0;i<nodemap->num;i++){
684                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
685                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
686                         break;
687                 }
688         }
689         /* we couldnt find any healthy node, try unhealthy ones */
690         if (i == nodemap->num) {
691                 for(i=0;i<nodemap->num;i++){
692                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
693                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
694                                 break;
695                         }
696                 }
697         }
698         /* unless all nodes are STOPPED, when we pick one anyway */
699         if (i == nodemap->num) {
700                 for(i=0;i<nodemap->num;i++){
701                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
702                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
703                                 break;
704                         }
705                 }
706                 /* or if we still can not find any */
707                 if (i == nodemap->num) {
708                         printf("-1 0.0.0.0\n");
709                 }
710         }
711
712         /* print the pruned list of nodes belonging to this natgw list */
713         for(i=0;i<nodemap->num;i++){
714                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
715                         continue;
716                 }
717                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
718                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
719                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
723                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
724         }
725
726         return 0;
727 }
728
729 /*
730   display the status of the scripts for monitoring (or other events)
731  */
732 static int control_one_scriptstatus(struct ctdb_context *ctdb,
733                                     enum ctdb_eventscript_call type)
734 {
735         struct ctdb_scripts_wire *script_status;
736         int ret, i;
737
738         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
739         if (ret != 0) {
740                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
741                 return ret;
742         }
743
744         if (script_status == NULL) {
745                 if (!options.machinereadable) {
746                         printf("%s cycle never run\n",
747                                ctdb_eventscript_call_names[type]);
748                 }
749                 return 0;
750         }
751
752         if (!options.machinereadable) {
753                 printf("%d scripts were executed last %s cycle\n",
754                        script_status->num_scripts,
755                        ctdb_eventscript_call_names[type]);
756         }
757         for (i=0; i<script_status->num_scripts; i++) {
758                 const char *status = NULL;
759
760                 switch (script_status->scripts[i].status) {
761                 case -ETIME:
762                         status = "TIMEDOUT";
763                         break;
764                 case -ENOEXEC:
765                         status = "DISABLED";
766                         break;
767                 case 0:
768                         status = "OK";
769                         break;
770                 default:
771                         if (script_status->scripts[i].status > 0)
772                                 status = "ERROR";
773                         break;
774                 }
775                 if (options.machinereadable) {
776                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
777                                ctdb_eventscript_call_names[type],
778                                script_status->scripts[i].name,
779                                script_status->scripts[i].status,
780                                status,
781                                (long)script_status->scripts[i].start.tv_sec,
782                                (long)script_status->scripts[i].start.tv_usec,
783                                (long)script_status->scripts[i].finished.tv_sec,
784                                (long)script_status->scripts[i].finished.tv_usec,
785                                script_status->scripts[i].output);
786                         continue;
787                 }
788                 if (status)
789                         printf("%-20s Status:%s    ",
790                                script_status->scripts[i].name, status);
791                 else
792                         /* Some other error, eg from stat. */
793                         printf("%-20s Status:CANNOT RUN (%s)",
794                                script_status->scripts[i].name,
795                                strerror(-script_status->scripts[i].status));
796
797                 if (script_status->scripts[i].status >= 0) {
798                         printf("Duration:%.3lf ",
799                         timeval_delta(&script_status->scripts[i].finished,
800                               &script_status->scripts[i].start));
801                 }
802                 if (script_status->scripts[i].status != -ENOEXEC) {
803                         printf("%s",
804                                ctime(&script_status->scripts[i].start.tv_sec));
805                         if (script_status->scripts[i].status != 0) {
806                                 printf("   OUTPUT:%s\n",
807                                        script_status->scripts[i].output);
808                         }
809                 } else {
810                         printf("\n");
811                 }
812         }
813         return 0;
814 }
815
816
817 static int control_scriptstatus(struct ctdb_context *ctdb,
818                                 int argc, const char **argv)
819 {
820         int ret;
821         enum ctdb_eventscript_call type, min, max;
822         const char *arg;
823
824         if (argc > 1) {
825                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
826                 return -1;
827         }
828
829         if (argc == 0)
830                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
831         else
832                 arg = argv[0];
833
834         for (type = 0; type < CTDB_EVENT_MAX; type++) {
835                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
836                         min = type;
837                         max = type+1;
838                         break;
839                 }
840         }
841         if (type == CTDB_EVENT_MAX) {
842                 if (strcmp(arg, "all") == 0) {
843                         min = 0;
844                         max = CTDB_EVENT_MAX;
845                 } else {
846                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
847                         return -1;
848                 }
849         }
850
851         if (options.machinereadable) {
852                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
853         }
854
855         for (type = min; type < max; type++) {
856                 ret = control_one_scriptstatus(ctdb, type);
857                 if (ret != 0) {
858                         return ret;
859                 }
860         }
861
862         return 0;
863 }
864
865 /*
866   enable an eventscript
867  */
868 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
869 {
870         int ret;
871
872         if (argc < 1) {
873                 usage();
874         }
875
876         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
877         if (ret != 0) {
878           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
879                 return ret;
880         }
881
882         return 0;
883 }
884
885 /*
886   disable an eventscript
887  */
888 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
889 {
890         int ret;
891
892         if (argc < 1) {
893                 usage();
894         }
895
896         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
897         if (ret != 0) {
898           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
899                 return ret;
900         }
901
902         return 0;
903 }
904
905 /*
906   display the pnn of the recovery master
907  */
908 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
909 {
910         int ret;
911         uint32_t recmaster;
912
913         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
914         if (ret != 0) {
915                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
916                 return ret;
917         }
918         printf("%d\n",recmaster);
919
920         return 0;
921 }
922
923 /*
924   get a list of all tickles for this pnn
925  */
926 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
927 {
928         struct ctdb_control_tcp_tickle_list *list;
929         ctdb_sock_addr addr;
930         int i, ret;
931
932         if (argc < 1) {
933                 usage();
934         }
935
936         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
937                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
938                 return -1;
939         }
940
941         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
942         if (ret == -1) {
943                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
944                 return -1;
945         }
946
947         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
948         printf("Num tickles:%u\n", list->tickles.num);
949         for (i=0;i<list->tickles.num;i++) {
950                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
951                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
952         }
953
954         talloc_free(list);
955         
956         return 0;
957 }
958
959
960
961 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
962 {
963         struct ctdb_all_public_ips *ips;
964         struct ctdb_public_ip ip;
965         int i, ret;
966         uint32_t *nodes;
967         uint32_t disable_time;
968         TDB_DATA data;
969         struct ctdb_node_map *nodemap=NULL;
970         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
971
972         disable_time = 30;
973         data.dptr  = (uint8_t*)&disable_time;
974         data.dsize = sizeof(disable_time);
975         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
976         if (ret != 0) {
977                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
978                 return -1;
979         }
980
981
982
983         /* read the public ip list from the node */
984         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
985         if (ret != 0) {
986                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
987                 talloc_free(tmp_ctx);
988                 return -1;
989         }
990
991         for (i=0;i<ips->num;i++) {
992                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
993                         break;
994                 }
995         }
996         if (i==ips->num) {
997                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
998                         pnn, ctdb_addr_to_str(addr)));
999                 talloc_free(tmp_ctx);
1000                 return -1;
1001         }
1002
1003         ip.pnn  = pnn;
1004         ip.addr = *addr;
1005
1006         data.dptr  = (uint8_t *)&ip;
1007         data.dsize = sizeof(ip);
1008
1009         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1010         if (ret != 0) {
1011                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1012                 talloc_free(tmp_ctx);
1013                 return ret;
1014         }
1015
1016         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1017         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1018                                         nodes, 0,
1019                                         TIMELIMIT(),
1020                                         false, data,
1021                                         NULL, NULL,
1022                                         NULL);
1023         if (ret != 0) {
1024                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1025                 talloc_free(tmp_ctx);
1026                 return -1;
1027         }
1028
1029         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1030         if (ret != 0) {
1031                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1032                 talloc_free(tmp_ctx);
1033                 return -1;
1034         }
1035
1036         /* update the recovery daemon so it now knows to expect the new
1037            node assignment for this ip.
1038         */
1039         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1040         if (ret != 0) {
1041                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1042                 return -1;
1043         }
1044
1045         talloc_free(tmp_ctx);
1046         return 0;
1047 }
1048
1049 /*
1050   move/failover an ip address to a specific node
1051  */
1052 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1053 {
1054         uint32_t pnn;
1055         ctdb_sock_addr addr;
1056
1057         if (argc < 2) {
1058                 usage();
1059                 return -1;
1060         }
1061
1062         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1063                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1064                 return -1;
1065         }
1066
1067
1068         if (sscanf(argv[1], "%u", &pnn) != 1) {
1069                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1070                 return -1;
1071         }
1072
1073         if (move_ip(ctdb, &addr, pnn) != 0) {
1074                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1075                 return -1;
1076         }
1077
1078         return 0;
1079 }
1080
1081 void getips_store_callback(void *param, void *data)
1082 {
1083         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1084         struct ctdb_all_public_ips *ips = param;
1085         int i;
1086
1087         i = ips->num++;
1088         ips->ips[i].pnn  = node_ip->pnn;
1089         ips->ips[i].addr = node_ip->addr;
1090 }
1091
1092 void getips_count_callback(void *param, void *data)
1093 {
1094         uint32_t *count = param;
1095
1096         (*count)++;
1097 }
1098
1099 #define IP_KEYLEN       4
1100 static uint32_t *ip_key(ctdb_sock_addr *ip)
1101 {
1102         static uint32_t key[IP_KEYLEN];
1103
1104         bzero(key, sizeof(key));
1105
1106         switch (ip->sa.sa_family) {
1107         case AF_INET:
1108                 key[0]  = ip->ip.sin_addr.s_addr;
1109                 break;
1110         case AF_INET6:
1111                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1112                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1113                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1114                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1115                 break;
1116         default:
1117                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1118                 return key;
1119         }
1120
1121         return key;
1122 }
1123
1124 static void *add_ip_callback(void *parm, void *data)
1125 {
1126         return parm;
1127 }
1128
1129 static int
1130 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1131 {
1132         struct ctdb_all_public_ips *tmp_ips;
1133         struct ctdb_node_map *nodemap=NULL;
1134         trbt_tree_t *ip_tree;
1135         int i, j, len, ret;
1136         uint32_t count;
1137
1138         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1141                 return ret;
1142         }
1143
1144         ip_tree = trbt_create(tmp_ctx, 0);
1145
1146         for(i=0;i<nodemap->num;i++){
1147                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1148                         continue;
1149                 }
1150                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1151                         continue;
1152                 }
1153
1154                 /* read the public ip list from this node */
1155                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1156                 if (ret != 0) {
1157                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1158                         return -1;
1159                 }
1160         
1161                 for (j=0; j<tmp_ips->num;j++) {
1162                         struct ctdb_public_ip *node_ip;
1163
1164                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1165                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1166                         node_ip->addr = tmp_ips->ips[j].addr;
1167
1168                         trbt_insertarray32_callback(ip_tree,
1169                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1170                                 add_ip_callback,
1171                                 node_ip);
1172                 }
1173                 talloc_free(tmp_ips);
1174         }
1175
1176         /* traverse */
1177         count = 0;
1178         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1179
1180         len = offsetof(struct ctdb_all_public_ips, ips) + 
1181                 count*sizeof(struct ctdb_public_ip);
1182         tmp_ips = talloc_zero_size(tmp_ctx, len);
1183         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1184
1185         *ips = tmp_ips;
1186
1187         return 0;
1188 }
1189
1190
1191 /* 
1192  * scans all other nodes and returns a pnn for another node that can host this 
1193  * ip address or -1
1194  */
1195 static int
1196 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1197 {
1198         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1199         struct ctdb_all_public_ips *ips;
1200         struct ctdb_node_map *nodemap=NULL;
1201         int i, j, ret;
1202
1203         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1206                 talloc_free(tmp_ctx);
1207                 return ret;
1208         }
1209
1210         for(i=0;i<nodemap->num;i++){
1211                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1212                         continue;
1213                 }
1214                 if (nodemap->nodes[i].pnn == options.pnn) {
1215                         continue;
1216                 }
1217
1218                 /* read the public ip list from this node */
1219                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1220                 if (ret != 0) {
1221                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1222                         return -1;
1223                 }
1224
1225                 for (j=0;j<ips->num;j++) {
1226                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1227                                 talloc_free(tmp_ctx);
1228                                 return nodemap->nodes[i].pnn;
1229                         }
1230                 }
1231                 talloc_free(ips);
1232         }
1233
1234         talloc_free(tmp_ctx);
1235         return -1;
1236 }
1237
1238 /*
1239   add a public ip address to a node
1240  */
1241 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1242 {
1243         int i, ret;
1244         int len;
1245         uint32_t pnn;
1246         unsigned mask;
1247         ctdb_sock_addr addr;
1248         struct ctdb_control_ip_iface *pub;
1249         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1250         struct ctdb_all_public_ips *ips;
1251
1252
1253         if (argc != 2) {
1254                 talloc_free(tmp_ctx);
1255                 usage();
1256         }
1257
1258         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1259                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1260                 talloc_free(tmp_ctx);
1261                 return -1;
1262         }
1263
1264         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1265         if (ret != 0) {
1266                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1267                 talloc_free(tmp_ctx);
1268                 return ret;
1269         }
1270
1271
1272         /* check if some other node is already serving this ip, if not,
1273          * we will claim it
1274          */
1275         for (i=0;i<ips->num;i++) {
1276                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1277                         break;
1278                 }
1279         }
1280
1281         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1282         pub = talloc_size(tmp_ctx, len); 
1283         CTDB_NO_MEMORY(ctdb, pub);
1284
1285         pub->addr  = addr;
1286         pub->mask  = mask;
1287         pub->len   = strlen(argv[1])+1;
1288         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1289
1290         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1291         if (ret != 0) {
1292                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1293                 talloc_free(tmp_ctx);
1294                 return ret;
1295         }
1296
1297         if (i == ips->num) {
1298                 /* no one has this ip so we claim it */
1299                 pnn  = options.pnn;
1300         } else {
1301                 pnn  = ips->ips[i].pnn;
1302         }
1303
1304         if (move_ip(ctdb, &addr, pnn) != 0) {
1305                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1306                 return -1;
1307         }
1308
1309         talloc_free(tmp_ctx);
1310         return 0;
1311 }
1312
1313 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1314
1315 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1316 {
1317         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1318         struct ctdb_node_map *nodemap=NULL;
1319         struct ctdb_all_public_ips *ips;
1320         int ret, i, j;
1321
1322         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1323         if (ret != 0) {
1324                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1325                 return ret;
1326         }
1327
1328         /* remove it from the nodes that are not hosting the ip currently */
1329         for(i=0;i<nodemap->num;i++){
1330                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1331                         continue;
1332                 }
1333                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1334                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1335                         continue;
1336                 }
1337
1338                 for (j=0;j<ips->num;j++) {
1339                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1340                                 break;
1341                         }
1342                 }
1343                 if (j==ips->num) {
1344                         continue;
1345                 }
1346
1347                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1348                         continue;
1349                 }
1350
1351                 options.pnn = nodemap->nodes[i].pnn;
1352                 control_delip(ctdb, argc, argv);
1353         }
1354
1355
1356         /* remove it from every node (also the one hosting it) */
1357         for(i=0;i<nodemap->num;i++){
1358                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1359                         continue;
1360                 }
1361                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1362                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1363                         continue;
1364                 }
1365
1366                 for (j=0;j<ips->num;j++) {
1367                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1368                                 break;
1369                         }
1370                 }
1371                 if (j==ips->num) {
1372                         continue;
1373                 }
1374
1375                 options.pnn = nodemap->nodes[i].pnn;
1376                 control_delip(ctdb, argc, argv);
1377         }
1378
1379         talloc_free(tmp_ctx);
1380         return 0;
1381 }
1382         
1383 /*
1384   delete a public ip address from a node
1385  */
1386 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1387 {
1388         int i, ret;
1389         ctdb_sock_addr addr;
1390         struct ctdb_control_ip_iface pub;
1391         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1392         struct ctdb_all_public_ips *ips;
1393
1394         if (argc != 1) {
1395                 talloc_free(tmp_ctx);
1396                 usage();
1397         }
1398
1399         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1400                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1401                 return -1;
1402         }
1403
1404         if (options.pnn == CTDB_BROADCAST_ALL) {
1405                 return control_delip_all(ctdb, argc, argv, &addr);
1406         }
1407
1408         pub.addr  = addr;
1409         pub.mask  = 0;
1410         pub.len   = 0;
1411
1412         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1413         if (ret != 0) {
1414                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1415                 talloc_free(tmp_ctx);
1416                 return ret;
1417         }
1418         
1419         for (i=0;i<ips->num;i++) {
1420                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1421                         break;
1422                 }
1423         }
1424
1425         if (i==ips->num) {
1426                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1427                         ctdb_addr_to_str(&addr)));
1428                 talloc_free(tmp_ctx);
1429                 return -1;
1430         }
1431
1432         if (ips->ips[i].pnn == options.pnn) {
1433                 ret = find_other_host_for_public_ip(ctdb, &addr);
1434                 if (ret != -1) {
1435                         if (move_ip(ctdb, &addr, ret) != 0) {
1436                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1437                                 return -1;
1438                         }
1439                 }
1440         }
1441
1442         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1445                 talloc_free(tmp_ctx);
1446                 return ret;
1447         }
1448
1449         talloc_free(tmp_ctx);
1450         return 0;
1451 }
1452
1453 /*
1454   kill a tcp connection
1455  */
1456 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1457 {
1458         int ret;
1459         struct ctdb_control_killtcp killtcp;
1460
1461         if (argc < 2) {
1462                 usage();
1463         }
1464
1465         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1466                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1467                 return -1;
1468         }
1469
1470         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1471                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1472                 return -1;
1473         }
1474
1475         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1476         if (ret != 0) {
1477                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1478                 return ret;
1479         }
1480
1481         return 0;
1482 }
1483
1484
1485 /*
1486   send a gratious arp
1487  */
1488 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1489 {
1490         int ret;
1491         ctdb_sock_addr addr;
1492
1493         if (argc < 2) {
1494                 usage();
1495         }
1496
1497         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1498                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1499                 return -1;
1500         }
1501
1502         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1503         if (ret != 0) {
1504                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1505                 return ret;
1506         }
1507
1508         return 0;
1509 }
1510
1511 /*
1512   register a server id
1513  */
1514 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1515 {
1516         int ret;
1517         struct ctdb_server_id server_id;
1518
1519         if (argc < 3) {
1520                 usage();
1521         }
1522
1523         server_id.pnn       = strtoul(argv[0], NULL, 0);
1524         server_id.type      = strtoul(argv[1], NULL, 0);
1525         server_id.server_id = strtoul(argv[2], NULL, 0);
1526
1527         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1528         if (ret != 0) {
1529                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1530                 return ret;
1531         }
1532         return -1;
1533 }
1534
1535 /*
1536   unregister a server id
1537  */
1538 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1539 {
1540         int ret;
1541         struct ctdb_server_id server_id;
1542
1543         if (argc < 3) {
1544                 usage();
1545         }
1546
1547         server_id.pnn       = strtoul(argv[0], NULL, 0);
1548         server_id.type      = strtoul(argv[1], NULL, 0);
1549         server_id.server_id = strtoul(argv[2], NULL, 0);
1550
1551         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1552         if (ret != 0) {
1553                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1554                 return ret;
1555         }
1556         return -1;
1557 }
1558
1559 /*
1560   check if a server id exists
1561  */
1562 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1563 {
1564         uint32_t status;
1565         int ret;
1566         struct ctdb_server_id server_id;
1567
1568         if (argc < 3) {
1569                 usage();
1570         }
1571
1572         server_id.pnn       = strtoul(argv[0], NULL, 0);
1573         server_id.type      = strtoul(argv[1], NULL, 0);
1574         server_id.server_id = strtoul(argv[2], NULL, 0);
1575
1576         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1577         if (ret != 0) {
1578                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1579                 return ret;
1580         }
1581
1582         if (status) {
1583                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1584         } else {
1585                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1586         }
1587         return 0;
1588 }
1589
1590 /*
1591   get a list of all server ids that are registered on a node
1592  */
1593 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1594 {
1595         int i, ret;
1596         struct ctdb_server_id_list *server_ids;
1597
1598         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1599         if (ret != 0) {
1600                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1601                 return ret;
1602         }
1603
1604         for (i=0; i<server_ids->num; i++) {
1605                 printf("Server id %d:%d:%d\n", 
1606                         server_ids->server_ids[i].pnn, 
1607                         server_ids->server_ids[i].type, 
1608                         server_ids->server_ids[i].server_id); 
1609         }
1610
1611         return -1;
1612 }
1613
1614 /*
1615   send a tcp tickle ack
1616  */
1617 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1618 {
1619         int ret;
1620         ctdb_sock_addr  src, dst;
1621
1622         if (argc < 2) {
1623                 usage();
1624         }
1625
1626         if (!parse_ip_port(argv[0], &src)) {
1627                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1628                 return -1;
1629         }
1630
1631         if (!parse_ip_port(argv[1], &dst)) {
1632                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1633                 return -1;
1634         }
1635
1636         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1637         if (ret==0) {
1638                 return 0;
1639         }
1640         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1641
1642         return -1;
1643 }
1644
1645
1646 /*
1647   display public ip status
1648  */
1649 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1650 {
1651         int i, ret;
1652         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1653         struct ctdb_all_public_ips *ips;
1654
1655         if (options.pnn == CTDB_BROADCAST_ALL) {
1656                 /* read the list of public ips from all nodes */
1657                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1658         } else {
1659                 /* read the public ip list from this node */
1660                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1661         }
1662         if (ret != 0) {
1663                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1664                 talloc_free(tmp_ctx);
1665                 return ret;
1666         }
1667
1668         if (options.machinereadable){
1669                 printf(":Public IP:Node:\n");
1670         } else {
1671                 if (options.pnn == CTDB_BROADCAST_ALL) {
1672                         printf("Public IPs on ALL nodes\n");
1673                 } else {
1674                         printf("Public IPs on node %u\n", options.pnn);
1675                 }
1676         }
1677
1678         for (i=1;i<=ips->num;i++) {
1679                 if (options.machinereadable){
1680                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1681                 } else {
1682                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1683                 }
1684         }
1685
1686         talloc_free(tmp_ctx);
1687         return 0;
1688 }
1689
1690 /*
1691   display pid of a ctdb daemon
1692  */
1693 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1694 {
1695         uint32_t pid;
1696         int ret;
1697
1698         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1699         if (ret != 0) {
1700                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1701                 return ret;
1702         }
1703         printf("Pid:%d\n", pid);
1704
1705         return 0;
1706 }
1707
1708 /*
1709   handler for receiving the response to ipreallocate
1710 */
1711 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1712                              TDB_DATA data, void *private_data)
1713 {
1714         exit(0);
1715 }
1716
1717 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1718 {
1719         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1720
1721         event_add_timed(ctdb->ev, ctdb, 
1722                                 timeval_current_ofs(1, 0),
1723                                 ctdb_every_second, ctdb);
1724 }
1725
1726 /*
1727   ask the recovery daemon on the recovery master to perform a ip reallocation
1728  */
1729 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1730 {
1731         int i, ret;
1732         TDB_DATA data;
1733         struct takeover_run_reply rd;
1734         uint32_t recmaster;
1735         struct ctdb_node_map *nodemap=NULL;
1736         int retries=0;
1737         struct timeval tv = timeval_current();
1738
1739         /* we need some events to trigger so we can timeout and restart
1740            the loop
1741         */
1742         event_add_timed(ctdb->ev, ctdb, 
1743                                 timeval_current_ofs(1, 0),
1744                                 ctdb_every_second, ctdb);
1745
1746         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1747         if (rd.pnn == -1) {
1748                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1749                 return -1;
1750         }
1751         rd.srvid = getpid();
1752
1753         /* register a message port for receiveing the reply so that we
1754            can receive the reply
1755         */
1756         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1757
1758         data.dptr = (uint8_t *)&rd;
1759         data.dsize = sizeof(rd);
1760
1761 again:
1762         /* check that there are valid nodes available */
1763         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1764                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1765                 return -1;
1766         }
1767         for (i=0; i<nodemap->num;i++) {
1768                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1769                         break;
1770                 }
1771         }
1772         if (i==nodemap->num) {
1773                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1774                 return 0;
1775         }
1776
1777
1778         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1779         if (ret != 0) {
1780                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1781                 return ret;
1782         }
1783
1784         /* verify the node exists */
1785         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1786                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1787                 return -1;
1788         }
1789
1790
1791         /* check tha there are nodes available that can act as a recmaster */
1792         for (i=0; i<nodemap->num; i++) {
1793                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1794                         continue;
1795                 }
1796                 break;
1797         }
1798         if (i == nodemap->num) {
1799                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1800                 return 0;
1801         }
1802
1803         /* verify the recovery master is not STOPPED, nor BANNED */
1804         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1805                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1806                 retries++;
1807                 sleep(1);
1808                 goto again;
1809         } 
1810
1811         
1812         /* verify the recovery master is not STOPPED, nor BANNED */
1813         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1814                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1815                 retries++;
1816                 sleep(1);
1817                 goto again;
1818         } 
1819
1820         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1821         if (ret != 0) {
1822                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1823                 return -1;
1824         }
1825
1826         tv = timeval_current();
1827         /* this loop will terminate when we have received the reply */
1828         while (timeval_elapsed(&tv) < 3.0) {
1829                 event_loop_once(ctdb->ev);
1830         }
1831
1832         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1833         retries++;
1834         sleep(1);
1835         goto again;
1836
1837         return 0;
1838 }
1839
1840
1841 /*
1842   disable a remote node
1843  */
1844 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1845 {
1846         int ret;
1847         struct ctdb_node_map *nodemap=NULL;
1848
1849         do {
1850                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1851                 if (ret != 0) {
1852                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1853                         return ret;
1854                 }
1855
1856                 sleep(1);
1857
1858                 /* read the nodemap and verify the change took effect */
1859                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1860                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1861                         exit(10);
1862                 }
1863
1864         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1865         ret = control_ipreallocate(ctdb, argc, argv);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1868                 return ret;
1869         }
1870
1871         return 0;
1872 }
1873
1874 /*
1875   enable a disabled remote node
1876  */
1877 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1878 {
1879         int ret;
1880
1881         struct ctdb_node_map *nodemap=NULL;
1882
1883         do {
1884                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1885                 if (ret != 0) {
1886                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1887                         return ret;
1888                 }
1889
1890                 sleep(1);
1891
1892                 /* read the nodemap and verify the change took effect */
1893                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1894                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1895                         exit(10);
1896                 }
1897
1898         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1899         ret = control_ipreallocate(ctdb, argc, argv);
1900         if (ret != 0) {
1901                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1902                 return ret;
1903         }
1904
1905         return 0;
1906 }
1907
1908 /*
1909   stop a remote node
1910  */
1911 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1912 {
1913         int ret;
1914         struct ctdb_node_map *nodemap=NULL;
1915
1916         do {
1917                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1918                 if (ret != 0) {
1919                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1920                 }
1921         
1922                 sleep(1);
1923
1924                 /* read the nodemap and verify the change took effect */
1925                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1926                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1927                         exit(10);
1928                 }
1929
1930         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1931         ret = control_ipreallocate(ctdb, argc, argv);
1932         if (ret != 0) {
1933                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1934                 return ret;
1935         }
1936
1937         return 0;
1938 }
1939
1940 /*
1941   restart a stopped remote node
1942  */
1943 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1944 {
1945         int ret;
1946
1947         struct ctdb_node_map *nodemap=NULL;
1948
1949         do {
1950                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1951                 if (ret != 0) {
1952                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1953                         return ret;
1954                 }
1955         
1956                 sleep(1);
1957
1958                 /* read the nodemap and verify the change took effect */
1959                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1960                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1961                         exit(10);
1962                 }
1963
1964         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1965         ret = control_ipreallocate(ctdb, argc, argv);
1966         if (ret != 0) {
1967                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1968                 return ret;
1969         }
1970
1971         return 0;
1972 }
1973
1974 static uint32_t get_generation(struct ctdb_context *ctdb)
1975 {
1976         struct ctdb_vnn_map *vnnmap=NULL;
1977         int ret;
1978
1979         /* wait until the recmaster is not in recovery mode */
1980         while (1) {
1981                 uint32_t recmode, recmaster;
1982                 
1983                 if (vnnmap != NULL) {
1984                         talloc_free(vnnmap);
1985                         vnnmap = NULL;
1986                 }
1987
1988                 /* get the recmaster */
1989                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1990                 if (ret != 0) {
1991                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1992                         exit(10);
1993                 }
1994
1995                 /* get recovery mode */
1996                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1997                 if (ret != 0) {
1998                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1999                         exit(10);
2000                 }
2001
2002                 /* get the current generation number */
2003                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2004                 if (ret != 0) {
2005                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2006                         exit(10);
2007                 }
2008
2009                 if ((recmode == CTDB_RECOVERY_NORMAL)
2010                 &&  (vnnmap->generation != 1)){
2011                         return vnnmap->generation;
2012                 }
2013                 sleep(1);
2014         }
2015 }
2016
2017 /*
2018   ban a node from the cluster
2019  */
2020 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2021 {
2022         int ret;
2023         struct ctdb_node_map *nodemap=NULL;
2024         struct ctdb_ban_time bantime;
2025
2026         if (argc < 1) {
2027                 usage();
2028         }
2029         
2030         /* verify the node exists */
2031         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2032         if (ret != 0) {
2033                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2034                 return ret;
2035         }
2036
2037         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2038                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2039                 return -1;
2040         }
2041
2042         bantime.pnn  = options.pnn;
2043         bantime.time = strtoul(argv[0], NULL, 0);
2044
2045         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2048                 return -1;
2049         }       
2050
2051         ret = control_ipreallocate(ctdb, argc, argv);
2052         if (ret != 0) {
2053                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2054                 return ret;
2055         }
2056
2057         return 0;
2058 }
2059
2060
2061 /*
2062   unban a node from the cluster
2063  */
2064 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2065 {
2066         int ret;
2067         struct ctdb_node_map *nodemap=NULL;
2068         struct ctdb_ban_time bantime;
2069
2070         /* verify the node exists */
2071         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2072         if (ret != 0) {
2073                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2074                 return ret;
2075         }
2076
2077         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2078                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2079                 return -1;
2080         }
2081
2082         bantime.pnn  = options.pnn;
2083         bantime.time = 0;
2084
2085         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2086         if (ret != 0) {
2087                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2088                 return -1;
2089         }       
2090
2091         ret = control_ipreallocate(ctdb, argc, argv);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2094                 return ret;
2095         }
2096
2097         return 0;
2098 }
2099
2100
2101 /*
2102   show ban information for a node
2103  */
2104 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2105 {
2106         int ret;
2107         struct ctdb_node_map *nodemap=NULL;
2108         struct ctdb_ban_time *bantime;
2109
2110         /* verify the node exists */
2111         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2112         if (ret != 0) {
2113                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2114                 return ret;
2115         }
2116
2117         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2118         if (ret != 0) {
2119                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2120                 return -1;
2121         }       
2122
2123         if (bantime->time == 0) {
2124                 printf("Node %u is not banned\n", bantime->pnn);
2125         } else {
2126                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2127         }
2128
2129         return 0;
2130 }
2131
2132 /*
2133   shutdown a daemon
2134  */
2135 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2136 {
2137         int ret;
2138
2139         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2140         if (ret != 0) {
2141                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2142                 return ret;
2143         }
2144
2145         return 0;
2146 }
2147
2148 /*
2149   trigger a recovery
2150  */
2151 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2152 {
2153         int ret;
2154         uint32_t generation, next_generation;
2155
2156         /* record the current generation number */
2157         generation = get_generation(ctdb);
2158
2159         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2160         if (ret != 0) {
2161                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2162                 return ret;
2163         }
2164
2165         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2166         if (ret != 0) {
2167                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2168                 return ret;
2169         }
2170
2171         /* wait until we are in a new generation */
2172         while (1) {
2173                 next_generation = get_generation(ctdb);
2174                 if (next_generation != generation) {
2175                         return 0;
2176                 }
2177                 sleep(1);
2178         }
2179
2180         return 0;
2181 }
2182
2183
2184 /*
2185   display monitoring mode of a remote node
2186  */
2187 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2188 {
2189         uint32_t monmode;
2190         int ret;
2191
2192         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2193         if (ret != 0) {
2194                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2195                 return ret;
2196         }
2197         if (!options.machinereadable){
2198                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2199         } else {
2200                 printf(":mode:\n");
2201                 printf(":%d:\n",monmode);
2202         }
2203         return 0;
2204 }
2205
2206
2207 /*
2208   display capabilities of a remote node
2209  */
2210 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2211 {
2212         uint32_t capabilities;
2213         int ret;
2214
2215         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2216         if (ret != 0) {
2217                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2218                 return ret;
2219         }
2220         
2221         if (!options.machinereadable){
2222                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2223                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2224                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2225                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2226         } else {
2227                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2228                 printf(":%d:%d:%d:%d:\n",
2229                         !!(capabilities&CTDB_CAP_RECMASTER),
2230                         !!(capabilities&CTDB_CAP_LMASTER),
2231                         !!(capabilities&CTDB_CAP_LVS),
2232                         !!(capabilities&CTDB_CAP_NATGW));
2233         }
2234         return 0;
2235 }
2236
2237 /*
2238   display lvs configuration
2239  */
2240 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2241 {
2242         uint32_t *capabilities;
2243         struct ctdb_node_map *nodemap=NULL;
2244         int i, ret;
2245         int healthy_count = 0;
2246
2247         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2248         if (ret != 0) {
2249                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2250                 return ret;
2251         }
2252
2253         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2254         CTDB_NO_MEMORY(ctdb, capabilities);
2255         
2256         /* collect capabilities for all connected nodes */
2257         for (i=0; i<nodemap->num; i++) {
2258                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2259                         continue;
2260                 }
2261                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2262                         continue;
2263                 }
2264         
2265                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2266                 if (ret != 0) {
2267                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2268                         return ret;
2269                 }
2270
2271                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2272                         continue;
2273                 }
2274
2275                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2276                         healthy_count++;
2277                 }
2278         }
2279
2280         /* Print all LVS nodes */
2281         for (i=0; i<nodemap->num; i++) {
2282                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2283                         continue;
2284                 }
2285                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2286                         continue;
2287                 }
2288                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2289                         continue;
2290                 }
2291
2292                 if (healthy_count != 0) {
2293                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2294                                 continue;
2295                         }
2296                 }
2297
2298                 printf("%d:%s\n", i, 
2299                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2300         }
2301
2302         return 0;
2303 }
2304
2305 /*
2306   display who is the lvs master
2307  */
2308 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2309 {
2310         uint32_t *capabilities;
2311         struct ctdb_node_map *nodemap=NULL;
2312         int i, ret;
2313         int healthy_count = 0;
2314
2315         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2316         if (ret != 0) {
2317                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2318                 return ret;
2319         }
2320
2321         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2322         CTDB_NO_MEMORY(ctdb, capabilities);
2323         
2324         /* collect capabilities for all connected nodes */
2325         for (i=0; i<nodemap->num; i++) {
2326                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2327                         continue;
2328                 }
2329                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2330                         continue;
2331                 }
2332         
2333                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2334                 if (ret != 0) {
2335                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2336                         return ret;
2337                 }
2338
2339                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2340                         continue;
2341                 }
2342
2343                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2344                         healthy_count++;
2345                 }
2346         }
2347
2348         /* find and show the lvsmaster */
2349         for (i=0; i<nodemap->num; i++) {
2350                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2351                         continue;
2352                 }
2353                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2354                         continue;
2355                 }
2356                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2357                         continue;
2358                 }
2359
2360                 if (healthy_count != 0) {
2361                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2362                                 continue;
2363                         }
2364                 }
2365
2366                 if (options.machinereadable){
2367                         printf("%d\n", i);
2368                 } else {
2369                         printf("Node %d is LVS master\n", i);
2370                 }
2371                 return 0;
2372         }
2373
2374         printf("There is no LVS master\n");
2375         return -1;
2376 }
2377
2378 /*
2379   disable monitoring on a  node
2380  */
2381 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2382 {
2383         
2384         int ret;
2385
2386         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2387         if (ret != 0) {
2388                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2389                 return ret;
2390         }
2391         printf("Monitoring mode:%s\n","DISABLED");
2392
2393         return 0;
2394 }
2395
2396 /*
2397   enable monitoring on a  node
2398  */
2399 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2400 {
2401         
2402         int ret;
2403
2404         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2405         if (ret != 0) {
2406                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2407                 return ret;
2408         }
2409         printf("Monitoring mode:%s\n","ACTIVE");
2410
2411         return 0;
2412 }
2413
2414 /*
2415   display remote list of keys/data for a db
2416  */
2417 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2418 {
2419         const char *db_name;
2420         struct ctdb_db_context *ctdb_db;
2421         int ret;
2422
2423         if (argc < 1) {
2424                 usage();
2425         }
2426
2427         db_name = argv[0];
2428
2429
2430         if (db_exists(ctdb, db_name)) {
2431                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2432                 return -1;
2433         }
2434
2435         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2436
2437         if (ctdb_db == NULL) {
2438                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2439                 return -1;
2440         }
2441
2442         /* traverse and dump the cluster tdb */
2443         ret = ctdb_dump_db(ctdb_db, stdout);
2444         if (ret == -1) {
2445                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2446                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2447                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2448                                   db_name));
2449                 return -1;
2450         }
2451         talloc_free(ctdb_db);
2452
2453         printf("Dumped %d records\n", ret);
2454         return 0;
2455 }
2456
2457
2458 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2459                              TDB_DATA data, void *private_data)
2460 {
2461         DEBUG(DEBUG_ERR,("Log data received\n"));
2462         if (data.dsize > 0) {
2463                 printf("%s", data.dptr);
2464         }
2465
2466         exit(0);
2467 }
2468
2469 /*
2470   display a list of log messages from the in memory ringbuffer
2471  */
2472 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2473 {
2474         int ret;
2475         int32_t res;
2476         struct ctdb_get_log_addr log_addr;
2477         TDB_DATA data;
2478         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2479         char *errmsg;
2480         struct timeval tv;
2481
2482         if (argc != 1) {
2483                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2484                 talloc_free(tmp_ctx);
2485                 return -1;
2486         }
2487
2488         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2489         log_addr.srvid = getpid();
2490         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2491                 log_addr.level = get_debug_by_desc(argv[0]);
2492         } else {
2493                 log_addr.level = strtol(argv[0], NULL, 0);
2494         }
2495
2496
2497         data.dptr = (unsigned char *)&log_addr;
2498         data.dsize = sizeof(log_addr);
2499
2500         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2501
2502         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2503         sleep(1);
2504
2505         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2506
2507         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2508                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2509         if (ret != 0 || res != 0) {
2510                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2511                 talloc_free(tmp_ctx);
2512                 return -1;
2513         }
2514
2515
2516         tv = timeval_current();
2517         /* this loop will terminate when we have received the reply */
2518         while (timeval_elapsed(&tv) < 3.0) {    
2519                 event_loop_once(ctdb->ev);
2520         }
2521
2522         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2523
2524         talloc_free(tmp_ctx);
2525         return 0;
2526 }
2527
2528 /*
2529   clear the in memory log area
2530  */
2531 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2532 {
2533         int ret;
2534         int32_t res;
2535         char *errmsg;
2536         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2537
2538         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2539                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2540         if (ret != 0 || res != 0) {
2541                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2542                 talloc_free(tmp_ctx);
2543                 return -1;
2544         }
2545
2546         talloc_free(tmp_ctx);
2547         return 0;
2548 }
2549
2550
2551
2552 /*
2553   display a list of the databases on a remote ctdb
2554  */
2555 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2556 {
2557         int i, ret;
2558         struct ctdb_dbid_map *dbmap=NULL;
2559
2560         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2561         if (ret != 0) {
2562                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2563                 return ret;
2564         }
2565
2566         if(options.machinereadable){
2567                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2568                 for(i=0;i<dbmap->num;i++){
2569                         const char *path;
2570                         const char *name;
2571                         const char *health;
2572                         bool persistent;
2573
2574                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2575                                             dbmap->dbs[i].dbid, ctdb, &path);
2576                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2577                                             dbmap->dbs[i].dbid, ctdb, &name);
2578                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2579                                               dbmap->dbs[i].dbid, ctdb, &health);
2580                         persistent = dbmap->dbs[i].persistent;
2581                         printf(":0x%08X:%s:%s:%d:%d:\n",
2582                                dbmap->dbs[i].dbid, name, path,
2583                                !!(persistent), !!(health));
2584                 }
2585                 return 0;
2586         }
2587
2588         printf("Number of databases:%d\n", dbmap->num);
2589         for(i=0;i<dbmap->num;i++){
2590                 const char *path;
2591                 const char *name;
2592                 const char *health;
2593                 bool persistent;
2594
2595                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2596                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2597                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2598                 persistent = dbmap->dbs[i].persistent;
2599                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2600                        dbmap->dbs[i].dbid, name, path,
2601                        persistent?" PERSISTENT":"",
2602                        health?" UNHEALTHY":"");
2603         }
2604
2605         return 0;
2606 }
2607
2608 /*
2609   display the status of a database on a remote ctdb
2610  */
2611 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2612 {
2613         int i, ret;
2614         struct ctdb_dbid_map *dbmap=NULL;
2615         const char *db_name;
2616
2617         if (argc < 1) {
2618                 usage();
2619         }
2620
2621         db_name = argv[0];
2622
2623         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2624         if (ret != 0) {
2625                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2626                 return ret;
2627         }
2628
2629         for(i=0;i<dbmap->num;i++){
2630                 const char *path;
2631                 const char *name;
2632                 const char *health;
2633                 bool persistent;
2634
2635                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2636                 if (strcmp(name, db_name) != 0) {
2637                         continue;
2638                 }
2639
2640                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2641                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2642                 persistent = dbmap->dbs[i].persistent;
2643                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2644                        dbmap->dbs[i].dbid, name, path,
2645                        persistent?"yes":"no",
2646                        health?health:"OK");
2647                 return 0;
2648         }
2649
2650         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2651         return 0;
2652 }
2653
2654 /*
2655   check if the local node is recmaster or not
2656   it will return 1 if this node is the recmaster and 0 if it is not
2657   or if the local ctdb daemon could not be contacted
2658  */
2659 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2660 {
2661         uint32_t mypnn, recmaster;
2662         int ret;
2663
2664         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2665         if (mypnn == -1) {
2666                 printf("Failed to get pnn of node\n");
2667                 return 1;
2668         }
2669
2670         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2671         if (ret != 0) {
2672                 printf("Failed to get the recmaster\n");
2673                 return 1;
2674         }
2675
2676         if (recmaster != mypnn) {
2677                 printf("this node is not the recmaster\n");
2678                 return 1;
2679         }
2680
2681         printf("this node is the recmaster\n");
2682         return 0;
2683 }
2684
2685 /*
2686   ping a node
2687  */
2688 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2689 {
2690         int ret;
2691         struct timeval tv = timeval_current();
2692         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2693         if (ret == -1) {
2694                 printf("Unable to get ping response from node %u\n", options.pnn);
2695                 return -1;
2696         } else {
2697                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2698                        options.pnn, timeval_elapsed(&tv), ret);
2699         }
2700         return 0;
2701 }
2702
2703
2704 /*
2705   get a tunable
2706  */
2707 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2708 {
2709         const char *name;
2710         uint32_t value;
2711         int ret;
2712
2713         if (argc < 1) {
2714                 usage();
2715         }
2716
2717         name = argv[0];
2718         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2719         if (ret == -1) {
2720                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2721                 return -1;
2722         }
2723
2724         printf("%-19s = %u\n", name, value);
2725         return 0;
2726 }
2727
2728 /*
2729   set a tunable
2730  */
2731 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2732 {
2733         const char *name;
2734         uint32_t value;
2735         int ret;
2736
2737         if (argc < 2) {
2738                 usage();
2739         }
2740
2741         name = argv[0];
2742         value = strtoul(argv[1], NULL, 0);
2743
2744         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2745         if (ret == -1) {
2746                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2747                 return -1;
2748         }
2749         return 0;
2750 }
2751
2752 /*
2753   list all tunables
2754  */
2755 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2756 {
2757         uint32_t count;
2758         const char **list;
2759         int ret, i;
2760
2761         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2762         if (ret == -1) {
2763                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2764                 return -1;
2765         }
2766
2767         for (i=0;i<count;i++) {
2768                 control_getvar(ctdb, 1, &list[i]);
2769         }
2770
2771         talloc_free(list);
2772         
2773         return 0;
2774 }
2775
2776 /*
2777   display debug level on a node
2778  */
2779 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2780 {
2781         int ret;
2782         int32_t level;
2783
2784         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2785         if (ret != 0) {
2786                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2787                 return ret;
2788         } else {
2789                 if (options.machinereadable){
2790                         printf(":Name:Level:\n");
2791                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2792                 } else {
2793                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2794                 }
2795         }
2796         return 0;
2797 }
2798
2799 /*
2800   display reclock file of a node
2801  */
2802 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2803 {
2804         int ret;
2805         const char *reclock;
2806
2807         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2808         if (ret != 0) {
2809                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2810                 return ret;
2811         } else {
2812                 if (options.machinereadable){
2813                         if (reclock != NULL) {
2814                                 printf("%s", reclock);
2815                         }
2816                 } else {
2817                         if (reclock == NULL) {
2818                                 printf("No reclock file used.\n");
2819                         } else {
2820                                 printf("Reclock file:%s\n", reclock);
2821                         }
2822                 }
2823         }
2824         return 0;
2825 }
2826
2827 /*
2828   set the reclock file of a node
2829  */
2830 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2831 {
2832         int ret;
2833         const char *reclock;
2834
2835         if (argc == 0) {
2836                 reclock = NULL;
2837         } else if (argc == 1) {
2838                 reclock = argv[0];
2839         } else {
2840                 usage();
2841         }
2842
2843         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2844         if (ret != 0) {
2845                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2846                 return ret;
2847         }
2848         return 0;
2849 }
2850
2851 /*
2852   set the natgw state on/off
2853  */
2854 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2855 {
2856         int ret;
2857         uint32_t natgwstate;
2858
2859         if (argc == 0) {
2860                 usage();
2861         }
2862
2863         if (!strcmp(argv[0], "on")) {
2864                 natgwstate = 1;
2865         } else if (!strcmp(argv[0], "off")) {
2866                 natgwstate = 0;
2867         } else {
2868                 usage();
2869         }
2870
2871         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2872         if (ret != 0) {
2873                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2874                 return ret;
2875         }
2876
2877         return 0;
2878 }
2879
2880 /*
2881   set the lmaster role on/off
2882  */
2883 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2884 {
2885         int ret;
2886         uint32_t lmasterrole;
2887
2888         if (argc == 0) {
2889                 usage();
2890         }
2891
2892         if (!strcmp(argv[0], "on")) {
2893                 lmasterrole = 1;
2894         } else if (!strcmp(argv[0], "off")) {
2895                 lmasterrole = 0;
2896         } else {
2897                 usage();
2898         }
2899
2900         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2901         if (ret != 0) {
2902                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2903                 return ret;
2904         }
2905
2906         return 0;
2907 }
2908
2909 /*
2910   set the recmaster role on/off
2911  */
2912 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2913 {
2914         int ret;
2915         uint32_t recmasterrole;
2916
2917         if (argc == 0) {
2918                 usage();
2919         }
2920
2921         if (!strcmp(argv[0], "on")) {
2922                 recmasterrole = 1;
2923         } else if (!strcmp(argv[0], "off")) {
2924                 recmasterrole = 0;
2925         } else {
2926                 usage();
2927         }
2928
2929         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2930         if (ret != 0) {
2931                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2932                 return ret;
2933         }
2934
2935         return 0;
2936 }
2937
2938 /*
2939   set debug level on a node or all nodes
2940  */
2941 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2942 {
2943         int i, ret;
2944         int32_t level;
2945
2946         if (argc == 0) {
2947                 printf("You must specify the debug level. Valid levels are:\n");
2948                 for (i=0; debug_levels[i].description != NULL; i++) {
2949                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2950                 }
2951
2952                 return 0;
2953         }
2954
2955         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2956                 level = get_debug_by_desc(argv[0]);
2957         } else {
2958                 level = strtol(argv[0], NULL, 0);
2959         }
2960
2961         for (i=0; debug_levels[i].description != NULL; i++) {
2962                 if (level == debug_levels[i].level) {
2963                         break;
2964                 }
2965         }
2966         if (debug_levels[i].description == NULL) {
2967                 printf("Invalid debug level, must be one of\n");
2968                 for (i=0; debug_levels[i].description != NULL; i++) {
2969                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2970                 }
2971                 return -1;
2972         }
2973
2974         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2977         }
2978         return 0;
2979 }
2980
2981
2982 /*
2983   freeze a node
2984  */
2985 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2986 {
2987         int ret;
2988         uint32_t priority;
2989         
2990         if (argc == 1) {
2991                 priority = strtol(argv[0], NULL, 0);
2992         } else {
2993                 priority = 0;
2994         }
2995         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2996
2997         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2998         if (ret != 0) {
2999                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3000         }               
3001         return 0;
3002 }
3003
3004 /*
3005   thaw a node
3006  */
3007 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3008 {
3009         int ret;
3010         uint32_t priority;
3011         
3012         if (argc == 1) {
3013                 priority = strtol(argv[0], NULL, 0);
3014         } else {
3015                 priority = 0;
3016         }
3017         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3018
3019         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3020         if (ret != 0) {
3021                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3022         }               
3023         return 0;
3024 }
3025
3026
3027 /*
3028   attach to a database
3029  */
3030 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3031 {
3032         const char *db_name;
3033         struct ctdb_db_context *ctdb_db;
3034
3035         if (argc < 1) {
3036                 usage();
3037         }
3038         db_name = argv[0];
3039
3040         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3041         if (ctdb_db == NULL) {
3042                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3043                 return -1;
3044         }
3045
3046         return 0;
3047 }
3048
3049 /*
3050   set db priority
3051  */
3052 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3053 {
3054         struct ctdb_db_priority db_prio;
3055         int ret;
3056
3057         if (argc < 2) {
3058                 usage();
3059         }
3060
3061         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3062         db_prio.priority = strtoul(argv[1], NULL, 0);
3063
3064         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3065         if (ret != 0) {
3066                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3067                 return -1;
3068         }
3069
3070         return 0;
3071 }
3072
3073 /*
3074   get db priority
3075  */
3076 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3077 {
3078         uint32_t db_id, priority;
3079         int ret;
3080
3081         if (argc < 1) {
3082                 usage();
3083         }
3084
3085         db_id = strtoul(argv[0], NULL, 0);
3086
3087         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3088         if (ret != 0) {
3089                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3090                 return -1;
3091         }
3092
3093         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3094
3095         return 0;
3096 }
3097
3098 /*
3099   run an eventscript on a node
3100  */
3101 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3102 {
3103         TDB_DATA data;
3104         int ret;
3105         int32_t res;
3106         char *errmsg;
3107         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3108
3109         if (argc != 1) {
3110                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3111                 return -1;
3112         }
3113
3114         data.dptr = (unsigned char *)discard_const(argv[0]);
3115         data.dsize = strlen((char *)data.dptr) + 1;
3116
3117         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3118
3119         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3120                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3121         if (ret != 0 || res != 0) {
3122                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3123                 talloc_free(tmp_ctx);
3124                 return -1;
3125         }
3126         talloc_free(tmp_ctx);
3127         return 0;
3128 }
3129
3130 #define DB_VERSION 1
3131 #define MAX_DB_NAME 64
3132 struct db_file_header {
3133         unsigned long version;
3134         time_t timestamp;
3135         unsigned long persistent;
3136         unsigned long size;
3137         const char name[MAX_DB_NAME];
3138 };
3139
3140 struct backup_data {
3141         struct ctdb_marshall_buffer *records;
3142         uint32_t len;
3143         uint32_t total;
3144         bool traverse_error;
3145 };
3146
3147 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3148 {
3149         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3150         struct ctdb_rec_data *rec;
3151
3152         /* add the record */
3153         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3154         if (rec == NULL) {
3155                 bd->traverse_error = true;
3156                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3157                 return -1;
3158         }
3159         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3160         if (bd->records == NULL) {
3161                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3162                 bd->traverse_error = true;
3163                 return -1;
3164         }
3165         bd->records->count++;
3166         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3167         bd->len += rec->length;
3168         talloc_free(rec);
3169
3170         bd->total++;
3171         return 0;
3172 }
3173
3174 /*
3175  * backup a database to a file 
3176  */
3177 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3178 {
3179         int i, ret;
3180         struct ctdb_dbid_map *dbmap=NULL;
3181         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3182         struct db_file_header dbhdr;
3183         struct ctdb_db_context *ctdb_db;
3184         struct backup_data *bd;
3185         int fh = -1;
3186         int status = -1;
3187         const char *reason = NULL;
3188
3189         if (argc != 2) {
3190                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3191                 return -1;
3192         }
3193
3194         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3195         if (ret != 0) {
3196                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3197                 return ret;
3198         }
3199
3200         for(i=0;i<dbmap->num;i++){
3201                 const char *name;
3202
3203                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3204                 if(!strcmp(argv[0], name)){
3205                         talloc_free(discard_const(name));
3206                         break;
3207                 }
3208                 talloc_free(discard_const(name));
3209         }
3210         if (i == dbmap->num) {
3211                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3212                 talloc_free(tmp_ctx);
3213                 return -1;
3214         }
3215
3216         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3217                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3218         if (ret != 0) {
3219                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3220                                  argv[0]));
3221                 talloc_free(tmp_ctx);
3222                 return -1;
3223         }
3224         if (reason) {
3225                 uint32_t allow_unhealthy = 0;
3226
3227                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3228                                       "AllowUnhealthyDBRead",
3229                                       &allow_unhealthy);
3230
3231                 if (allow_unhealthy != 1) {
3232                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3233                                          argv[0], reason));
3234
3235                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3236                                          allow_unhealthy));
3237                         talloc_free(tmp_ctx);
3238                         return -1;
3239                 }
3240
3241                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3242                                      argv[0], argv[0]));
3243                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3244                                      "tunnable AllowUnhealthyDBRead = %u\n",
3245                                      allow_unhealthy));
3246         }
3247
3248         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3249         if (ctdb_db == NULL) {
3250                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3251                 talloc_free(tmp_ctx);
3252                 return -1;
3253         }
3254
3255
3256         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3257         if (ret == -1) {
3258                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3259                 talloc_free(tmp_ctx);
3260                 return -1;
3261         }
3262
3263
3264         bd = talloc_zero(tmp_ctx, struct backup_data);
3265         if (bd == NULL) {
3266                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3267                 talloc_free(tmp_ctx);
3268                 return -1;
3269         }
3270
3271         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3272         if (bd->records == NULL) {
3273                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3274                 talloc_free(tmp_ctx);
3275                 return -1;
3276         }
3277
3278         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3279         bd->records->db_id = ctdb_db->db_id;
3280         /* traverse the database collecting all records */
3281         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3282             bd->traverse_error) {
3283                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3284                 talloc_free(tmp_ctx);
3285                 return -1;              
3286         }
3287
3288         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3289
3290
3291         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3292         if (fh == -1) {
3293                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3294                 talloc_free(tmp_ctx);
3295                 return -1;
3296         }
3297
3298         dbhdr.version = DB_VERSION;
3299         dbhdr.timestamp = time(NULL);
3300         dbhdr.persistent = dbmap->dbs[i].persistent;
3301         dbhdr.size = bd->len;
3302         if (strlen(argv[0]) >= MAX_DB_NAME) {
3303                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3304                 goto done;
3305         }
3306         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3307         ret = write(fh, &dbhdr, sizeof(dbhdr));
3308         if (ret == -1) {
3309                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3310                 goto done;
3311         }
3312         ret = write(fh, bd->records, bd->len);
3313         if (ret == -1) {
3314                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3315                 goto done;
3316         }
3317
3318         status = 0;
3319 done:
3320         if (fh != -1) {
3321                 ret = close(fh);
3322                 if (ret == -1) {
3323                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3324                 }
3325         }
3326         talloc_free(tmp_ctx);
3327         return status;
3328 }
3329
3330 /*
3331  * restore a database from a file 
3332  */
3333 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3334 {
3335         int ret;
3336         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3337         TDB_DATA outdata;
3338         TDB_DATA data;
3339         struct db_file_header dbhdr;
3340         struct ctdb_db_context *ctdb_db;
3341         struct ctdb_node_map *nodemap=NULL;
3342         struct ctdb_vnn_map *vnnmap=NULL;
3343         int i, fh;
3344         struct ctdb_control_wipe_database w;
3345         uint32_t *nodes;
3346         uint32_t generation;
3347         struct tm *tm;
3348         char tbuf[100];
3349         char *dbname;
3350
3351         if (argc < 1 || argc > 2) {
3352                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3353                 return -1;
3354         }
3355
3356         fh = open(argv[0], O_RDONLY);
3357         if (fh == -1) {
3358                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3359                 talloc_free(tmp_ctx);
3360                 return -1;
3361         }
3362
3363         read(fh, &dbhdr, sizeof(dbhdr));
3364         if (dbhdr.version != DB_VERSION) {
3365                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3366                 talloc_free(tmp_ctx);
3367                 return -1;
3368         }
3369
3370         dbname = dbhdr.name;
3371         if (argc == 2) {
3372                 dbname = argv[1];
3373         }
3374
3375         outdata.dsize = dbhdr.size;
3376         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3377         if (outdata.dptr == NULL) {
3378                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3379                 close(fh);
3380                 talloc_free(tmp_ctx);
3381                 return -1;
3382         }               
3383         read(fh, outdata.dptr, outdata.dsize);
3384         close(fh);
3385
3386         tm = localtime(&dbhdr.timestamp);
3387         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3388         printf("Restoring database '%s' from backup @ %s\n",
3389                 dbname, tbuf);
3390
3391
3392         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3393         if (ctdb_db == NULL) {
3394                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3395                 talloc_free(tmp_ctx);
3396                 return -1;
3397         }
3398
3399         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3400         if (ret != 0) {
3401                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3402                 talloc_free(tmp_ctx);
3403                 return ret;
3404         }
3405
3406
3407         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3408         if (ret != 0) {
3409                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3410                 talloc_free(tmp_ctx);
3411                 return ret;
3412         }
3413
3414         /* freeze all nodes */
3415         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3416         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3417                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3418                                         nodes, i,
3419                                         TIMELIMIT(),
3420                                         false, tdb_null,
3421                                         NULL, NULL,
3422                                         NULL) != 0) {
3423                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3424                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3425                         talloc_free(tmp_ctx);
3426                         return -1;
3427                 }
3428         }
3429
3430         generation = vnnmap->generation;
3431         data.dptr = (void *)&generation;
3432         data.dsize = sizeof(generation);
3433
3434         /* start a cluster wide transaction */
3435         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3436         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3437                                         nodes, 0,
3438                                         TIMELIMIT(), false, data,
3439                                         NULL, NULL,
3440                                         NULL) != 0) {
3441                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3442                 return -1;
3443         }
3444
3445
3446         w.db_id = ctdb_db->db_id;
3447         w.transaction_id = generation;
3448
3449         data.dptr = (void *)&w;
3450         data.dsize = sizeof(w);
3451
3452         /* wipe all the remote databases. */
3453         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3454         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3455                                         nodes, 0,
3456                                         TIMELIMIT(), false, data,
3457                                         NULL, NULL,
3458                                         NULL) != 0) {
3459                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3460                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3461                 talloc_free(tmp_ctx);
3462                 return -1;
3463         }
3464         
3465         /* push the database */
3466         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3467         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3468                                         nodes, 0,
3469                                         TIMELIMIT(), false, outdata,
3470                                         NULL, NULL,
3471                                         NULL) != 0) {
3472                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3473                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3474                 talloc_free(tmp_ctx);
3475                 return -1;
3476         }
3477
3478         data.dptr = (void *)&ctdb_db->db_id;
3479         data.dsize = sizeof(ctdb_db->db_id);
3480
3481         /* mark the database as healthy */
3482         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3483         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3484                                         nodes, 0,
3485                                         TIMELIMIT(), false, data,
3486                                         NULL, NULL,
3487                                         NULL) != 0) {
3488                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3489                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3490                 talloc_free(tmp_ctx);
3491                 return -1;
3492         }
3493
3494         data.dptr = (void *)&generation;
3495         data.dsize = sizeof(generation);
3496
3497         /* commit all the changes */
3498         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3499                                         nodes, 0,
3500                                         TIMELIMIT(), false, data,
3501                                         NULL, NULL,
3502                                         NULL) != 0) {
3503                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3504                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3505                 talloc_free(tmp_ctx);
3506                 return -1;
3507         }
3508
3509
3510         /* thaw all nodes */
3511         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3512         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3513                                         nodes, 0,
3514                                         TIMELIMIT(),
3515                                         false, tdb_null,
3516                                         NULL, NULL,
3517                                         NULL) != 0) {
3518                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3519                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3520                 talloc_free(tmp_ctx);
3521                 return -1;
3522         }
3523
3524
3525         talloc_free(tmp_ctx);
3526         return 0;
3527 }
3528
3529 /*
3530  * dump a database backup from a file
3531  */
3532 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3533 {
3534         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3535         TDB_DATA outdata;
3536         struct db_file_header dbhdr;
3537         int i, fh;
3538         struct tm *tm;
3539         char tbuf[100];
3540         struct ctdb_rec_data *rec = NULL;
3541         struct ctdb_marshall_buffer *m;
3542
3543         if (argc != 1) {
3544                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3545                 return -1;
3546         }
3547
3548         fh = open(argv[0], O_RDONLY);
3549         if (fh == -1) {
3550                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3551                 talloc_free(tmp_ctx);
3552                 return -1;
3553         }
3554
3555         read(fh, &dbhdr, sizeof(dbhdr));
3556         if (dbhdr.version != DB_VERSION) {
3557                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3558                 talloc_free(tmp_ctx);
3559                 return -1;
3560         }
3561
3562         outdata.dsize = dbhdr.size;
3563         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3564         if (outdata.dptr == NULL) {
3565                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3566                 close(fh);
3567                 talloc_free(tmp_ctx);
3568                 return -1;
3569         }
3570         read(fh, outdata.dptr, outdata.dsize);
3571         close(fh);
3572         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3573
3574         tm = localtime(&dbhdr.timestamp);
3575         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3576         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3577                 dbhdr.name, m->db_id, tbuf);
3578
3579         for (i=0; i < m->count; i++) {
3580                 uint32_t reqid = 0;
3581                 TDB_DATA key, data;
3582
3583                 /* we do not want the header splitted, so we pass NULL*/
3584                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3585                                               NULL, &key, &data);
3586
3587                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3588         }
3589
3590         printf("Dumped %d records\n", i);
3591         talloc_free(tmp_ctx);
3592         return 0;
3593 }
3594
3595 /*
3596  * wipe a database from a file
3597  */
3598 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3599                           const char **argv)
3600 {
3601         int ret;
3602         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3603         TDB_DATA data;
3604         struct ctdb_db_context *ctdb_db;
3605         struct ctdb_node_map *nodemap = NULL;
3606         struct ctdb_vnn_map *vnnmap = NULL;
3607         int i;
3608         struct ctdb_control_wipe_database w;
3609         uint32_t *nodes;
3610         uint32_t generation;
3611         struct ctdb_dbid_map *dbmap = NULL;
3612
3613         if (argc != 1) {
3614                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3615                 return -1;
3616         }
3617
3618         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3619                                  &dbmap);
3620         if (ret != 0) {
3621                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3622                                   options.pnn));
3623                 return ret;
3624         }
3625
3626         for(i=0;i<dbmap->num;i++){
3627                 const char *name;
3628
3629                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3630                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3631                 if(!strcmp(argv[0], name)){
3632                         talloc_free(discard_const(name));
3633                         break;
3634                 }
3635                 talloc_free(discard_const(name));
3636         }
3637         if (i == dbmap->num) {
3638                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3639                                   argv[0]));
3640                 talloc_free(tmp_ctx);
3641                 return -1;
3642         }
3643
3644         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3645         if (ctdb_db == NULL) {
3646                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3647                                   argv[0]));
3648                 talloc_free(tmp_ctx);
3649                 return -1;
3650         }
3651
3652         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3653                                    &nodemap);
3654         if (ret != 0) {
3655                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3656                                   options.pnn));
3657                 talloc_free(tmp_ctx);
3658                 return ret;
3659         }
3660
3661         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3662                                   &vnnmap);
3663         if (ret != 0) {
3664                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3665                                   options.pnn));
3666                 talloc_free(tmp_ctx);
3667                 return ret;
3668         }
3669
3670         /* freeze all nodes */
3671         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3672         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3673                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3674                                                 nodes, i,
3675                                                 TIMELIMIT(),
3676                                                 false, tdb_null,
3677                                                 NULL, NULL,
3678                                                 NULL);
3679                 if (ret != 0) {
3680                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3681                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3682                                              CTDB_RECOVERY_ACTIVE);
3683                         talloc_free(tmp_ctx);
3684                         return -1;
3685                 }
3686         }
3687
3688         generation = vnnmap->generation;
3689         data.dptr = (void *)&generation;
3690         data.dsize = sizeof(generation);
3691
3692         /* start a cluster wide transaction */
3693         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3694         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3695                                         nodes, 0,
3696                                         TIMELIMIT(), false, data,
3697                                         NULL, NULL,
3698                                         NULL);
3699         if (ret!= 0) {
3700                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3701                                   "transactions.\n"));
3702                 return -1;
3703         }
3704
3705         w.db_id = ctdb_db->db_id;
3706         w.transaction_id = generation;
3707
3708         data.dptr = (void *)&w;
3709         data.dsize = sizeof(w);
3710
3711         /* wipe all the remote databases. */
3712         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3713         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3714                                         nodes, 0,
3715                                         TIMELIMIT(), false, data,
3716                                         NULL, NULL,
3717                                         NULL) != 0) {
3718                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3719                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3720                 talloc_free(tmp_ctx);
3721                 return -1;
3722         }
3723
3724         data.dptr = (void *)&ctdb_db->db_id;
3725         data.dsize = sizeof(ctdb_db->db_id);
3726
3727         /* mark the database as healthy */
3728         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3729         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3730                                         nodes, 0,
3731                                         TIMELIMIT(), false, data,
3732                                         NULL, NULL,
3733                                         NULL) != 0) {
3734                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3735                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3736                 talloc_free(tmp_ctx);
3737                 return -1;
3738         }
3739
3740         data.dptr = (void *)&generation;
3741         data.dsize = sizeof(generation);
3742
3743         /* commit all the changes */
3744         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3745                                         nodes, 0,
3746                                         TIMELIMIT(), false, data,
3747                                         NULL, NULL,
3748                                         NULL) != 0) {
3749                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3750                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3751                 talloc_free(tmp_ctx);
3752                 return -1;
3753         }
3754
3755         /* thaw all nodes */
3756         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3757         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3758                                         nodes, 0,
3759                                         TIMELIMIT(),
3760                                         false, tdb_null,
3761                                         NULL, NULL,
3762                                         NULL) != 0) {
3763                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3764                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3765                 talloc_free(tmp_ctx);
3766                 return -1;
3767         }
3768
3769         talloc_free(tmp_ctx);
3770         return 0;
3771 }
3772
3773 /*
3774  * set flags of a node in the nodemap
3775  */
3776 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3777 {
3778         int ret;
3779         int32_t status;
3780         int node;
3781         int flags;
3782         TDB_DATA data;
3783         struct ctdb_node_flag_change c;
3784
3785         if (argc != 2) {
3786                 usage();
3787                 return -1;
3788         }
3789
3790         if (sscanf(argv[0], "%d", &node) != 1) {
3791                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3792                 usage();
3793                 return -1;
3794         }
3795         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3796                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3797                 usage();
3798                 return -1;
3799         }
3800
3801         c.pnn       = node;
3802         c.old_flags = 0;
3803         c.new_flags = flags;
3804
3805         data.dsize = sizeof(c);
3806         data.dptr = (unsigned char *)&c;
3807
3808         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3809                            data, NULL, NULL, &status, NULL, NULL);
3810         if (ret != 0 || status != 0) {
3811                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3812                 return -1;
3813         }
3814         return 0;
3815 }
3816
3817 /*
3818   dump memory usage
3819  */
3820 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3821 {
3822         TDB_DATA data;
3823         int ret;
3824         int32_t res;
3825         char *errmsg;
3826         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3827         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3828                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3829         if (ret != 0 || res != 0) {
3830                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3831                 talloc_free(tmp_ctx);
3832                 return -1;
3833         }
3834         write(1, data.dptr, data.dsize);
3835         talloc_free(tmp_ctx);
3836         return 0;
3837 }
3838
3839 /*
3840   handler for memory dumps
3841 */
3842 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3843                              TDB_DATA data, void *private_data)
3844 {
3845         write(1, data.dptr, data.dsize);
3846         exit(0);
3847 }
3848
3849 /*
3850   dump memory usage on the recovery daemon
3851  */
3852 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3853 {
3854         int ret;
3855         TDB_DATA data;
3856         struct rd_memdump_reply rd;
3857
3858         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3859         if (rd.pnn == -1) {
3860                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3861                 return -1;
3862         }
3863         rd.srvid = getpid();
3864
3865         /* register a message port for receiveing the reply so that we
3866            can receive the reply
3867         */
3868         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3869
3870
3871         data.dptr = (uint8_t *)&rd;
3872         data.dsize = sizeof(rd);
3873
3874         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3875         if (ret != 0) {
3876                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3877                 return -1;
3878         }
3879
3880         /* this loop will terminate when we have received the reply */
3881         while (1) {     
3882                 event_loop_once(ctdb->ev);
3883         }
3884
3885         return 0;
3886 }
3887
3888 /*
3889   list all nodes in the cluster
3890   if the daemon is running, we read the data from the daemon.
3891   if the daemon is not running we parse the nodes file directly
3892  */
3893 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3894 {
3895         int i, ret;
3896         struct ctdb_node_map *nodemap=NULL;
3897
3898         if (ctdb != NULL) {
3899                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3900                 if (ret != 0) {
3901                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3902                         return ret;
3903                 }
3904
3905                 for(i=0;i<nodemap->num;i++){
3906                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3907                                 continue;
3908                         }
3909                         if (options.machinereadable){
3910                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3911                         } else {
3912                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3913                         }
3914                 }
3915         } else {
3916                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3917                 struct pnn_node *pnn_nodes;
3918                 struct pnn_node *pnn_node;
3919         
3920                 pnn_nodes = read_nodes_file(mem_ctx);
3921                 if (pnn_nodes == NULL) {
3922                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3923                         talloc_free(mem_ctx);
3924                         return -1;
3925                 }
3926
3927                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3928                         ctdb_sock_addr addr;
3929
3930                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3931                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3932                                 talloc_free(mem_ctx);
3933                                 return -1;
3934                         }
3935
3936                         if (options.machinereadable){
3937                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3938                         } else {
3939                                 printf("%s\n", pnn_node->addr);
3940                         }
3941                 }
3942                 talloc_free(mem_ctx);
3943         }
3944
3945         return 0;
3946 }
3947
3948 /*
3949   reload the nodes file on the local node
3950  */
3951 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3952 {
3953         int i, ret;
3954         int mypnn;
3955         struct ctdb_node_map *nodemap=NULL;
3956
3957         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3958         if (mypnn == -1) {
3959                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3960                 return -1;
3961         }
3962
3963         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3964         if (ret != 0) {
3965                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3966                 return ret;
3967         }
3968
3969         /* reload the nodes file on all remote nodes */
3970         for (i=0;i<nodemap->num;i++) {
3971                 if (nodemap->nodes[i].pnn == mypnn) {
3972                         continue;
3973                 }
3974                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3975                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3976                         nodemap->nodes[i].pnn);
3977                 if (ret != 0) {
3978                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3979                 }
3980         }
3981
3982         /* reload the nodes file on the local node */
3983         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3984         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3985         if (ret != 0) {
3986                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3987         }
3988
3989         /* initiate a recovery */
3990         control_recover(ctdb, argc, argv);
3991
3992         return 0;
3993 }
3994
3995
3996 static const struct {
3997         const char *name;
3998         int (*fn)(struct ctdb_context *, int, const char **);
3999         bool auto_all;
4000         bool without_daemon; /* can be run without daemon running ? */
4001         const char *msg;
4002         const char *args;
4003 } ctdb_commands[] = {
4004 #ifdef CTDB_VERS
4005         { "version",         control_version,           true,   false,  "show version of ctdb" },
4006 #endif
4007         { "status",          control_status,            true,   false,  "show node status" },
4008         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4009         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4010         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4011         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4012         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4013         { "statistics",      control_statistics,        false,  false, "show statistics" },
4014         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4015         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4016         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4017         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4018         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4019         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4020         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4021         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4022         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4023         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4024         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4025         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4026         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4027         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4028         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4029         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4030         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4031         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4032         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4033         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4034         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4035         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4036         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4037         { "stop",            control_stop,              true,   false,  "stop a node" },
4038         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4039         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4040         { "unban",           control_unban,             true,   false,  "unban a node" },
4041         { "showban",         control_showban,           true,   false,  "show ban information"},
4042         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4043         { "recover",         control_recover,           true,   false,  "force recovery" },
4044         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4045         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4046         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4047         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4048         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4049         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4050         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4051         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4052
4053         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4054         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4055         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4056         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4057         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4058         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4059         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4060         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4061         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4062         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4063         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4064         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4065         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4066         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4067         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4068         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4069         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4070         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4071         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4072         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4073         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4074         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4075         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4076         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4077         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4078         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4079         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4080         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4081         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4082         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4083 };
4084
4085 /*
4086   show usage message
4087  */
4088 static void usage(void)
4089 {
4090         int i;
4091         printf(
4092 "Usage: ctdb [options] <control>\n" \
4093 "Options:\n" \
4094 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4095 "   -Y                 generate machinereadable output\n"
4096 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4097         printf("Controls:\n");
4098         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4099                 printf("  %-15s %-27s  %s\n", 
4100                        ctdb_commands[i].name, 
4101                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4102                        ctdb_commands[i].msg);
4103         }
4104         exit(1);
4105 }
4106
4107
4108 static void ctdb_alarm(int sig)
4109 {
4110         printf("Maximum runtime exceeded - exiting\n");
4111         _exit(ERR_TIMEOUT);
4112 }
4113
4114 /*
4115   main program
4116 */
4117 int main(int argc, const char *argv[])
4118 {
4119         struct ctdb_context *ctdb;
4120         char *nodestring = NULL;
4121         struct poptOption popt_options[] = {
4122                 POPT_AUTOHELP
4123                 POPT_CTDB_CMDLINE
4124                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4125                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4126                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4127                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4128                 POPT_TABLEEND
4129         };
4130         int opt;
4131         const char **extra_argv;
4132         int extra_argc = 0;
4133         int ret=-1, i;
4134         poptContext pc;
4135         struct event_context *ev;
4136         const char *control;
4137
4138         setlinebuf(stdout);
4139         
4140         /* set some defaults */
4141         options.maxruntime = 0;
4142         options.timelimit = 3;
4143         options.pnn = CTDB_CURRENT_NODE;
4144
4145         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4146
4147         while ((opt = poptGetNextOpt(pc)) != -1) {
4148                 switch (opt) {
4149                 default:
4150                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4151                                 poptBadOption(pc, 0), poptStrerror(opt)));
4152                         exit(1);
4153                 }
4154         }
4155
4156         /* setup the remaining options for the main program to use */
4157         extra_argv = poptGetArgs(pc);
4158         if (extra_argv) {
4159                 extra_argv++;
4160                 while (extra_argv[extra_argc]) extra_argc++;
4161         }
4162
4163         if (extra_argc < 1) {
4164                 usage();
4165         }
4166
4167         if (options.maxruntime == 0) {
4168                 const char *ctdb_timeout;
4169                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4170                 if (ctdb_timeout != NULL) {
4171                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4172                 } else {
4173                         /* default timeout is 120 seconds */
4174                         options.maxruntime = 120;
4175                 }
4176         }
4177
4178         signal(SIGALRM, ctdb_alarm);
4179         alarm(options.maxruntime);
4180
4181         /* setup the node number to contact */
4182         if (nodestring != NULL) {
4183                 if (strcmp(nodestring, "all") == 0) {
4184                         options.pnn = CTDB_BROADCAST_ALL;
4185                 } else {
4186                         options.pnn = strtoul(nodestring, NULL, 0);
4187                 }
4188         }
4189
4190         control = extra_argv[0];
4191
4192         ev = event_context_init(NULL);
4193
4194         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4195                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4196                         int j;
4197
4198                         if (ctdb_commands[i].without_daemon == true) {
4199                                 close(2);
4200                         }
4201
4202                         /* initialise ctdb */
4203                         ctdb = ctdb_cmdline_client(ev);
4204
4205                         if (ctdb_commands[i].without_daemon == false) {
4206                                 if (ctdb == NULL) {
4207                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4208                                         exit(1);
4209                                 }
4210
4211                                 /* verify the node exists */
4212                                 verify_node(ctdb);
4213
4214                                 if (options.pnn == CTDB_CURRENT_NODE) {
4215                                         int pnn;
4216                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4217                                         if (pnn == -1) {
4218                                                 return -1;
4219                                         }
4220                                         options.pnn = pnn;
4221                                 }
4222                         }
4223
4224                         if (ctdb_commands[i].auto_all && 
4225                             options.pnn == CTDB_BROADCAST_ALL) {
4226                                 uint32_t *nodes;
4227                                 uint32_t num_nodes;
4228                                 ret = 0;
4229
4230                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4231                                 CTDB_NO_MEMORY(ctdb, nodes);
4232         
4233                                 for (j=0;j<num_nodes;j++) {
4234                                         options.pnn = nodes[j];
4235                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4236                                 }
4237                                 talloc_free(nodes);
4238                         } else {
4239                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4240                         }
4241                         break;
4242                 }
4243         }
4244
4245         if (i == ARRAY_SIZE(ctdb_commands)) {
4246                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4247                 exit(1);
4248         }
4249
4250         return ret;
4251 }