add a command to write a record to a persistent database
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
/* process exit codes; verify_node() exits with ERR_NONODE / ERR_DISNODE */
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
/* connection handle handed to ctdb_getpnn() by control_pnn() */
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
/*
  settings shared by all commands in this file
  NOTE(review): presumably filled in from the command line - confirm
  against the option parsing code, which is not part of this section
 */
43 static struct {
        /* base value for the TIMELIMIT()/LONGTIMELIMIT() macros below */
44         int timelimit;
        /* node the commands talk to; may also be CTDB_CURRENT_NODE or
           CTDB_BROADCAST_ALL */
45         uint32_t pnn;
        /* non-zero selects the colon separated output format */
46         int machinereadable;
47         int verbose;
48         int maxruntime;
49 } options;
50
/*
  timeouts passed to the ctdb_ctrl_*() calls, derived from
  options.timelimit; LONGTIMELIMIT() uses ten times that value
 */
51 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
52 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
53
#ifdef CTDB_VERS
/* two-step stringification so that CTDB_VERS is expanded before quoting */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
  print the version this tool was built with (only when CTDB_VERS is
  defined at build time); always returns 0
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
	const char *version = XSTR(CTDB_VERS);

	printf("CTDB version: %s\n", version);
	return 0;
}
#endif
63
64
65 /*
66   verify that a node exists and is reachable
67  */
68 static void verify_node(struct ctdb_context *ctdb)
69 {
70         int ret;
71         struct ctdb_node_map *nodemap=NULL;
72
73         if (options.pnn == CTDB_CURRENT_NODE) {
74                 return;
75         }
76         if (options.pnn == CTDB_BROADCAST_ALL) {
77                 return;
78         }
79
80         /* verify the node exists */
81         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
82                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
83                 exit(10);
84         }
85         if (options.pnn >= nodemap->num) {
86                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
87                 exit(ERR_NONODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
94                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
95                 exit(ERR_DISNODE);
96         }
97
98         /* verify we can access the node */
99         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
100         if (ret == -1) {
101                 DEBUG(DEBUG_ERR,("Can not access node. Node is not operational.\n"));
102                 exit(10);
103         }
104 }
105
106 /*
107  check if a database exists
108 */
109 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
110 {
111         int i, ret;
112         struct ctdb_dbid_map *dbmap=NULL;
113
114         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
115         if (ret != 0) {
116                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
117                 return -1;
118         }
119
120         for(i=0;i<dbmap->num;i++){
121                 const char *name;
122
123                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
124                 if (!strcmp(name, db_name)) {
125                         return 0;
126                 }
127         }
128
129         return -1;
130 }
131
132 /*
133   see if a process exists
134  */
135 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
136 {
137         uint32_t pnn, pid;
138         int ret;
139         if (argc < 1) {
140                 usage();
141         }
142
143         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
144                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
145                 return -1;
146         }
147
148         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
149         if (ret == 0) {
150                 printf("%u:%u exists\n", pnn, pid);
151         } else {
152                 printf("%u:%u does not exist\n", pnn, pid);
153         }
154         return ret;
155 }
156
157 /*
158   display statistics structure
159  */
160 static void show_statistics(struct ctdb_statistics *s)
161 {
162         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
163         int i;
164         const char *prefix=NULL;
165         int preflen=0;
166         int tmp, days, hours, minutes, seconds;
167         const struct {
168                 const char *name;
169                 uint32_t offset;
170         } fields[] = {
171 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
172                 STATISTICS_FIELD(num_clients),
173                 STATISTICS_FIELD(frozen),
174                 STATISTICS_FIELD(recovering),
175                 STATISTICS_FIELD(num_recoveries),
176                 STATISTICS_FIELD(client_packets_sent),
177                 STATISTICS_FIELD(client_packets_recv),
178                 STATISTICS_FIELD(node_packets_sent),
179                 STATISTICS_FIELD(node_packets_recv),
180                 STATISTICS_FIELD(keepalive_packets_sent),
181                 STATISTICS_FIELD(keepalive_packets_recv),
182                 STATISTICS_FIELD(node.req_call),
183                 STATISTICS_FIELD(node.reply_call),
184                 STATISTICS_FIELD(node.req_dmaster),
185                 STATISTICS_FIELD(node.reply_dmaster),
186                 STATISTICS_FIELD(node.reply_error),
187                 STATISTICS_FIELD(node.req_message),
188                 STATISTICS_FIELD(node.req_control),
189                 STATISTICS_FIELD(node.reply_control),
190                 STATISTICS_FIELD(client.req_call),
191                 STATISTICS_FIELD(client.req_message),
192                 STATISTICS_FIELD(client.req_control),
193                 STATISTICS_FIELD(timeouts.call),
194                 STATISTICS_FIELD(timeouts.control),
195                 STATISTICS_FIELD(timeouts.traverse),
196                 STATISTICS_FIELD(total_calls),
197                 STATISTICS_FIELD(pending_calls),
198                 STATISTICS_FIELD(lockwait_calls),
199                 STATISTICS_FIELD(pending_lockwait_calls),
200                 STATISTICS_FIELD(childwrite_calls),
201                 STATISTICS_FIELD(pending_childwrite_calls),
202                 STATISTICS_FIELD(memory_used),
203                 STATISTICS_FIELD(max_hop_count),
204         };
205         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
206         seconds = tmp%60;
207         tmp    /= 60;
208         minutes = tmp%60;
209         tmp    /= 60;
210         hours   = tmp%24;
211         tmp    /= 24;
212         days    = tmp;
213
214         printf("CTDB version %u\n", CTDB_VERSION);
215         printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
216         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
217
218         for (i=0;i<ARRAY_SIZE(fields);i++) {
219                 if (strchr(fields[i].name, '.')) {
220                         preflen = strcspn(fields[i].name, ".")+1;
221                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
222                                 prefix = fields[i].name;
223                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
224                         }
225                 } else {
226                         preflen = 0;
227                 }
228                 printf(" %*s%-22s%*s%10u\n", 
229                        preflen?4:0, "",
230                        fields[i].name+preflen, 
231                        preflen?0:4, "",
232                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
233         }
234         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
235         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
236
237         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
238         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
239         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
240         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
241
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   display remote ctdb statistics combined from all nodes
247  */
248 static int control_statistics_all(struct ctdb_context *ctdb)
249 {
250         int ret, i;
251         struct ctdb_statistics statistics;
252         uint32_t *nodes;
253         uint32_t num_nodes;
254
255         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
256         CTDB_NO_MEMORY(ctdb, nodes);
257         
258         ZERO_STRUCT(statistics);
259
260         for (i=0;i<num_nodes;i++) {
261                 struct ctdb_statistics s1;
262                 int j;
263                 uint32_t *v1 = (uint32_t *)&s1;
264                 uint32_t *v2 = (uint32_t *)&statistics;
265                 uint32_t num_ints = 
266                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
267                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
268                 if (ret != 0) {
269                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
270                         return ret;
271                 }
272                 for (j=0;j<num_ints;j++) {
273                         v2[j] += v1[j];
274                 }
275                 statistics.max_hop_count = 
276                         MAX(statistics.max_hop_count, s1.max_hop_count);
277                 statistics.max_call_latency = 
278                         MAX(statistics.max_call_latency, s1.max_call_latency);
279                 statistics.max_lockwait_latency = 
280                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
281         }
282         talloc_free(nodes);
283         printf("Gathered statistics for %u nodes\n", num_nodes);
284         show_statistics(&statistics);
285         return 0;
286 }
287
288 /*
289   display remote ctdb statistics
290  */
291 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
292 {
293         int ret;
294         struct ctdb_statistics statistics;
295
296         if (options.pnn == CTDB_BROADCAST_ALL) {
297                 return control_statistics_all(ctdb);
298         }
299
300         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
301         if (ret != 0) {
302                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
303                 return ret;
304         }
305         show_statistics(&statistics);
306         return 0;
307 }
308
309
310 /*
311   reset remote ctdb statistics
312  */
313 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
314 {
315         int ret;
316
317         ret = ctdb_statistics_reset(ctdb, options.pnn);
318         if (ret != 0) {
319                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
320                 return ret;
321         }
322         return 0;
323 }
324
325
326 /*
327   display uptime of remote node
328  */
329 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
330 {
331         int ret;
332         struct ctdb_uptime *uptime = NULL;
333         int tmp, days, hours, minutes, seconds;
334
335         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
336         if (ret != 0) {
337                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
338                 return ret;
339         }
340
341         if (options.machinereadable){
342                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
343                 printf(":%u:%u:%u:%lf\n",
344                         (unsigned int)uptime->current_time.tv_sec,
345                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
346                         (unsigned int)uptime->last_recovery_finished.tv_sec,
347                         timeval_delta(&uptime->last_recovery_finished,
348                                       &uptime->last_recovery_started)
349                 );
350                 return 0;
351         }
352
353         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
354
355         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
356         seconds = tmp%60;
357         tmp    /= 60;
358         minutes = tmp%60;
359         tmp    /= 60;
360         hours   = tmp%24;
361         tmp    /= 24;
362         days    = tmp;
363         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
364
365         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
366         seconds = tmp%60;
367         tmp    /= 60;
368         minutes = tmp%60;
369         tmp    /= 60;
370         hours   = tmp%24;
371         tmp    /= 24;
372         days    = tmp;
373         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
374         
375         printf("Duration of last recovery/failover: %lf seconds\n",
376                 timeval_delta(&uptime->last_recovery_finished,
377                               &uptime->last_recovery_started));
378
379         return 0;
380 }
381
382 /*
383   show the PNN of the current node
384  */
385 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
386 {
387         uint32_t mypnn;
388         bool ret;
389
390         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
391         if (!ret) {
392                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
393                 return -1;
394         }
395
396         printf("PNN:%d\n", mypnn);
397         return 0;
398 }
399
400
/*
  one entry of the nodes file as returned by read_nodes_file(),
  kept in a singly linked list
 */
401 struct pnn_node {
        /* next entry in the list, NULL terminates it */
402         struct pnn_node *next;
        /* text of the nodes file line with leading blanks/tabs removed */
403         const char *addr;
        /* pnn assigned to this line by read_nodes_file() */
404         int pnn;
405 };
406
407 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
408 {
409         const char *nodes_list;
410         int nlines;
411         char **lines;
412         int i, pnn;
413         struct pnn_node *pnn_nodes = NULL;
414         struct pnn_node *pnn_node;
415         struct pnn_node *tmp_node;
416
417         /* read the nodes file */
418         nodes_list = getenv("CTDB_NODES");
419         if (nodes_list == NULL) {
420                 nodes_list = "/etc/ctdb/nodes";
421         }
422         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
423         if (lines == NULL) {
424                 return NULL;
425         }
426         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
427                 nlines--;
428         }
429         for (i=0, pnn=0; i<nlines; i++) {
430                 char *node;
431
432                 node = lines[i];
433                 /* strip leading spaces */
434                 while((*node == ' ') || (*node == '\t')) {
435                         node++;
436                 }
437                 if (*node == '#') {
438                         pnn++;
439                         continue;
440                 }
441                 if (strcmp(node, "") == 0) {
442                         continue;
443                 }
444                 pnn_node = talloc(mem_ctx, struct pnn_node);
445                 pnn_node->pnn = pnn++;
446                 pnn_node->addr = talloc_strdup(pnn_node, node);
447                 pnn_node->next = pnn_nodes;
448                 pnn_nodes = pnn_node;
449         }
450
451         /* swap them around so we return them in incrementing order */
452         pnn_node = pnn_nodes;
453         pnn_nodes = NULL;
454         while (pnn_node) {
455                 tmp_node = pnn_node;
456                 pnn_node = pnn_node->next;
457
458                 tmp_node->next = pnn_nodes;
459                 pnn_nodes = tmp_node;
460         }
461
462         return pnn_nodes;
463 }
464
465 /*
466   show the PNN of the current node
467   discover the pnn by loading the nodes file and try to bind to all
468   addresses one at a time until the ip address is found.
469  */
470 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
471 {
472         TALLOC_CTX *mem_ctx = talloc_new(NULL);
473         struct pnn_node *pnn_nodes;
474         struct pnn_node *pnn_node;
475
476         pnn_nodes = read_nodes_file(mem_ctx);
477         if (pnn_nodes == NULL) {
478                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
479                 talloc_free(mem_ctx);
480                 return -1;
481         }
482
483         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
484                 ctdb_sock_addr addr;
485
486                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
487                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
488                         talloc_free(mem_ctx);
489                         return -1;
490                 }
491
492                 if (ctdb_sys_have_ip(&addr)) {
493                         printf("PNN:%d\n", pnn_node->pnn);
494                         talloc_free(mem_ctx);
495                         return 0;
496                 }
497         }
498
499         printf("Failed to detect which PNN this node is\n");
500         talloc_free(mem_ctx);
501         return -1;
502 }
503
504 /*
505   display remote ctdb status
506  */
507 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
508 {
509         int i, ret;
510         struct ctdb_vnn_map *vnnmap=NULL;
511         struct ctdb_node_map *nodemap=NULL;
512         uint32_t recmode, recmaster;
513         int mypnn;
514
515         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
516         if (mypnn == -1) {
517                 return -1;
518         }
519
520         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
521         if (ret != 0) {
522                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
523                 return ret;
524         }
525
526         if(options.machinereadable){
527                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:\n");
528                 for(i=0;i<nodemap->num;i++){
529                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
530                                 continue;
531                         }
532                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
533                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
538                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
539                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE));
540                 }
541                 return 0;
542         }
543
544         printf("Number of nodes:%d\n", nodemap->num);
545         for(i=0;i<nodemap->num;i++){
546                 static const struct {
547                         uint32_t flag;
548                         const char *name;
549                 } flag_names[] = {
550                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
551                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
552                         { NODE_FLAGS_BANNED,                "BANNED" },
553                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
554                         { NODE_FLAGS_DELETED,               "DELETED" },
555                         { NODE_FLAGS_STOPPED,               "STOPPED" },
556                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
557                 };
558                 char *flags_str = NULL;
559                 int j;
560
561                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
562                         continue;
563                 }
564                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
565                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
566                                 if (flags_str == NULL) {
567                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
568                                 } else {
569                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
570                                                                            flag_names[j].name);
571                                 }
572                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
573                         }
574                 }
575                 if (flags_str == NULL) {
576                         flags_str = talloc_strdup(ctdb, "OK");
577                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
578                 }
579                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
580                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
581                        flags_str,
582                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
583                 talloc_free(flags_str);
584         }
585
586         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
587         if (ret != 0) {
588                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
589                 return ret;
590         }
591         if (vnnmap->generation == INVALID_GENERATION) {
592                 printf("Generation:INVALID\n");
593         } else {
594                 printf("Generation:%d\n",vnnmap->generation);
595         }
596         printf("Size:%d\n",vnnmap->size);
597         for(i=0;i<vnnmap->size;i++){
598                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
599         }
600
601         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
602         if (ret != 0) {
603                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
604                 return ret;
605         }
606         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
607
608         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
609         if (ret != 0) {
610                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
611                 return ret;
612         }
613         printf("Recovery master:%d\n",recmaster);
614
615         return 0;
616 }
617
618
/*
  one address read from the natgw nodes file by control_natgwlist(),
  kept in a singly linked list
 */
619 struct natgw_node {
        /* next entry in the list, NULL terminates it */
620         struct natgw_node *next;
        /* text of the file line with leading blanks/tabs removed; it is
           compared against the printed address of each cluster node */
621         const char *addr;
622 };
623
624 /*
625   display the list of nodes belonging to this natgw configuration
626  */
627 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
628 {
629         int i, ret;
630         const char *natgw_list;
631         int nlines;
632         char **lines;
633         struct natgw_node *natgw_nodes = NULL;
634         struct natgw_node *natgw_node;
635         struct ctdb_node_map *nodemap=NULL;
636
637
638         /* read the natgw nodes file into a linked list */
639         natgw_list = getenv("NATGW_NODES");
640         if (natgw_list == NULL) {
641                 natgw_list = "/etc/ctdb/natgw_nodes";
642         }
643         lines = file_lines_load(natgw_list, &nlines, ctdb);
644         if (lines == NULL) {
645                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
646                 return -1;
647         }
648         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
649                 nlines--;
650         }
651         for (i=0;i<nlines;i++) {
652                 char *node;
653
654                 node = lines[i];
655                 /* strip leading spaces */
656                 while((*node == ' ') || (*node == '\t')) {
657                         node++;
658                 }
659                 if (*node == '#') {
660                         continue;
661                 }
662                 if (strcmp(node, "") == 0) {
663                         continue;
664                 }
665                 natgw_node = talloc(ctdb, struct natgw_node);
666                 natgw_node->addr = talloc_strdup(natgw_node, node);
667                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
668                 natgw_node->next = natgw_nodes;
669                 natgw_nodes = natgw_node;
670         }
671
672         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
673         if (ret != 0) {
674                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
675                 return ret;
676         }
677
678         i=0;
679         while(i<nodemap->num) {
680                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
681                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
682                                 break;
683                         }
684                 }
685
686                 /* this node was not in the natgw so we just remove it from
687                  * the list
688                  */
689                 if ((natgw_node == NULL) 
690                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
691                         int j;
692
693                         for (j=i+1; j<nodemap->num; j++) {
694                                 nodemap->nodes[j-1] = nodemap->nodes[j];
695                         }
696                         nodemap->num--;
697                         continue;
698                 }
699
700                 i++;
701         }               
702
703         /* pick a node to be natgwmaster
704          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
705          */
706         for(i=0;i<nodemap->num;i++){
707                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
708                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
709                         break;
710                 }
711         }
712         /* we couldnt find any healthy node, try unhealthy ones */
713         if (i == nodemap->num) {
714                 for(i=0;i<nodemap->num;i++){
715                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
716                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
717                                 break;
718                         }
719                 }
720         }
721         /* unless all nodes are STOPPED, when we pick one anyway */
722         if (i == nodemap->num) {
723                 for(i=0;i<nodemap->num;i++){
724                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
725                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
726                                 break;
727                         }
728                 }
729                 /* or if we still can not find any */
730                 if (i == nodemap->num) {
731                         printf("-1 0.0.0.0\n");
732                 }
733         }
734
735         /* print the pruned list of nodes belonging to this natgw list */
736         for(i=0;i<nodemap->num;i++){
737                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
738                         continue;
739                 }
740                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
741                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
742                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
743                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
744                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
745                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
746                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
747         }
748
749         return 0;
750 }
751
752 /*
753   display the status of the scripts for monitoring (or other events)
754  */
755 static int control_one_scriptstatus(struct ctdb_context *ctdb,
756                                     enum ctdb_eventscript_call type)
757 {
758         struct ctdb_scripts_wire *script_status;
759         int ret, i;
760
761         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
762         if (ret != 0) {
763                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
764                 return ret;
765         }
766
767         if (script_status == NULL) {
768                 if (!options.machinereadable) {
769                         printf("%s cycle never run\n",
770                                ctdb_eventscript_call_names[type]);
771                 }
772                 return 0;
773         }
774
775         if (!options.machinereadable) {
776                 printf("%d scripts were executed last %s cycle\n",
777                        script_status->num_scripts,
778                        ctdb_eventscript_call_names[type]);
779         }
780         for (i=0; i<script_status->num_scripts; i++) {
781                 const char *status = NULL;
782
783                 switch (script_status->scripts[i].status) {
784                 case -ETIME:
785                         status = "TIMEDOUT";
786                         break;
787                 case -ENOEXEC:
788                         status = "DISABLED";
789                         break;
790                 case 0:
791                         status = "OK";
792                         break;
793                 default:
794                         if (script_status->scripts[i].status > 0)
795                                 status = "ERROR";
796                         break;
797                 }
798                 if (options.machinereadable) {
799                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
800                                ctdb_eventscript_call_names[type],
801                                script_status->scripts[i].name,
802                                script_status->scripts[i].status,
803                                status,
804                                (long)script_status->scripts[i].start.tv_sec,
805                                (long)script_status->scripts[i].start.tv_usec,
806                                (long)script_status->scripts[i].finished.tv_sec,
807                                (long)script_status->scripts[i].finished.tv_usec,
808                                script_status->scripts[i].output);
809                         continue;
810                 }
811                 if (status)
812                         printf("%-20s Status:%s    ",
813                                script_status->scripts[i].name, status);
814                 else
815                         /* Some other error, eg from stat. */
816                         printf("%-20s Status:CANNOT RUN (%s)",
817                                script_status->scripts[i].name,
818                                strerror(-script_status->scripts[i].status));
819
820                 if (script_status->scripts[i].status >= 0) {
821                         printf("Duration:%.3lf ",
822                         timeval_delta(&script_status->scripts[i].finished,
823                               &script_status->scripts[i].start));
824                 }
825                 if (script_status->scripts[i].status != -ENOEXEC) {
826                         printf("%s",
827                                ctime(&script_status->scripts[i].start.tv_sec));
828                         if (script_status->scripts[i].status != 0) {
829                                 printf("   OUTPUT:%s\n",
830                                        script_status->scripts[i].output);
831                         }
832                 } else {
833                         printf("\n");
834                 }
835         }
836         return 0;
837 }
838
839
840 static int control_scriptstatus(struct ctdb_context *ctdb,
841                                 int argc, const char **argv)
842 {
843         int ret;
844         enum ctdb_eventscript_call type, min, max;
845         const char *arg;
846
847         if (argc > 1) {
848                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
849                 return -1;
850         }
851
852         if (argc == 0)
853                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
854         else
855                 arg = argv[0];
856
857         for (type = 0; type < CTDB_EVENT_MAX; type++) {
858                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
859                         min = type;
860                         max = type+1;
861                         break;
862                 }
863         }
864         if (type == CTDB_EVENT_MAX) {
865                 if (strcmp(arg, "all") == 0) {
866                         min = 0;
867                         max = CTDB_EVENT_MAX;
868                 } else {
869                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
870                         return -1;
871                 }
872         }
873
874         if (options.machinereadable) {
875                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
876         }
877
878         for (type = min; type < max; type++) {
879                 ret = control_one_scriptstatus(ctdb, type);
880                 if (ret != 0) {
881                         return ret;
882                 }
883         }
884
885         return 0;
886 }
887
888 /*
889   enable an eventscript
890  */
891 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
892 {
893         int ret;
894
895         if (argc < 1) {
896                 usage();
897         }
898
899         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
900         if (ret != 0) {
901           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
902                 return ret;
903         }
904
905         return 0;
906 }
907
908 /*
909   disable an eventscript
910  */
911 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
912 {
913         int ret;
914
915         if (argc < 1) {
916                 usage();
917         }
918
919         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
920         if (ret != 0) {
921           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
922                 return ret;
923         }
924
925         return 0;
926 }
927
928 /*
929   display the pnn of the recovery master
930  */
931 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
932 {
933         int ret;
934         uint32_t recmaster;
935
936         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
937         if (ret != 0) {
938                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
939                 return ret;
940         }
941         printf("%d\n",recmaster);
942
943         return 0;
944 }
945
946 /*
947   add a tickle to a public address
948  */
949 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
950 {
951         struct ctdb_tcp_connection t;
952         TDB_DATA data;
953         int ret;
954
955         if (argc < 2) {
956                 usage();
957         }
958
959         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
960                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
961                 return -1;
962         }
963         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
964                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
965                 return -1;
966         }
967
968         data.dptr = (uint8_t *)&t;
969         data.dsize = sizeof(t);
970
971         /* tell all nodes about this tcp connection */
972         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
973                            0, data, ctdb, NULL, NULL, NULL, NULL);
974         if (ret != 0) {
975                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
976                 return -1;
977         }
978         
979         return 0;
980 }
981
982
983 /*
984   delete a tickle from a node
985  */
986 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
987 {
988         struct ctdb_tcp_connection t;
989         TDB_DATA data;
990         int ret;
991
992         if (argc < 2) {
993                 usage();
994         }
995
996         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
997                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
998                 return -1;
999         }
1000         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1001                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1002                 return -1;
1003         }
1004
1005         data.dptr = (uint8_t *)&t;
1006         data.dsize = sizeof(t);
1007
1008         /* tell all nodes about this tcp connection */
1009         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1010                            0, data, ctdb, NULL, NULL, NULL, NULL);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1013                 return -1;
1014         }
1015         
1016         return 0;
1017 }
1018
1019
1020 /*
1021   get a list of all tickles for this pnn
1022  */
1023 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1024 {
1025         struct ctdb_control_tcp_tickle_list *list;
1026         ctdb_sock_addr addr;
1027         int i, ret;
1028         unsigned port = 0;
1029
1030         if (argc < 1) {
1031                 usage();
1032         }
1033
1034         if (argc == 2) {
1035                 port = atoi(argv[1]);
1036         }
1037
1038         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1039                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1040                 return -1;
1041         }
1042
1043         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1044         if (ret == -1) {
1045                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1046                 return -1;
1047         }
1048
1049         if (options.machinereadable){
1050                 printf(":source ip:port:destination ip:port:\n");
1051                 for (i=0;i<list->tickles.num;i++) {
1052                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1053                                 continue;
1054                         }
1055                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1056                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1057                 }
1058         } else {
1059                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1060                 printf("Num tickles:%u\n", list->tickles.num);
1061                 for (i=0;i<list->tickles.num;i++) {
1062                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1063                                 continue;
1064                         }
1065                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1066                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1067                 }
1068         }
1069
1070         talloc_free(list);
1071         
1072         return 0;
1073 }
1074
1075
1076 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1077 {
1078         struct ctdb_all_public_ips *ips;
1079         struct ctdb_public_ip ip;
1080         int i, ret;
1081         uint32_t *nodes;
1082         uint32_t disable_time;
1083         TDB_DATA data;
1084         struct ctdb_node_map *nodemap=NULL;
1085         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1086
1087         disable_time = 30;
1088         data.dptr  = (uint8_t*)&disable_time;
1089         data.dsize = sizeof(disable_time);
1090         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1091         if (ret != 0) {
1092                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1093                 return -1;
1094         }
1095
1096
1097
1098         /* read the public ip list from the node */
1099         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1100         if (ret != 0) {
1101                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1102                 talloc_free(tmp_ctx);
1103                 return -1;
1104         }
1105
1106         for (i=0;i<ips->num;i++) {
1107                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1108                         break;
1109                 }
1110         }
1111         if (i==ips->num) {
1112                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1113                         pnn, ctdb_addr_to_str(addr)));
1114                 talloc_free(tmp_ctx);
1115                 return -1;
1116         }
1117
1118         ip.pnn  = pnn;
1119         ip.addr = *addr;
1120
1121         data.dptr  = (uint8_t *)&ip;
1122         data.dsize = sizeof(ip);
1123
1124         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1125         if (ret != 0) {
1126                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1127                 talloc_free(tmp_ctx);
1128                 return ret;
1129         }
1130
1131         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1132         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1133                                         nodes, 0,
1134                                         LONGTIMELIMIT(),
1135                                         false, data,
1136                                         NULL, NULL,
1137                                         NULL);
1138         if (ret != 0) {
1139                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1145         if (ret != 0) {
1146                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1147                 talloc_free(tmp_ctx);
1148                 return -1;
1149         }
1150
1151         /* update the recovery daemon so it now knows to expect the new
1152            node assignment for this ip.
1153         */
1154         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1155         if (ret != 0) {
1156                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1157                 return -1;
1158         }
1159
1160         talloc_free(tmp_ctx);
1161         return 0;
1162 }
1163
1164 /*
1165   move/failover an ip address to a specific node
1166  */
1167 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1168 {
1169         uint32_t pnn;
1170         ctdb_sock_addr addr;
1171
1172         if (argc < 2) {
1173                 usage();
1174                 return -1;
1175         }
1176
1177         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1178                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1179                 return -1;
1180         }
1181
1182
1183         if (sscanf(argv[1], "%u", &pnn) != 1) {
1184                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1185                 return -1;
1186         }
1187
1188         if (move_ip(ctdb, &addr, pnn) != 0) {
1189                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1190                 return -1;
1191         }
1192
1193         return 0;
1194 }
1195
1196 void getips_store_callback(void *param, void *data)
1197 {
1198         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1199         struct ctdb_all_public_ips *ips = param;
1200         int i;
1201
1202         i = ips->num++;
1203         ips->ips[i].pnn  = node_ip->pnn;
1204         ips->ips[i].addr = node_ip->addr;
1205 }
1206
/*
  tree traversal callback: count the visited entries

  'param' points at a uint32_t counter that is incremented once per
  call; 'data' is not looked at.
 */
void getips_count_callback(void *param, void *data)
{
	uint32_t *counter = (uint32_t *)param;

	*counter += 1;
}
1213
1214 #define IP_KEYLEN       4
1215 static uint32_t *ip_key(ctdb_sock_addr *ip)
1216 {
1217         static uint32_t key[IP_KEYLEN];
1218
1219         bzero(key, sizeof(key));
1220
1221         switch (ip->sa.sa_family) {
1222         case AF_INET:
1223                 key[0]  = ip->ip.sin_addr.s_addr;
1224                 break;
1225         case AF_INET6:
1226                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1227                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1228                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1229                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1230                 break;
1231         default:
1232                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1233                 return key;
1234         }
1235
1236         return key;
1237 }
1238
/*
  insert callback for the ip tree: hand back 'parm' unchanged

  'data' is ignored.
 */
static void *add_ip_callback(void *parm, void *data)
{
	void *result = parm;

	(void)data;
	return result;
}
1243
1244 static int
1245 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1246 {
1247         struct ctdb_all_public_ips *tmp_ips;
1248         struct ctdb_node_map *nodemap=NULL;
1249         trbt_tree_t *ip_tree;
1250         int i, j, len, ret;
1251         uint32_t count;
1252
1253         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1256                 return ret;
1257         }
1258
1259         ip_tree = trbt_create(tmp_ctx, 0);
1260
1261         for(i=0;i<nodemap->num;i++){
1262                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1263                         continue;
1264                 }
1265                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1266                         continue;
1267                 }
1268
1269                 /* read the public ip list from this node */
1270                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1273                         return -1;
1274                 }
1275         
1276                 for (j=0; j<tmp_ips->num;j++) {
1277                         struct ctdb_public_ip *node_ip;
1278
1279                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1280                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1281                         node_ip->addr = tmp_ips->ips[j].addr;
1282
1283                         trbt_insertarray32_callback(ip_tree,
1284                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1285                                 add_ip_callback,
1286                                 node_ip);
1287                 }
1288                 talloc_free(tmp_ips);
1289         }
1290
1291         /* traverse */
1292         count = 0;
1293         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1294
1295         len = offsetof(struct ctdb_all_public_ips, ips) + 
1296                 count*sizeof(struct ctdb_public_ip);
1297         tmp_ips = talloc_zero_size(tmp_ctx, len);
1298         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1299
1300         *ips = tmp_ips;
1301
1302         return 0;
1303 }
1304
1305
1306 /* 
1307  * scans all other nodes and returns a pnn for another node that can host this 
1308  * ip address or -1
1309  */
1310 static int
1311 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1312 {
1313         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1314         struct ctdb_all_public_ips *ips;
1315         struct ctdb_node_map *nodemap=NULL;
1316         int i, j, ret;
1317
1318         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1319         if (ret != 0) {
1320                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1321                 talloc_free(tmp_ctx);
1322                 return ret;
1323         }
1324
1325         for(i=0;i<nodemap->num;i++){
1326                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1327                         continue;
1328                 }
1329                 if (nodemap->nodes[i].pnn == options.pnn) {
1330                         continue;
1331                 }
1332
1333                 /* read the public ip list from this node */
1334                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1335                 if (ret != 0) {
1336                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1337                         return -1;
1338                 }
1339
1340                 for (j=0;j<ips->num;j++) {
1341                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1342                                 talloc_free(tmp_ctx);
1343                                 return nodemap->nodes[i].pnn;
1344                         }
1345                 }
1346                 talloc_free(ips);
1347         }
1348
1349         talloc_free(tmp_ctx);
1350         return -1;
1351 }
1352
1353 /*
1354   add a public ip address to a node
1355  */
1356 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1357 {
1358         int i, ret;
1359         int len;
1360         uint32_t pnn;
1361         unsigned mask;
1362         ctdb_sock_addr addr;
1363         struct ctdb_control_ip_iface *pub;
1364         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1365         struct ctdb_all_public_ips *ips;
1366
1367
1368         if (argc != 2) {
1369                 talloc_free(tmp_ctx);
1370                 usage();
1371         }
1372
1373         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1374                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1375                 talloc_free(tmp_ctx);
1376                 return -1;
1377         }
1378
1379         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1380         if (ret != 0) {
1381                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1382                 talloc_free(tmp_ctx);
1383                 return ret;
1384         }
1385
1386
1387         /* check if some other node is already serving this ip, if not,
1388          * we will claim it
1389          */
1390         for (i=0;i<ips->num;i++) {
1391                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1392                         break;
1393                 }
1394         }
1395
1396         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1397         pub = talloc_size(tmp_ctx, len); 
1398         CTDB_NO_MEMORY(ctdb, pub);
1399
1400         pub->addr  = addr;
1401         pub->mask  = mask;
1402         pub->len   = strlen(argv[1])+1;
1403         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1404
1405         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1406         if (ret != 0) {
1407                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1408                 talloc_free(tmp_ctx);
1409                 return ret;
1410         }
1411
1412         if (i == ips->num) {
1413                 /* no one has this ip so we claim it */
1414                 pnn  = options.pnn;
1415         } else {
1416                 pnn  = ips->ips[i].pnn;
1417         }
1418
1419         if (move_ip(ctdb, &addr, pnn) != 0) {
1420                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1421                 return -1;
1422         }
1423
1424         talloc_free(tmp_ctx);
1425         return 0;
1426 }
1427
1428 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1429
1430 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1431 {
1432         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1433         struct ctdb_node_map *nodemap=NULL;
1434         struct ctdb_all_public_ips *ips;
1435         int ret, i, j;
1436
1437         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1438         if (ret != 0) {
1439                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1440                 return ret;
1441         }
1442
1443         /* remove it from the nodes that are not hosting the ip currently */
1444         for(i=0;i<nodemap->num;i++){
1445                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1446                         continue;
1447                 }
1448                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1449                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1450                         continue;
1451                 }
1452
1453                 for (j=0;j<ips->num;j++) {
1454                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1455                                 break;
1456                         }
1457                 }
1458                 if (j==ips->num) {
1459                         continue;
1460                 }
1461
1462                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1463                         continue;
1464                 }
1465
1466                 options.pnn = nodemap->nodes[i].pnn;
1467                 control_delip(ctdb, argc, argv);
1468         }
1469
1470
1471         /* remove it from every node (also the one hosting it) */
1472         for(i=0;i<nodemap->num;i++){
1473                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1474                         continue;
1475                 }
1476                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1477                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1478                         continue;
1479                 }
1480
1481                 for (j=0;j<ips->num;j++) {
1482                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1483                                 break;
1484                         }
1485                 }
1486                 if (j==ips->num) {
1487                         continue;
1488                 }
1489
1490                 options.pnn = nodemap->nodes[i].pnn;
1491                 control_delip(ctdb, argc, argv);
1492         }
1493
1494         talloc_free(tmp_ctx);
1495         return 0;
1496 }
1497         
1498 /*
1499   delete a public ip address from a node
1500  */
1501 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1502 {
1503         int i, ret;
1504         ctdb_sock_addr addr;
1505         struct ctdb_control_ip_iface pub;
1506         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1507         struct ctdb_all_public_ips *ips;
1508
1509         if (argc != 1) {
1510                 talloc_free(tmp_ctx);
1511                 usage();
1512         }
1513
1514         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1515                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1516                 return -1;
1517         }
1518
1519         if (options.pnn == CTDB_BROADCAST_ALL) {
1520                 return control_delip_all(ctdb, argc, argv, &addr);
1521         }
1522
1523         pub.addr  = addr;
1524         pub.mask  = 0;
1525         pub.len   = 0;
1526
1527         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1528         if (ret != 0) {
1529                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1530                 talloc_free(tmp_ctx);
1531                 return ret;
1532         }
1533         
1534         for (i=0;i<ips->num;i++) {
1535                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1536                         break;
1537                 }
1538         }
1539
1540         if (i==ips->num) {
1541                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1542                         ctdb_addr_to_str(&addr)));
1543                 talloc_free(tmp_ctx);
1544                 return -1;
1545         }
1546
1547         if (ips->ips[i].pnn == options.pnn) {
1548                 ret = find_other_host_for_public_ip(ctdb, &addr);
1549                 if (ret != -1) {
1550                         if (move_ip(ctdb, &addr, ret) != 0) {
1551                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1552                                 return -1;
1553                         }
1554                 }
1555         }
1556
1557         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1558         if (ret != 0) {
1559                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1560                 talloc_free(tmp_ctx);
1561                 return ret;
1562         }
1563
1564         talloc_free(tmp_ctx);
1565         return 0;
1566 }
1567
1568 /*
1569   kill a tcp connection
1570  */
1571 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1572 {
1573         int ret;
1574         struct ctdb_control_killtcp killtcp;
1575
1576         if (argc < 2) {
1577                 usage();
1578         }
1579
1580         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1581                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1582                 return -1;
1583         }
1584
1585         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1586                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1587                 return -1;
1588         }
1589
1590         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1593                 return ret;
1594         }
1595
1596         return 0;
1597 }
1598
1599
1600 /*
1601   send a gratious arp
1602  */
1603 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1604 {
1605         int ret;
1606         ctdb_sock_addr addr;
1607
1608         if (argc < 2) {
1609                 usage();
1610         }
1611
1612         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1613                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1614                 return -1;
1615         }
1616
1617         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1618         if (ret != 0) {
1619                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1620                 return ret;
1621         }
1622
1623         return 0;
1624 }
1625
1626 /*
1627   register a server id
1628  */
1629 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1630 {
1631         int ret;
1632         struct ctdb_server_id server_id;
1633
1634         if (argc < 3) {
1635                 usage();
1636         }
1637
1638         server_id.pnn       = strtoul(argv[0], NULL, 0);
1639         server_id.type      = strtoul(argv[1], NULL, 0);
1640         server_id.server_id = strtoul(argv[2], NULL, 0);
1641
1642         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1643         if (ret != 0) {
1644                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1645                 return ret;
1646         }
1647         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1648         sleep(999);
1649         return -1;
1650 }
1651
1652 /*
1653   unregister a server id
1654  */
1655 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1656 {
1657         int ret;
1658         struct ctdb_server_id server_id;
1659
1660         if (argc < 3) {
1661                 usage();
1662         }
1663
1664         server_id.pnn       = strtoul(argv[0], NULL, 0);
1665         server_id.type      = strtoul(argv[1], NULL, 0);
1666         server_id.server_id = strtoul(argv[2], NULL, 0);
1667
1668         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1669         if (ret != 0) {
1670                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1671                 return ret;
1672         }
1673         return -1;
1674 }
1675
1676 /*
1677   check if a server id exists
1678  */
1679 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1680 {
1681         uint32_t status;
1682         int ret;
1683         struct ctdb_server_id server_id;
1684
1685         if (argc < 3) {
1686                 usage();
1687         }
1688
1689         server_id.pnn       = strtoul(argv[0], NULL, 0);
1690         server_id.type      = strtoul(argv[1], NULL, 0);
1691         server_id.server_id = strtoul(argv[2], NULL, 0);
1692
1693         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1694         if (ret != 0) {
1695                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1696                 return ret;
1697         }
1698
1699         if (status) {
1700                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1701         } else {
1702                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1703         }
1704         return 0;
1705 }
1706
1707 /*
1708   get a list of all server ids that are registered on a node
1709  */
1710 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1711 {
1712         int i, ret;
1713         struct ctdb_server_id_list *server_ids;
1714
1715         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1718                 return ret;
1719         }
1720
1721         for (i=0; i<server_ids->num; i++) {
1722                 printf("Server id %d:%d:%d\n", 
1723                         server_ids->server_ids[i].pnn, 
1724                         server_ids->server_ids[i].type, 
1725                         server_ids->server_ids[i].server_id); 
1726         }
1727
1728         return -1;
1729 }
1730
1731 /*
1732   send a tcp tickle ack
1733  */
1734 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1735 {
1736         int ret;
1737         ctdb_sock_addr  src, dst;
1738
1739         if (argc < 2) {
1740                 usage();
1741         }
1742
1743         if (!parse_ip_port(argv[0], &src)) {
1744                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1745                 return -1;
1746         }
1747
1748         if (!parse_ip_port(argv[1], &dst)) {
1749                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1750                 return -1;
1751         }
1752
1753         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1754         if (ret==0) {
1755                 return 0;
1756         }
1757         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1758
1759         return -1;
1760 }
1761
1762
1763 /*
1764   display public ip status
1765  */
1766 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1767 {
1768         int i, ret;
1769         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1770         struct ctdb_all_public_ips *ips;
1771
1772         if (options.pnn == CTDB_BROADCAST_ALL) {
1773                 /* read the list of public ips from all nodes */
1774                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1775         } else {
1776                 /* read the public ip list from this node */
1777                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1778         }
1779         if (ret != 0) {
1780                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1781                 talloc_free(tmp_ctx);
1782                 return ret;
1783         }
1784
1785         if (options.machinereadable){
1786                 printf(":Public IP:Node:");
1787                 if (options.verbose){
1788                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
1789                 }
1790                 printf("\n");
1791         } else {
1792                 if (options.pnn == CTDB_BROADCAST_ALL) {
1793                         printf("Public IPs on ALL nodes\n");
1794                 } else {
1795                         printf("Public IPs on node %u\n", options.pnn);
1796                 }
1797         }
1798
1799         for (i=1;i<=ips->num;i++) {
1800                 struct ctdb_control_public_ip_info *info = NULL;
1801                 int32_t pnn;
1802                 char *aciface = NULL;
1803                 char *avifaces = NULL;
1804                 char *cifaces = NULL;
1805
1806                 if (options.pnn == CTDB_BROADCAST_ALL) {
1807                         pnn = ips->ips[ips->num-i].pnn;
1808                 } else {
1809                         pnn = options.pnn;
1810                 }
1811
1812                 if (pnn != -1) {
1813                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1814                                                    &ips->ips[ips->num-i].addr, &info);
1815                 } else {
1816                         ret = -1;
1817                 }
1818
1819                 if (ret == 0) {
1820                         int j;
1821                         for (j=0; j < info->num; j++) {
1822                                 if (cifaces == NULL) {
1823                                         cifaces = talloc_strdup(info,
1824                                                                 info->ifaces[j].name);
1825                                 } else {
1826                                         cifaces = talloc_asprintf_append(cifaces,
1827                                                                          ",%s",
1828                                                                          info->ifaces[j].name);
1829                                 }
1830
1831                                 if (info->active_idx == j) {
1832                                         aciface = info->ifaces[j].name;
1833                                 }
1834
1835                                 if (info->ifaces[j].link_state == 0) {
1836                                         continue;
1837                                 }
1838
1839                                 if (avifaces == NULL) {
1840                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1841                                 } else {
1842                                         avifaces = talloc_asprintf_append(avifaces,
1843                                                                           ",%s",
1844                                                                           info->ifaces[j].name);
1845                                 }
1846                         }
1847                 }
1848
1849                 if (options.machinereadable){
1850                         printf(":%s:%d:",
1851                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1852                                 ips->ips[ips->num-i].pnn);
1853                         if (options.verbose){
1854                                 printf("%s:%s:%s:",
1855                                         aciface?aciface:"",
1856                                         avifaces?avifaces:"",
1857                                         cifaces?cifaces:"");
1858                         }
1859                         printf("\n");
1860                 } else {
1861                         if (options.verbose) {
1862                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1863                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1864                                         ips->ips[ips->num-i].pnn,
1865                                         aciface?aciface:"",
1866                                         avifaces?avifaces:"",
1867                                         cifaces?cifaces:"");
1868                         } else {
1869                                 printf("%s %d\n",
1870                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1871                                         ips->ips[ips->num-i].pnn);
1872                         }
1873                 }
1874                 talloc_free(info);
1875         }
1876
1877         talloc_free(tmp_ctx);
1878         return 0;
1879 }
1880
1881 /*
1882   public ip info
1883  */
1884 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1885 {
1886         int i, ret;
1887         ctdb_sock_addr addr;
1888         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1889         struct ctdb_control_public_ip_info *info;
1890
1891         if (argc != 1) {
1892                 talloc_free(tmp_ctx);
1893                 usage();
1894         }
1895
1896         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1897                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1898                 return -1;
1899         }
1900
1901         /* read the public ip info from this node */
1902         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1903                                            tmp_ctx, &addr, &info);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1906                                   argv[0], options.pnn));
1907                 talloc_free(tmp_ctx);
1908                 return ret;
1909         }
1910
1911         printf("Public IP[%s] info on node %u\n",
1912                ctdb_addr_to_str(&info->ip.addr),
1913                options.pnn);
1914
1915         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1916                ctdb_addr_to_str(&info->ip.addr),
1917                info->ip.pnn, info->num);
1918
1919         for (i=0; i<info->num; i++) {
1920                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1921
1922                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1923                        i+1, info->ifaces[i].name,
1924                        info->ifaces[i].link_state?"up":"down",
1925                        (unsigned int)info->ifaces[i].references,
1926                        (i==info->active_idx)?" (active)":"");
1927         }
1928
1929         talloc_free(tmp_ctx);
1930         return 0;
1931 }
1932
1933 /*
1934   display interfaces status
1935  */
1936 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1937 {
1938         int i, ret;
1939         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1940         struct ctdb_control_get_ifaces *ifaces;
1941
1942         /* read the public ip list from this node */
1943         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1944                                    tmp_ctx, &ifaces);
1945         if (ret != 0) {
1946                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1947                                   options.pnn));
1948                 talloc_free(tmp_ctx);
1949                 return ret;
1950         }
1951
1952         if (options.machinereadable){
1953                 printf(":Name:LinkStatus:References:\n");
1954         } else {
1955                 printf("Interfaces on node %u\n", options.pnn);
1956         }
1957
1958         for (i=0; i<ifaces->num; i++) {
1959                 if (options.machinereadable){
1960                         printf(":%s:%s:%u\n",
1961                                ifaces->ifaces[i].name,
1962                                ifaces->ifaces[i].link_state?"1":"0",
1963                                (unsigned int)ifaces->ifaces[i].references);
1964                 } else {
1965                         printf("name:%s link:%s references:%u\n",
1966                                ifaces->ifaces[i].name,
1967                                ifaces->ifaces[i].link_state?"up":"down",
1968                                (unsigned int)ifaces->ifaces[i].references);
1969                 }
1970         }
1971
1972         talloc_free(tmp_ctx);
1973         return 0;
1974 }
1975
1976
1977 /*
1978   set link status of an interface
1979  */
1980 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1981 {
1982         int ret;
1983         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1984         struct ctdb_control_iface_info info;
1985
1986         ZERO_STRUCT(info);
1987
1988         if (argc != 2) {
1989                 usage();
1990         }
1991
1992         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1993                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1994                                   argv[0]));
1995                 talloc_free(tmp_ctx);
1996                 return -1;
1997         }
1998         strcpy(info.name, argv[0]);
1999
2000         if (strcmp(argv[1], "up") == 0) {
2001                 info.link_state = 1;
2002         } else if (strcmp(argv[1], "down") == 0) {
2003                 info.link_state = 0;
2004         } else {
2005                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2006                                   argv[1]));
2007                 talloc_free(tmp_ctx);
2008                 return -1;
2009         }
2010
2011         /* read the public ip list from this node */
2012         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2013                                    tmp_ctx, &info);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2016                                   argv[0], options.pnn));
2017                 talloc_free(tmp_ctx);
2018                 return ret;
2019         }
2020
2021         talloc_free(tmp_ctx);
2022         return 0;
2023 }
2024
2025 /*
2026   display pid of a ctdb daemon
2027  */
2028 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2029 {
2030         uint32_t pid;
2031         int ret;
2032
2033         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2034         if (ret != 0) {
2035                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2036                 return ret;
2037         }
2038         printf("Pid:%d\n", pid);
2039
2040         return 0;
2041 }
2042
2043 static uint32_t ipreallocate_finished;
2044
2045 /*
2046   handler for receiving the response to ipreallocate
2047 */
2048 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2049                              TDB_DATA data, void *private_data)
2050 {
2051         ipreallocate_finished = 1;
2052 }
2053
2054 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2055 {
2056         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2057
2058         event_add_timed(ctdb->ev, ctdb, 
2059                                 timeval_current_ofs(1, 0),
2060                                 ctdb_every_second, ctdb);
2061 }
2062
2063 /*
2064   ask the recovery daemon on the recovery master to perform a ip reallocation
2065  */
2066 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2067 {
2068         int i, ret;
2069         TDB_DATA data;
2070         struct takeover_run_reply rd;
2071         uint32_t recmaster;
2072         struct ctdb_node_map *nodemap=NULL;
2073         int retries=0;
2074         struct timeval tv = timeval_current();
2075
2076         /* we need some events to trigger so we can timeout and restart
2077            the loop
2078         */
2079         event_add_timed(ctdb->ev, ctdb, 
2080                                 timeval_current_ofs(1, 0),
2081                                 ctdb_every_second, ctdb);
2082
2083         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2084         if (rd.pnn == -1) {
2085                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
2086                 return -1;
2087         }
2088         rd.srvid = getpid();
2089
2090         /* register a message port for receiveing the reply so that we
2091            can receive the reply
2092         */
2093         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2094
2095         data.dptr = (uint8_t *)&rd;
2096         data.dsize = sizeof(rd);
2097
2098 again:
2099         /* check that there are valid nodes available */
2100         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2101                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2102                 return -1;
2103         }
2104         for (i=0; i<nodemap->num;i++) {
2105                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2106                         break;
2107                 }
2108         }
2109         if (i==nodemap->num) {
2110                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2111                 return 0;
2112         }
2113
2114
2115         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2116         if (ret != 0) {
2117                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2118                 return ret;
2119         }
2120
2121         /* verify the node exists */
2122         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2123                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2124                 return -1;
2125         }
2126
2127
2128         /* check tha there are nodes available that can act as a recmaster */
2129         for (i=0; i<nodemap->num; i++) {
2130                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2131                         continue;
2132                 }
2133                 break;
2134         }
2135         if (i == nodemap->num) {
2136                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2137                 return 0;
2138         }
2139
2140         /* verify the recovery master is not STOPPED, nor BANNED */
2141         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2142                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2143                 retries++;
2144                 sleep(1);
2145                 goto again;
2146         } 
2147
2148         
2149         /* verify the recovery master is not STOPPED, nor BANNED */
2150         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2151                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2152                 retries++;
2153                 sleep(1);
2154                 goto again;
2155         } 
2156
2157         ipreallocate_finished = 0;
2158         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2161                 return -1;
2162         }
2163
2164         tv = timeval_current();
2165         /* this loop will terminate when we have received the reply */
2166         while (timeval_elapsed(&tv) < 3.0) {
2167                 event_loop_once(ctdb->ev);
2168         }
2169         if (ipreallocate_finished == 1) {
2170                 return 0;
2171         }
2172
2173         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2174         retries++;
2175         sleep(1);
2176         goto again;
2177
2178         return 0;
2179 }
2180
2181
2182 /*
2183   disable a remote node
2184  */
2185 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2186 {
2187         int ret;
2188         struct ctdb_node_map *nodemap=NULL;
2189
2190         /* check if the node is already disabled */
2191         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2192                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2193                 exit(10);
2194         }
2195         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2196                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2197                 return 0;
2198         }
2199
2200         do {
2201                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2202                 if (ret != 0) {
2203                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2204                         return ret;
2205                 }
2206
2207                 sleep(1);
2208
2209                 /* read the nodemap and verify the change took effect */
2210                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2211                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2212                         exit(10);
2213                 }
2214
2215         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2216         ret = control_ipreallocate(ctdb, argc, argv);
2217         if (ret != 0) {
2218                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2219                 return ret;
2220         }
2221
2222         return 0;
2223 }
2224
2225 /*
2226   enable a disabled remote node
2227  */
2228 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2229 {
2230         int ret;
2231
2232         struct ctdb_node_map *nodemap=NULL;
2233
2234
2235         /* check if the node is already enabled */
2236         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2237                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2238                 exit(10);
2239         }
2240         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2241                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2242                 return 0;
2243         }
2244
2245         do {
2246                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2247                 if (ret != 0) {
2248                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2249                         return ret;
2250                 }
2251
2252                 sleep(1);
2253
2254                 /* read the nodemap and verify the change took effect */
2255                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2256                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2257                         exit(10);
2258                 }
2259
2260         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2261
2262         ret = control_ipreallocate(ctdb, argc, argv);
2263         if (ret != 0) {
2264                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2265                 return ret;
2266         }
2267
2268         return 0;
2269 }
2270
2271 /*
2272   stop a remote node
2273  */
2274 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2275 {
2276         int ret;
2277         struct ctdb_node_map *nodemap=NULL;
2278
2279         do {
2280                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2281                 if (ret != 0) {
2282                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2283                 }
2284         
2285                 sleep(1);
2286
2287                 /* read the nodemap and verify the change took effect */
2288                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2289                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2290                         exit(10);
2291                 }
2292
2293         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2294         ret = control_ipreallocate(ctdb, argc, argv);
2295         if (ret != 0) {
2296                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2297                 return ret;
2298         }
2299
2300         return 0;
2301 }
2302
2303 /*
2304   restart a stopped remote node
2305  */
2306 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2307 {
2308         int ret;
2309
2310         struct ctdb_node_map *nodemap=NULL;
2311
2312         do {
2313                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2314                 if (ret != 0) {
2315                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2316                         return ret;
2317                 }
2318         
2319                 sleep(1);
2320
2321                 /* read the nodemap and verify the change took effect */
2322                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2323                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2324                         exit(10);
2325                 }
2326
2327         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2328         ret = control_ipreallocate(ctdb, argc, argv);
2329         if (ret != 0) {
2330                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2331                 return ret;
2332         }
2333
2334         return 0;
2335 }
2336
2337 static uint32_t get_generation(struct ctdb_context *ctdb)
2338 {
2339         struct ctdb_vnn_map *vnnmap=NULL;
2340         int ret;
2341
2342         /* wait until the recmaster is not in recovery mode */
2343         while (1) {
2344                 uint32_t recmode, recmaster;
2345                 
2346                 if (vnnmap != NULL) {
2347                         talloc_free(vnnmap);
2348                         vnnmap = NULL;
2349                 }
2350
2351                 /* get the recmaster */
2352                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2353                 if (ret != 0) {
2354                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2355                         exit(10);
2356                 }
2357
2358                 /* get recovery mode */
2359                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2360                 if (ret != 0) {
2361                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2362                         exit(10);
2363                 }
2364
2365                 /* get the current generation number */
2366                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2367                 if (ret != 0) {
2368                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2369                         exit(10);
2370                 }
2371
2372                 if ((recmode == CTDB_RECOVERY_NORMAL)
2373                 &&  (vnnmap->generation != 1)){
2374                         return vnnmap->generation;
2375                 }
2376                 sleep(1);
2377         }
2378 }
2379
2380 /*
2381   ban a node from the cluster
2382  */
2383 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2384 {
2385         int ret;
2386         struct ctdb_node_map *nodemap=NULL;
2387         struct ctdb_ban_time bantime;
2388
2389         if (argc < 1) {
2390                 usage();
2391         }
2392         
2393         /* verify the node exists */
2394         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2395         if (ret != 0) {
2396                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2397                 return ret;
2398         }
2399
2400         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2401                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2402                 return -1;
2403         }
2404
2405         bantime.pnn  = options.pnn;
2406         bantime.time = strtoul(argv[0], NULL, 0);
2407
2408         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2409         if (ret != 0) {
2410                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2411                 return -1;
2412         }       
2413
2414         ret = control_ipreallocate(ctdb, argc, argv);
2415         if (ret != 0) {
2416                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2417                 return ret;
2418         }
2419
2420         return 0;
2421 }
2422
2423
2424 /*
2425   unban a node from the cluster
2426  */
2427 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2428 {
2429         int ret;
2430         struct ctdb_node_map *nodemap=NULL;
2431         struct ctdb_ban_time bantime;
2432
2433         /* verify the node exists */
2434         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2435         if (ret != 0) {
2436                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2437                 return ret;
2438         }
2439
2440         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2441                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2442                 return -1;
2443         }
2444
2445         bantime.pnn  = options.pnn;
2446         bantime.time = 0;
2447
2448         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2449         if (ret != 0) {
2450                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2451                 return -1;
2452         }       
2453
2454         ret = control_ipreallocate(ctdb, argc, argv);
2455         if (ret != 0) {
2456                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2457                 return ret;
2458         }
2459
2460         return 0;
2461 }
2462
2463
2464 /*
2465   show ban information for a node
2466  */
2467 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2468 {
2469         int ret;
2470         struct ctdb_node_map *nodemap=NULL;
2471         struct ctdb_ban_time *bantime;
2472
2473         /* verify the node exists */
2474         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2475         if (ret != 0) {
2476                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2477                 return ret;
2478         }
2479
2480         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2481         if (ret != 0) {
2482                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2483                 return -1;
2484         }       
2485
2486         if (bantime->time == 0) {
2487                 printf("Node %u is not banned\n", bantime->pnn);
2488         } else {
2489                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2490         }
2491
2492         return 0;
2493 }
2494
2495 /*
2496   shutdown a daemon
2497  */
2498 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2499 {
2500         int ret;
2501
2502         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2503         if (ret != 0) {
2504                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2505                 return ret;
2506         }
2507
2508         return 0;
2509 }
2510
2511 /*
2512   trigger a recovery
2513  */
2514 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2515 {
2516         int ret;
2517         uint32_t generation, next_generation;
2518
2519         /* record the current generation number */
2520         generation = get_generation(ctdb);
2521
2522         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2523         if (ret != 0) {
2524                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2525                 return ret;
2526         }
2527
2528         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2529         if (ret != 0) {
2530                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2531                 return ret;
2532         }
2533
2534         /* wait until we are in a new generation */
2535         while (1) {
2536                 next_generation = get_generation(ctdb);
2537                 if (next_generation != generation) {
2538                         return 0;
2539                 }
2540                 sleep(1);
2541         }
2542
2543         return 0;
2544 }
2545
2546
2547 /*
2548   display monitoring mode of a remote node
2549  */
2550 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2551 {
2552         uint32_t monmode;
2553         int ret;
2554
2555         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2556         if (ret != 0) {
2557                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2558                 return ret;
2559         }
2560         if (!options.machinereadable){
2561                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2562         } else {
2563                 printf(":mode:\n");
2564                 printf(":%d:\n",monmode);
2565         }
2566         return 0;
2567 }
2568
2569
2570 /*
2571   display capabilities of a remote node
2572  */
2573 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2574 {
2575         uint32_t capabilities;
2576         int ret;
2577
2578         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2579         if (ret != 0) {
2580                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2581                 return ret;
2582         }
2583         
2584         if (!options.machinereadable){
2585                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2586                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2587                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2588                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2589         } else {
2590                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2591                 printf(":%d:%d:%d:%d:\n",
2592                         !!(capabilities&CTDB_CAP_RECMASTER),
2593                         !!(capabilities&CTDB_CAP_LMASTER),
2594                         !!(capabilities&CTDB_CAP_LVS),
2595                         !!(capabilities&CTDB_CAP_NATGW));
2596         }
2597         return 0;
2598 }
2599
2600 /*
2601   display lvs configuration
2602  */
2603 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2604 {
2605         uint32_t *capabilities;
2606         struct ctdb_node_map *nodemap=NULL;
2607         int i, ret;
2608         int healthy_count = 0;
2609
2610         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2611         if (ret != 0) {
2612                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2613                 return ret;
2614         }
2615
2616         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2617         CTDB_NO_MEMORY(ctdb, capabilities);
2618         
2619         /* collect capabilities for all connected nodes */
2620         for (i=0; i<nodemap->num; i++) {
2621                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2622                         continue;
2623                 }
2624                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2625                         continue;
2626                 }
2627         
2628                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2629                 if (ret != 0) {
2630                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2631                         return ret;
2632                 }
2633
2634                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2635                         continue;
2636                 }
2637
2638                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2639                         healthy_count++;
2640                 }
2641         }
2642
2643         /* Print all LVS nodes */
2644         for (i=0; i<nodemap->num; i++) {
2645                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2646                         continue;
2647                 }
2648                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2649                         continue;
2650                 }
2651                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2652                         continue;
2653                 }
2654
2655                 if (healthy_count != 0) {
2656                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2657                                 continue;
2658                         }
2659                 }
2660
2661                 printf("%d:%s\n", i, 
2662                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2663         }
2664
2665         return 0;
2666 }
2667
2668 /*
2669   display who is the lvs master
2670  */
2671 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2672 {
2673         uint32_t *capabilities;
2674         struct ctdb_node_map *nodemap=NULL;
2675         int i, ret;
2676         int healthy_count = 0;
2677
2678         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2679         if (ret != 0) {
2680                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2681                 return ret;
2682         }
2683
2684         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2685         CTDB_NO_MEMORY(ctdb, capabilities);
2686         
2687         /* collect capabilities for all connected nodes */
2688         for (i=0; i<nodemap->num; i++) {
2689                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2690                         continue;
2691                 }
2692                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2693                         continue;
2694                 }
2695         
2696                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2697                 if (ret != 0) {
2698                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2699                         return ret;
2700                 }
2701
2702                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2703                         continue;
2704                 }
2705
2706                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2707                         healthy_count++;
2708                 }
2709         }
2710
2711         /* find and show the lvsmaster */
2712         for (i=0; i<nodemap->num; i++) {
2713                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2714                         continue;
2715                 }
2716                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2717                         continue;
2718                 }
2719                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2720                         continue;
2721                 }
2722
2723                 if (healthy_count != 0) {
2724                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2725                                 continue;
2726                         }
2727                 }
2728
2729                 if (options.machinereadable){
2730                         printf("%d\n", i);
2731                 } else {
2732                         printf("Node %d is LVS master\n", i);
2733                 }
2734                 return 0;
2735         }
2736
2737         printf("There is no LVS master\n");
2738         return -1;
2739 }
2740
2741 /*
2742   disable monitoring on a  node
2743  */
2744 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2745 {
2746         
2747         int ret;
2748
2749         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2750         if (ret != 0) {
2751                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2752                 return ret;
2753         }
2754         printf("Monitoring mode:%s\n","DISABLED");
2755
2756         return 0;
2757 }
2758
2759 /*
2760   enable monitoring on a  node
2761  */
2762 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2763 {
2764         
2765         int ret;
2766
2767         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2768         if (ret != 0) {
2769                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2770                 return ret;
2771         }
2772         printf("Monitoring mode:%s\n","ACTIVE");
2773
2774         return 0;
2775 }
2776
2777 /*
2778   display remote list of keys/data for a db
2779  */
2780 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2781 {
2782         const char *db_name;
2783         struct ctdb_db_context *ctdb_db;
2784         int ret;
2785
2786         if (argc < 1) {
2787                 usage();
2788         }
2789
2790         db_name = argv[0];
2791
2792
2793         if (db_exists(ctdb, db_name)) {
2794                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2795                 return -1;
2796         }
2797
2798         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2799
2800         if (ctdb_db == NULL) {
2801                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2802                 return -1;
2803         }
2804
2805         /* traverse and dump the cluster tdb */
2806         ret = ctdb_dump_db(ctdb_db, stdout);
2807         if (ret == -1) {
2808                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2809                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2810                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2811                                   db_name));
2812                 return -1;
2813         }
2814         talloc_free(ctdb_db);
2815
2816         printf("Dumped %d records\n", ret);
2817         return 0;
2818 }
2819
2820
2821 /*
2822   fetch a record from a persistent database
2823  */
2824 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
2825 {
2826         const char *db_name;
2827         struct ctdb_db_context *ctdb_db;
2828         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2829         struct ctdb_transaction_handle *h;
2830         TDB_DATA key, data;
2831         int ret;
2832
2833         if (argc < 2) {
2834                 talloc_free(tmp_ctx);
2835                 usage();
2836         }
2837
2838         db_name = argv[0];
2839
2840
2841         if (db_exists(ctdb, db_name)) {
2842                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2843                 talloc_free(tmp_ctx);
2844                 return -1;
2845         }
2846
2847         ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
2848
2849         if (ctdb_db == NULL) {
2850                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2851                 talloc_free(tmp_ctx);
2852                 return -1;
2853         }
2854
2855         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
2856         if (h == NULL) {
2857                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
2858                 talloc_free(tmp_ctx);
2859                 return -1;
2860         }
2861
2862         key.dptr  = discard_const(argv[1]);
2863         key.dsize = strlen(argv[1]);
2864         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
2865         if (ret != 0) {
2866                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
2867                 talloc_free(tmp_ctx);
2868                 return -1;
2869         }
2870
2871         if (data.dsize == 0 || data.dptr == NULL) {
2872                 DEBUG(DEBUG_ERR,("Record is empty\n"));
2873                 talloc_free(tmp_ctx);
2874                 return -1;
2875         }
2876
2877         fwrite(data.dptr, data.dsize, 1, stdout);
2878
2879         /* abort the transaction */
2880         talloc_free(h);
2881
2882
2883         talloc_free(tmp_ctx);
2884         return 0;
2885 }
2886
2887 /*
2888   write a record to a persistent database
2889  */
2890 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
2891 {
2892         const char *db_name;
2893         struct ctdb_db_context *ctdb_db;
2894         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2895         struct ctdb_transaction_handle *h;
2896         struct stat st;
2897         TDB_DATA key, data;
2898         int fd, ret;
2899
2900         if (argc < 3) {
2901                 talloc_free(tmp_ctx);
2902                 usage();
2903         }
2904
2905         fd = open(argv[2], O_RDONLY);
2906         if (fd == -1) {
2907                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
2908                 talloc_free(tmp_ctx);
2909                 return -1;
2910         }
2911         
2912         ret = fstat(fd, &st);
2913         if (ret == -1) {
2914                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
2915                 close(fd);
2916                 talloc_free(tmp_ctx);
2917                 return -1;
2918         }
2919
2920         if (!S_ISREG(st.st_mode)) {
2921                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
2922                 close(fd);
2923                 talloc_free(tmp_ctx);
2924                 return -1;
2925         }
2926
2927         data.dsize = st.st_size;
2928         if (data.dsize == 0) {
2929                 data.dptr  = NULL;
2930         } else {
2931                 data.dptr = talloc_size(tmp_ctx, data.dsize);
2932                 if (data.dptr == NULL) {
2933                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
2934                         close(fd);
2935                         talloc_free(tmp_ctx);
2936                         return -1;
2937                 }
2938                 ret = read(fd, data.dptr, data.dsize);
2939                 if (ret != data.dsize) {
2940                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
2941                         close(fd);
2942                         talloc_free(tmp_ctx);
2943                         return -1;
2944                 }
2945         }
2946         close(fd);
2947
2948
2949         db_name = argv[0];
2950
2951         ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
2952
2953         if (ctdb_db == NULL) {
2954                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2955                 talloc_free(tmp_ctx);
2956                 return -1;
2957         }
2958
2959         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
2960         if (h == NULL) {
2961                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
2962                 talloc_free(tmp_ctx);
2963                 return -1;
2964         }
2965
2966         key.dptr  = discard_const(argv[1]);
2967         key.dsize = strlen(argv[1]);
2968         ret = ctdb_transaction_store(h, key, data);
2969         if (ret != 0) {
2970                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
2971                 talloc_free(tmp_ctx);
2972                 return -1;
2973         }
2974
2975         ret = ctdb_transaction_commit(h);
2976         if (ret != 0) {
2977                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
2978                 talloc_free(tmp_ctx);
2979                 return -1;
2980         }
2981
2982
2983         talloc_free(tmp_ctx);
2984         return 0;
2985 }
2986
2987 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2988                              TDB_DATA data, void *private_data)
2989 {
2990         DEBUG(DEBUG_ERR,("Log data received\n"));
2991         if (data.dsize > 0) {
2992                 printf("%s", data.dptr);
2993         }
2994
2995         exit(0);
2996 }
2997
2998 /*
2999   display a list of log messages from the in memory ringbuffer
3000  */
3001 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3002 {
3003         int ret;
3004         int32_t res;
3005         struct ctdb_get_log_addr log_addr;
3006         TDB_DATA data;
3007         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3008         char *errmsg;
3009         struct timeval tv;
3010
3011         if (argc != 1) {
3012                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3013                 talloc_free(tmp_ctx);
3014                 return -1;
3015         }
3016
3017         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3018         log_addr.srvid = getpid();
3019         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3020                 log_addr.level = get_debug_by_desc(argv[0]);
3021         } else {
3022                 log_addr.level = strtol(argv[0], NULL, 0);
3023         }
3024
3025
3026         data.dptr = (unsigned char *)&log_addr;
3027         data.dsize = sizeof(log_addr);
3028
3029         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3030
3031         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3032         sleep(1);
3033
3034         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3035
3036         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
3037                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3038         if (ret != 0 || res != 0) {
3039                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
3040                 talloc_free(tmp_ctx);
3041                 return -1;
3042         }
3043
3044
3045         tv = timeval_current();
3046         /* this loop will terminate when we have received the reply */
3047         while (timeval_elapsed(&tv) < 3.0) {    
3048                 event_loop_once(ctdb->ev);
3049         }
3050
3051         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
3052
3053         talloc_free(tmp_ctx);
3054         return 0;
3055 }
3056
3057 /*
3058   clear the in memory log area
3059  */
3060 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
3061 {
3062         int ret;
3063         int32_t res;
3064         char *errmsg;
3065         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3066
3067         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
3068                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
3069         if (ret != 0 || res != 0) {
3070                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
3071                 talloc_free(tmp_ctx);
3072                 return -1;
3073         }
3074
3075         talloc_free(tmp_ctx);
3076         return 0;
3077 }
3078
3079
3080
3081 /*
3082   display a list of the databases on a remote ctdb
3083  */
3084 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
3085 {
3086         int i, ret;
3087         struct ctdb_dbid_map *dbmap=NULL;
3088
3089         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3090         if (ret != 0) {
3091                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3092                 return ret;
3093         }
3094
3095         if(options.machinereadable){
3096                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
3097                 for(i=0;i<dbmap->num;i++){
3098                         const char *path;
3099                         const char *name;
3100                         const char *health;
3101                         bool persistent;
3102
3103                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
3104                                             dbmap->dbs[i].dbid, ctdb, &path);
3105                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3106                                             dbmap->dbs[i].dbid, ctdb, &name);
3107                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3108                                               dbmap->dbs[i].dbid, ctdb, &health);
3109                         persistent = dbmap->dbs[i].persistent;
3110                         printf(":0x%08X:%s:%s:%d:%d:\n",
3111                                dbmap->dbs[i].dbid, name, path,
3112                                !!(persistent), !!(health));
3113                 }
3114                 return 0;
3115         }
3116
3117         printf("Number of databases:%d\n", dbmap->num);
3118         for(i=0;i<dbmap->num;i++){
3119                 const char *path;
3120                 const char *name;
3121                 const char *health;
3122                 bool persistent;
3123
3124                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3125                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3126                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3127                 persistent = dbmap->dbs[i].persistent;
3128                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
3129                        dbmap->dbs[i].dbid, name, path,
3130                        persistent?" PERSISTENT":"",
3131                        health?" UNHEALTHY":"");
3132         }
3133
3134         return 0;
3135 }
3136
3137 /*
3138   display the status of a database on a remote ctdb
3139  */
3140 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
3141 {
3142         int i, ret;
3143         struct ctdb_dbid_map *dbmap=NULL;
3144         const char *db_name;
3145
3146         if (argc < 1) {
3147                 usage();
3148         }
3149
3150         db_name = argv[0];
3151
3152         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3153         if (ret != 0) {
3154                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3155                 return ret;
3156         }
3157
3158         for(i=0;i<dbmap->num;i++){
3159                 const char *path;
3160                 const char *name;
3161                 const char *health;
3162                 bool persistent;
3163
3164                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3165                 if (strcmp(name, db_name) != 0) {
3166                         continue;
3167                 }
3168
3169                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3170                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3171                 persistent = dbmap->dbs[i].persistent;
3172                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
3173                        dbmap->dbs[i].dbid, name, path,
3174                        persistent?"yes":"no",
3175                        health?health:"OK");
3176                 return 0;
3177         }
3178
3179         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
3180         return 0;
3181 }
3182
3183 /*
3184   check if the local node is recmaster or not
3185   it will return 1 if this node is the recmaster and 0 if it is not
3186   or if the local ctdb daemon could not be contacted
3187  */
3188 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3189 {
3190         uint32_t mypnn, recmaster;
3191         int ret;
3192
3193         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
3194         if (mypnn == -1) {
3195                 printf("Failed to get pnn of node\n");
3196                 return 1;
3197         }
3198
3199         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
3200         if (ret != 0) {
3201                 printf("Failed to get the recmaster\n");
3202                 return 1;
3203         }
3204
3205         if (recmaster != mypnn) {
3206                 printf("this node is not the recmaster\n");
3207                 return 1;
3208         }
3209
3210         printf("this node is the recmaster\n");
3211         return 0;
3212 }
3213
3214 /*
3215   ping a node
3216  */
3217 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
3218 {
3219         int ret;
3220         struct timeval tv = timeval_current();
3221         ret = ctdb_ctrl_ping(ctdb, options.pnn);
3222         if (ret == -1) {
3223                 printf("Unable to get ping response from node %u\n", options.pnn);
3224                 return -1;
3225         } else {
3226                 printf("response from %u time=%.6f sec  (%d clients)\n", 
3227                        options.pnn, timeval_elapsed(&tv), ret);
3228         }
3229         return 0;
3230 }
3231
3232
3233 /*
3234   get a tunable
3235  */
3236 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3237 {
3238         const char *name;
3239         uint32_t value;
3240         int ret;
3241
3242         if (argc < 1) {
3243                 usage();
3244         }
3245
3246         name = argv[0];
3247         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
3248         if (ret == -1) {
3249                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3250                 return -1;
3251         }
3252
3253         printf("%-19s = %u\n", name, value);
3254         return 0;
3255 }
3256
3257 /*
3258   set a tunable
3259  */
3260 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3261 {
3262         const char *name;
3263         uint32_t value;
3264         int ret;
3265
3266         if (argc < 2) {
3267                 usage();
3268         }
3269
3270         name = argv[0];
3271         value = strtoul(argv[1], NULL, 0);
3272
3273         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3274         if (ret == -1) {
3275                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3276                 return -1;
3277         }
3278         return 0;
3279 }
3280
3281 /*
3282   list all tunables
3283  */
3284 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3285 {
3286         uint32_t count;
3287         const char **list;
3288         int ret, i;
3289
3290         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3291         if (ret == -1) {
3292                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3293                 return -1;
3294         }
3295
3296         for (i=0;i<count;i++) {
3297                 control_getvar(ctdb, 1, &list[i]);
3298         }
3299
3300         talloc_free(list);
3301         
3302         return 0;
3303 }
3304
3305 /*
3306   display debug level on a node
3307  */
3308 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3309 {
3310         int ret;
3311         int32_t level;
3312
3313         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3314         if (ret != 0) {
3315                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3316                 return ret;
3317         } else {
3318                 if (options.machinereadable){
3319                         printf(":Name:Level:\n");
3320                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3321                 } else {
3322                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3323                 }
3324         }
3325         return 0;
3326 }
3327
3328 /*
3329   display reclock file of a node
3330  */
3331 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3332 {
3333         int ret;
3334         const char *reclock;
3335
3336         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3337         if (ret != 0) {
3338                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3339                 return ret;
3340         } else {
3341                 if (options.machinereadable){
3342                         if (reclock != NULL) {
3343                                 printf("%s", reclock);
3344                         }
3345                 } else {
3346                         if (reclock == NULL) {
3347                                 printf("No reclock file used.\n");
3348                         } else {
3349                                 printf("Reclock file:%s\n", reclock);
3350                         }
3351                 }
3352         }
3353         return 0;
3354 }
3355
3356 /*
3357   set the reclock file of a node
3358  */
3359 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3360 {
3361         int ret;
3362         const char *reclock;
3363
3364         if (argc == 0) {
3365                 reclock = NULL;
3366         } else if (argc == 1) {
3367                 reclock = argv[0];
3368         } else {
3369                 usage();
3370         }
3371
3372         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3373         if (ret != 0) {
3374                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3375                 return ret;
3376         }
3377         return 0;
3378 }
3379
3380 /*
3381   set the natgw state on/off
3382  */
3383 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3384 {
3385         int ret;
3386         uint32_t natgwstate;
3387
3388         if (argc == 0) {
3389                 usage();
3390         }
3391
3392         if (!strcmp(argv[0], "on")) {
3393                 natgwstate = 1;
3394         } else if (!strcmp(argv[0], "off")) {
3395                 natgwstate = 0;
3396         } else {
3397                 usage();
3398         }
3399
3400         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3401         if (ret != 0) {
3402                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3403                 return ret;
3404         }
3405
3406         return 0;
3407 }
3408
3409 /*
3410   set the lmaster role on/off
3411  */
3412 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3413 {
3414         int ret;
3415         uint32_t lmasterrole;
3416
3417         if (argc == 0) {
3418                 usage();
3419         }
3420
3421         if (!strcmp(argv[0], "on")) {
3422                 lmasterrole = 1;
3423         } else if (!strcmp(argv[0], "off")) {
3424                 lmasterrole = 0;
3425         } else {
3426                 usage();
3427         }
3428
3429         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3430         if (ret != 0) {
3431                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3432                 return ret;
3433         }
3434
3435         return 0;
3436 }
3437
3438 /*
3439   set the recmaster role on/off
3440  */
3441 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3442 {
3443         int ret;
3444         uint32_t recmasterrole;
3445
3446         if (argc == 0) {
3447                 usage();
3448         }
3449
3450         if (!strcmp(argv[0], "on")) {
3451                 recmasterrole = 1;
3452         } else if (!strcmp(argv[0], "off")) {
3453                 recmasterrole = 0;
3454         } else {
3455                 usage();
3456         }
3457
3458         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3459         if (ret != 0) {
3460                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3461                 return ret;
3462         }
3463
3464         return 0;
3465 }
3466
3467 /*
3468   set debug level on a node or all nodes
3469  */
3470 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3471 {
3472         int i, ret;
3473         int32_t level;
3474
3475         if (argc == 0) {
3476                 printf("You must specify the debug level. Valid levels are:\n");
3477                 for (i=0; debug_levels[i].description != NULL; i++) {
3478                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3479                 }
3480
3481                 return 0;
3482         }
3483
3484         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3485                 level = get_debug_by_desc(argv[0]);
3486         } else {
3487                 level = strtol(argv[0], NULL, 0);
3488         }
3489
3490         for (i=0; debug_levels[i].description != NULL; i++) {
3491                 if (level == debug_levels[i].level) {
3492                         break;
3493                 }
3494         }
3495         if (debug_levels[i].description == NULL) {
3496                 printf("Invalid debug level, must be one of\n");
3497                 for (i=0; debug_levels[i].description != NULL; i++) {
3498                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3499                 }
3500                 return -1;
3501         }
3502
3503         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3504         if (ret != 0) {
3505                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3506         }
3507         return 0;
3508 }
3509
3510
3511 /*
3512   thaw a node
3513  */
3514 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3515 {
3516         int ret;
3517         uint32_t priority;
3518         
3519         if (argc == 1) {
3520                 priority = strtol(argv[0], NULL, 0);
3521         } else {
3522                 priority = 0;
3523         }
3524         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3525
3526         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3527         if (ret != 0) {
3528                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3529         }               
3530         return 0;
3531 }
3532
3533
3534 /*
3535   attach to a database
3536  */
3537 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3538 {
3539         const char *db_name;
3540         struct ctdb_db_context *ctdb_db;
3541
3542         if (argc < 1) {
3543                 usage();
3544         }
3545         db_name = argv[0];
3546
3547         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3548         if (ctdb_db == NULL) {
3549                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3550                 return -1;
3551         }
3552
3553         return 0;
3554 }
3555
3556 /*
3557   set db priority
3558  */
3559 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3560 {
3561         struct ctdb_db_priority db_prio;
3562         int ret;
3563
3564         if (argc < 2) {
3565                 usage();
3566         }
3567
3568         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3569         db_prio.priority = strtoul(argv[1], NULL, 0);
3570
3571         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3572         if (ret != 0) {
3573                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3574                 return -1;
3575         }
3576
3577         return 0;
3578 }
3579
3580 /*
3581   get db priority
3582  */
3583 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3584 {
3585         uint32_t db_id, priority;
3586         int ret;
3587
3588         if (argc < 1) {
3589                 usage();
3590         }
3591
3592         db_id = strtoul(argv[0], NULL, 0);
3593
3594         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3595         if (ret != 0) {
3596                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3597                 return -1;
3598         }
3599
3600         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3601
3602         return 0;
3603 }
3604
3605 /*
3606   run an eventscript on a node
3607  */
3608 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3609 {
3610         TDB_DATA data;
3611         int ret;
3612         int32_t res;
3613         char *errmsg;
3614         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3615
3616         if (argc != 1) {
3617                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3618                 return -1;
3619         }
3620
3621         data.dptr = (unsigned char *)discard_const(argv[0]);
3622         data.dsize = strlen((char *)data.dptr) + 1;
3623
3624         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3625
3626         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3627                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3628         if (ret != 0 || res != 0) {
3629                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3630                 talloc_free(tmp_ctx);
3631                 return -1;
3632         }
3633         talloc_free(tmp_ctx);
3634         return 0;
3635 }
3636
/* format version stored in every backup file; control_restoredb() and
   control_dumpdbbackup() refuse files carrying any other value */
#define DB_VERSION 1
/* size in bytes of the name field below; control_backupdb() rejects
   database names whose strlen() is >= this value */
#define MAX_DB_NAME 64
/*
  Header at the start of a database backup file.

  control_backupdb() writes this struct as raw bytes (sizeof(dbhdr)) and
  follows it with `size` bytes of marshalled records. Because the struct is
  written as-is, the file layout follows the in-memory layout of the build
  that produced it.
 */
struct db_file_header {
        /* set to DB_VERSION by the writer */
        unsigned long version;
        /* time(NULL) at the moment the backup was written */
        time_t timestamp;
        /* the database's `persistent` flag from the dbmap; passed back to
           ctdb_attach() on restore */
        unsigned long persistent;
        /* number of bytes of marshall buffer that follow this header */
        unsigned long size;
        /* database name, filled with strncpy() through discard_const() */
        const char name[MAX_DB_NAME];
};
3646
/*
  State shared between control_backupdb() and its traverse callback
  backup_traverse() while the records of a database are collected.
 */
struct backup_data {
        /* marshall buffer holding all records appended so far; reallocated
           (and therefore possibly moved) by every callback invocation */
        struct ctdb_marshall_buffer *records;
        /* number of bytes of `records` in use; starts at
           offsetof(struct ctdb_marshall_buffer, data) */
        uint32_t len;
        /* number of records appended */
        uint32_t total;
        /* set by the callback when marshalling or reallocation fails */
        bool traverse_error;
};
3653
3654 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3655 {
3656         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3657         struct ctdb_rec_data *rec;
3658
3659         /* add the record */
3660         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3661         if (rec == NULL) {
3662                 bd->traverse_error = true;
3663                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3664                 return -1;
3665         }
3666         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3667         if (bd->records == NULL) {
3668                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3669                 bd->traverse_error = true;
3670                 return -1;
3671         }
3672         bd->records->count++;
3673         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3674         bd->len += rec->length;
3675         talloc_free(rec);
3676
3677         bd->total++;
3678         return 0;
3679 }
3680
3681 /*
3682  * backup a database to a file 
3683  */
3684 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3685 {
3686         int i, ret;
3687         struct ctdb_dbid_map *dbmap=NULL;
3688         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3689         struct db_file_header dbhdr;
3690         struct ctdb_db_context *ctdb_db;
3691         struct backup_data *bd;
3692         int fh = -1;
3693         int status = -1;
3694         const char *reason = NULL;
3695
3696         if (argc != 2) {
3697                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3698                 return -1;
3699         }
3700
3701         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3702         if (ret != 0) {
3703                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3704                 return ret;
3705         }
3706
3707         for(i=0;i<dbmap->num;i++){
3708                 const char *name;
3709
3710                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3711                 if(!strcmp(argv[0], name)){
3712                         talloc_free(discard_const(name));
3713                         break;
3714                 }
3715                 talloc_free(discard_const(name));
3716         }
3717         if (i == dbmap->num) {
3718                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3719                 talloc_free(tmp_ctx);
3720                 return -1;
3721         }
3722
3723         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3724                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3725         if (ret != 0) {
3726                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3727                                  argv[0]));
3728                 talloc_free(tmp_ctx);
3729                 return -1;
3730         }
3731         if (reason) {
3732                 uint32_t allow_unhealthy = 0;
3733
3734                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3735                                       "AllowUnhealthyDBRead",
3736                                       &allow_unhealthy);
3737
3738                 if (allow_unhealthy != 1) {
3739                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3740                                          argv[0], reason));
3741
3742                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3743                                          allow_unhealthy));
3744                         talloc_free(tmp_ctx);
3745                         return -1;
3746                 }
3747
3748                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3749                                      argv[0], argv[0]));
3750                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3751                                      "tunnable AllowUnhealthyDBRead = %u\n",
3752                                      allow_unhealthy));
3753         }
3754
3755         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3756         if (ctdb_db == NULL) {
3757                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3758                 talloc_free(tmp_ctx);
3759                 return -1;
3760         }
3761
3762
3763         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3764         if (ret == -1) {
3765                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3766                 talloc_free(tmp_ctx);
3767                 return -1;
3768         }
3769
3770
3771         bd = talloc_zero(tmp_ctx, struct backup_data);
3772         if (bd == NULL) {
3773                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3774                 talloc_free(tmp_ctx);
3775                 return -1;
3776         }
3777
3778         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3779         if (bd->records == NULL) {
3780                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3781                 talloc_free(tmp_ctx);
3782                 return -1;
3783         }
3784
3785         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3786         bd->records->db_id = ctdb_db->db_id;
3787         /* traverse the database collecting all records */
3788         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3789             bd->traverse_error) {
3790                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3791                 talloc_free(tmp_ctx);
3792                 return -1;              
3793         }
3794
3795         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3796
3797
3798         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3799         if (fh == -1) {
3800                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3801                 talloc_free(tmp_ctx);
3802                 return -1;
3803         }
3804
3805         dbhdr.version = DB_VERSION;
3806         dbhdr.timestamp = time(NULL);
3807         dbhdr.persistent = dbmap->dbs[i].persistent;
3808         dbhdr.size = bd->len;
3809         if (strlen(argv[0]) >= MAX_DB_NAME) {
3810                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3811                 goto done;
3812         }
3813         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3814         ret = write(fh, &dbhdr, sizeof(dbhdr));
3815         if (ret == -1) {
3816                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3817                 goto done;
3818         }
3819         ret = write(fh, bd->records, bd->len);
3820         if (ret == -1) {
3821                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3822                 goto done;
3823         }
3824
3825         status = 0;
3826 done:
3827         if (fh != -1) {
3828                 ret = close(fh);
3829                 if (ret == -1) {
3830                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3831                 }
3832         }
3833         talloc_free(tmp_ctx);
3834         return status;
3835 }
3836
3837 /*
3838  * restore a database from a file 
3839  */
3840 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3841 {
3842         int ret;
3843         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3844         TDB_DATA outdata;
3845         TDB_DATA data;
3846         struct db_file_header dbhdr;
3847         struct ctdb_db_context *ctdb_db;
3848         struct ctdb_node_map *nodemap=NULL;
3849         struct ctdb_vnn_map *vnnmap=NULL;
3850         int i, fh;
3851         struct ctdb_control_wipe_database w;
3852         uint32_t *nodes;
3853         uint32_t generation;
3854         struct tm *tm;
3855         char tbuf[100];
3856         char *dbname;
3857
3858         if (argc < 1 || argc > 2) {
3859                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3860                 return -1;
3861         }
3862
3863         fh = open(argv[0], O_RDONLY);
3864         if (fh == -1) {
3865                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3866                 talloc_free(tmp_ctx);
3867                 return -1;
3868         }
3869
3870         read(fh, &dbhdr, sizeof(dbhdr));
3871         if (dbhdr.version != DB_VERSION) {
3872                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3873                 talloc_free(tmp_ctx);
3874                 return -1;
3875         }
3876
3877         dbname = discard_const(dbhdr.name);
3878         if (argc == 2) {
3879                 dbname = discard_const(argv[1]);
3880         }
3881
3882         outdata.dsize = dbhdr.size;
3883         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3884         if (outdata.dptr == NULL) {
3885                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3886                 close(fh);
3887                 talloc_free(tmp_ctx);
3888                 return -1;
3889         }               
3890         read(fh, outdata.dptr, outdata.dsize);
3891         close(fh);
3892
3893         tm = localtime(&dbhdr.timestamp);
3894         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3895         printf("Restoring database '%s' from backup @ %s\n",
3896                 dbname, tbuf);
3897
3898
3899         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3900         if (ctdb_db == NULL) {
3901                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3902                 talloc_free(tmp_ctx);
3903                 return -1;
3904         }
3905
3906         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3907         if (ret != 0) {
3908                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3909                 talloc_free(tmp_ctx);
3910                 return ret;
3911         }
3912
3913
3914         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3915         if (ret != 0) {
3916                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3917                 talloc_free(tmp_ctx);
3918                 return ret;
3919         }
3920
3921         /* freeze all nodes */
3922         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3923         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3924                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3925                                         nodes, i,
3926                                         TIMELIMIT(),
3927                                         false, tdb_null,
3928                                         NULL, NULL,
3929                                         NULL) != 0) {
3930                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3931                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3932                         talloc_free(tmp_ctx);
3933                         return -1;
3934                 }
3935         }
3936
3937         generation = vnnmap->generation;
3938         data.dptr = (void *)&generation;
3939         data.dsize = sizeof(generation);
3940
3941         /* start a cluster wide transaction */
3942         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3943         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3944                                         nodes, 0,
3945                                         TIMELIMIT(), false, data,
3946                                         NULL, NULL,
3947                                         NULL) != 0) {
3948                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3949                 return -1;
3950         }
3951
3952
3953         w.db_id = ctdb_db->db_id;
3954         w.transaction_id = generation;
3955
3956         data.dptr = (void *)&w;
3957         data.dsize = sizeof(w);
3958
3959         /* wipe all the remote databases. */
3960         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3961         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3962                                         nodes, 0,
3963                                         TIMELIMIT(), false, data,
3964                                         NULL, NULL,
3965                                         NULL) != 0) {
3966                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3967                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3968                 talloc_free(tmp_ctx);
3969                 return -1;
3970         }
3971         
3972         /* push the database */
3973         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3974         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3975                                         nodes, 0,
3976                                         TIMELIMIT(), false, outdata,
3977                                         NULL, NULL,
3978                                         NULL) != 0) {
3979                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3980                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3981                 talloc_free(tmp_ctx);
3982                 return -1;
3983         }
3984
3985         data.dptr = (void *)&ctdb_db->db_id;
3986         data.dsize = sizeof(ctdb_db->db_id);
3987
3988         /* mark the database as healthy */
3989         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3990         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3991                                         nodes, 0,
3992                                         TIMELIMIT(), false, data,
3993                                         NULL, NULL,
3994                                         NULL) != 0) {
3995                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3996                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3997                 talloc_free(tmp_ctx);
3998                 return -1;
3999         }
4000
4001         data.dptr = (void *)&generation;
4002         data.dsize = sizeof(generation);
4003
4004         /* commit all the changes */
4005         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4006                                         nodes, 0,
4007                                         TIMELIMIT(), false, data,
4008                                         NULL, NULL,
4009                                         NULL) != 0) {
4010                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4011                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4012                 talloc_free(tmp_ctx);
4013                 return -1;
4014         }
4015
4016
4017         /* thaw all nodes */
4018         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4019         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4020                                         nodes, 0,
4021                                         TIMELIMIT(),
4022                                         false, tdb_null,
4023                                         NULL, NULL,
4024                                         NULL) != 0) {
4025                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4026                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4027                 talloc_free(tmp_ctx);
4028                 return -1;
4029         }
4030
4031
4032         talloc_free(tmp_ctx);
4033         return 0;
4034 }
4035
4036 /*
4037  * dump a database backup from a file
4038  */
4039 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
4040 {
4041         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4042         TDB_DATA outdata;
4043         struct db_file_header dbhdr;
4044         int i, fh;
4045         struct tm *tm;
4046         char tbuf[100];
4047         struct ctdb_rec_data *rec = NULL;
4048         struct ctdb_marshall_buffer *m;
4049
4050         if (argc != 1) {
4051                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4052                 return -1;
4053         }
4054
4055         fh = open(argv[0], O_RDONLY);
4056         if (fh == -1) {
4057                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4058                 talloc_free(tmp_ctx);
4059                 return -1;
4060         }
4061
4062         read(fh, &dbhdr, sizeof(dbhdr));
4063         if (dbhdr.version != DB_VERSION) {
4064                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4065                 talloc_free(tmp_ctx);
4066                 return -1;
4067         }
4068
4069         outdata.dsize = dbhdr.size;
4070         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4071         if (outdata.dptr == NULL) {
4072                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4073                 close(fh);
4074                 talloc_free(tmp_ctx);
4075                 return -1;
4076         }
4077         read(fh, outdata.dptr, outdata.dsize);
4078         close(fh);
4079         m = (struct ctdb_marshall_buffer *)outdata.dptr;
4080
4081         tm = localtime(&dbhdr.timestamp);
4082         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4083         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
4084                 dbhdr.name, m->db_id, tbuf);
4085
4086         for (i=0; i < m->count; i++) {
4087                 uint32_t reqid = 0;
4088                 TDB_DATA key, data;
4089
4090                 /* we do not want the header splitted, so we pass NULL*/
4091                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
4092                                               NULL, &key, &data);
4093
4094                 ctdb_dumpdb_record(ctdb, key, data, stdout);
4095         }
4096
4097         printf("Dumped %d records\n", i);
4098         talloc_free(tmp_ctx);
4099         return 0;
4100 }
4101
4102 /*
 * wipe a database (given by name) on all active nodes
4104  */
4105 static int control_wipedb(struct ctdb_context *ctdb, int argc,
4106                           const char **argv)
4107 {
4108         int ret;
4109         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4110         TDB_DATA data;
4111         struct ctdb_db_context *ctdb_db;
4112         struct ctdb_node_map *nodemap = NULL;
4113         struct ctdb_vnn_map *vnnmap = NULL;
4114         int i;
4115         struct ctdb_control_wipe_database w;
4116         uint32_t *nodes;
4117         uint32_t generation;
4118         struct ctdb_dbid_map *dbmap = NULL;
4119
4120         if (argc != 1) {
4121                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4122                 return -1;
4123         }
4124
4125         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4126                                  &dbmap);
4127         if (ret != 0) {
4128                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
4129                                   options.pnn));
4130                 return ret;
4131         }
4132
4133         for(i=0;i<dbmap->num;i++){
4134                 const char *name;
4135
4136                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4137                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
4138                 if(!strcmp(argv[0], name)){
4139                         talloc_free(discard_const(name));
4140                         break;
4141                 }
4142                 talloc_free(discard_const(name));
4143         }
4144         if (i == dbmap->num) {
4145                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
4146                                   argv[0]));
4147                 talloc_free(tmp_ctx);
4148                 return -1;
4149         }
4150
4151         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
4152         if (ctdb_db == NULL) {
4153                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
4154                                   argv[0]));
4155                 talloc_free(tmp_ctx);
4156                 return -1;
4157         }
4158
4159         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
4160                                    &nodemap);
4161         if (ret != 0) {
4162                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
4163                                   options.pnn));
4164                 talloc_free(tmp_ctx);
4165                 return ret;
4166         }
4167
4168         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4169                                   &vnnmap);
4170         if (ret != 0) {
4171                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
4172                                   options.pnn));
4173                 talloc_free(tmp_ctx);
4174                 return ret;
4175         }
4176
4177         /* freeze all nodes */
4178         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4179         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4180                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4181                                                 nodes, i,
4182                                                 TIMELIMIT(),
4183                                                 false, tdb_null,
4184                                                 NULL, NULL,
4185                                                 NULL);
4186                 if (ret != 0) {
4187                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4188                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
4189                                              CTDB_RECOVERY_ACTIVE);
4190                         talloc_free(tmp_ctx);
4191                         return -1;
4192                 }
4193         }
4194
4195         generation = vnnmap->generation;
4196         data.dptr = (void *)&generation;
4197         data.dsize = sizeof(generation);
4198
4199         /* start a cluster wide transaction */
4200         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4201         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4202                                         nodes, 0,
4203                                         TIMELIMIT(), false, data,
4204                                         NULL, NULL,
4205                                         NULL);
4206         if (ret!= 0) {
4207                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
4208                                   "transactions.\n"));
4209                 return -1;
4210         }
4211
4212         w.db_id = ctdb_db->db_id;
4213         w.transaction_id = generation;
4214
4215         data.dptr = (void *)&w;
4216         data.dsize = sizeof(w);
4217
4218         /* wipe all the remote databases. */
4219         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4220         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4221                                         nodes, 0,
4222                                         TIMELIMIT(), false, data,
4223                                         NULL, NULL,
4224                                         NULL) != 0) {
4225                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4226                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4227                 talloc_free(tmp_ctx);
4228                 return -1;
4229         }
4230
4231         data.dptr = (void *)&ctdb_db->db_id;
4232         data.dsize = sizeof(ctdb_db->db_id);
4233
4234         /* mark the database as healthy */
4235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4237                                         nodes, 0,
4238                                         TIMELIMIT(), false, data,
4239                                         NULL, NULL,
4240                                         NULL) != 0) {
4241                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4242                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4243                 talloc_free(tmp_ctx);
4244                 return -1;
4245         }
4246
4247         data.dptr = (void *)&generation;
4248         data.dsize = sizeof(generation);
4249
4250         /* commit all the changes */
4251         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4252                                         nodes, 0,
4253                                         TIMELIMIT(), false, data,
4254                                         NULL, NULL,
4255                                         NULL) != 0) {
4256                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4257                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4258                 talloc_free(tmp_ctx);
4259                 return -1;
4260         }
4261
4262         /* thaw all nodes */
4263         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4264         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4265                                         nodes, 0,
4266                                         TIMELIMIT(),
4267                                         false, tdb_null,
4268                                         NULL, NULL,
4269                                         NULL) != 0) {
4270                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4271                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4272                 talloc_free(tmp_ctx);
4273                 return -1;
4274         }
4275
4276         talloc_free(tmp_ctx);
4277         return 0;
4278 }
4279
4280 /*
4281  * set flags of a node in the nodemap
4282  */
4283 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4284 {
4285         int ret;
4286         int32_t status;
4287         int node;
4288         int flags;
4289         TDB_DATA data;
4290         struct ctdb_node_flag_change c;
4291
4292         if (argc != 2) {
4293                 usage();
4294                 return -1;
4295         }
4296
4297         if (sscanf(argv[0], "%d", &node) != 1) {
4298                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4299                 usage();
4300                 return -1;
4301         }
4302         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4303                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4304                 usage();
4305                 return -1;
4306         }
4307
4308         c.pnn       = node;
4309         c.old_flags = 0;
4310         c.new_flags = flags;
4311
4312         data.dsize = sizeof(c);
4313         data.dptr = (unsigned char *)&c;
4314
4315         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4316                            data, NULL, NULL, &status, NULL, NULL);
4317         if (ret != 0 || status != 0) {
4318                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4319                 return -1;
4320         }
4321         return 0;
4322 }
4323
4324 /*
4325   dump memory usage
4326  */
4327 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4328 {
4329         TDB_DATA data;
4330         int ret;
4331         int32_t res;
4332         char *errmsg;
4333         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4334         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4335                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4336         if (ret != 0 || res != 0) {
4337                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4338                 talloc_free(tmp_ctx);
4339                 return -1;
4340         }
4341         write(1, data.dptr, data.dsize);
4342         talloc_free(tmp_ctx);
4343         return 0;
4344 }
4345
4346 /*
4347   handler for memory dumps
4348 */
4349 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4350                              TDB_DATA data, void *private_data)
4351 {
4352         write(1, data.dptr, data.dsize);
4353         exit(0);
4354 }
4355
4356 /*
4357   dump memory usage on the recovery daemon
4358  */
4359 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4360 {
4361         int ret;
4362         TDB_DATA data;
4363         struct rd_memdump_reply rd;
4364
4365         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4366         if (rd.pnn == -1) {
4367                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4368                 return -1;
4369         }
4370         rd.srvid = getpid();
4371
4372         /* register a message port for receiveing the reply so that we
4373            can receive the reply
4374         */
4375         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4376
4377
4378         data.dptr = (uint8_t *)&rd;
4379         data.dsize = sizeof(rd);
4380
4381         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4382         if (ret != 0) {
4383                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4384                 return -1;
4385         }
4386
4387         /* this loop will terminate when we have received the reply */
4388         while (1) {     
4389                 event_loop_once(ctdb->ev);
4390         }
4391
4392         return 0;
4393 }
4394
4395 /*
4396   send a message to a srvid
4397  */
4398 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4399 {
4400         unsigned long srvid;
4401         int ret;
4402         TDB_DATA data;
4403
4404         if (argc < 2) {
4405                 usage();
4406         }
4407
4408         srvid      = strtoul(argv[0], NULL, 0);
4409
4410         data.dptr = (uint8_t *)discard_const(argv[1]);
4411         data.dsize= strlen(argv[1]);
4412
4413         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4414         if (ret != 0) {
4415                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4416                 return -1;
4417         }
4418
4419         return 0;
4420 }
4421
4422 /*
4423   handler for msglisten
4424 */
4425 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4426                              TDB_DATA data, void *private_data)
4427 {
4428         int i;
4429
4430         printf("Message received: ");
4431         for (i=0;i<data.dsize;i++) {
4432                 printf("%c", data.dptr[i]);
4433         }
4434         printf("\n");
4435 }
4436
4437 /*
4438   listen for messages on a messageport
4439  */
4440 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4441 {
4442         uint64_t srvid;
4443
4444         srvid = getpid();
4445
4446         /* register a message port and listen for messages
4447         */
4448         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4449         printf("Listening for messages on srvid:%d\n", (int)srvid);
4450
4451         while (1) {     
4452                 event_loop_once(ctdb->ev);
4453         }
4454
4455         return 0;
4456 }
4457
4458 /*
4459   list all nodes in the cluster
4460   if the daemon is running, we read the data from the daemon.
4461   if the daemon is not running we parse the nodes file directly
4462  */
4463 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4464 {
4465         int i, ret;
4466         struct ctdb_node_map *nodemap=NULL;
4467
4468         if (ctdb != NULL) {
4469                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4470                 if (ret != 0) {
4471                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4472                         return ret;
4473                 }
4474
4475                 for(i=0;i<nodemap->num;i++){
4476                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4477                                 continue;
4478                         }
4479                         if (options.machinereadable){
4480                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4481                         } else {
4482                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4483                         }
4484                 }
4485         } else {
4486                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4487                 struct pnn_node *pnn_nodes;
4488                 struct pnn_node *pnn_node;
4489         
4490                 pnn_nodes = read_nodes_file(mem_ctx);
4491                 if (pnn_nodes == NULL) {
4492                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4493                         talloc_free(mem_ctx);
4494                         return -1;
4495                 }
4496
4497                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4498                         ctdb_sock_addr addr;
4499
4500                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4501                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4502                                 talloc_free(mem_ctx);
4503                                 return -1;
4504                         }
4505
4506                         if (options.machinereadable){
4507                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4508                         } else {
4509                                 printf("%s\n", pnn_node->addr);
4510                         }
4511                 }
4512                 talloc_free(mem_ctx);
4513         }
4514
4515         return 0;
4516 }
4517
4518 /*
4519   reload the nodes file on the local node
4520  */
4521 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4522 {
4523         int i, ret;
4524         int mypnn;
4525         struct ctdb_node_map *nodemap=NULL;
4526
4527         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4528         if (mypnn == -1) {
4529                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4530                 return -1;
4531         }
4532
4533         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4534         if (ret != 0) {
4535                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4536                 return ret;
4537         }
4538
4539         /* reload the nodes file on all remote nodes */
4540         for (i=0;i<nodemap->num;i++) {
4541                 if (nodemap->nodes[i].pnn == mypnn) {
4542                         continue;
4543                 }
4544                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4545                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4546                         nodemap->nodes[i].pnn);
4547                 if (ret != 0) {
4548                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4549                 }
4550         }
4551
4552         /* reload the nodes file on the local node */
4553         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4554         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4555         if (ret != 0) {
4556                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4557         }
4558
4559         /* initiate a recovery */
4560         control_recover(ctdb, argc, argv);
4561
4562         return 0;
4563 }
4564
4565
4566 static const struct {
4567         const char *name;
4568         int (*fn)(struct ctdb_context *, int, const char **);
4569         bool auto_all;
4570         bool without_daemon; /* can be run without daemon running ? */
4571         const char *msg;
4572         const char *args;
4573 } ctdb_commands[] = {
4574 #ifdef CTDB_VERS
4575         { "version",         control_version,           true,   false,  "show version of ctdb" },
4576 #endif
4577         { "status",          control_status,            true,   false,  "show node status" },
4578         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4579         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4580         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4581         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4582         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4583         { "statistics",      control_statistics,        false,  false, "show statistics" },
4584         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4585         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4586         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4587         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4588         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4589         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4590         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4591         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4592         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4593         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4594         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4595         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4596         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4597         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4598         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4599         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4600         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4601         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4602         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4603         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4604         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4605         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4606         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4607         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4608         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4609         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4610         { "stop",            control_stop,              true,   false,  "stop a node" },
4611         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4612         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4613         { "unban",           control_unban,             true,   false,  "unban a node" },
4614         { "showban",         control_showban,           true,   false,  "show ban information"},
4615         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4616         { "recover",         control_recover,           true,   false,  "force recovery" },
4617         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
4618         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4619         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4620         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4621         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4622         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4623         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4624         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
4625         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
4626
4627         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
4628
4629         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4630         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4631         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4632         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4633         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4634         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4635         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4636         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4637         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4638         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4639         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4640         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4641         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4642         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4643         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4644         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4645         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4646         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4647         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4648         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4649         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4650         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4651         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4652         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4653         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4654         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4655         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4656         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4657         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4658         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4659         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4660         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4661         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
4662         { "pfetch",          control_pfetch,            true,   false,  "fetch a record from a persistent database", "<db> <key>" },
4663         { "pstore",          control_pstore,            true,   false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
4664 };
4665
4666 /*
4667   show usage message
4668  */
4669 static void usage(void)
4670 {
4671         int i;
4672         printf(
4673 "Usage: ctdb [options] <control>\n" \
4674 "Options:\n" \
4675 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4676 "   -Y                 generate machinereadable output\n"
4677 "   -v                 generate verbose output\n"
4678 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4679         printf("Controls:\n");
4680         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4681                 printf("  %-15s %-27s  %s\n", 
4682                        ctdb_commands[i].name, 
4683                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4684                        ctdb_commands[i].msg);
4685         }
4686         exit(1);
4687 }
4688
4689
4690 static void ctdb_alarm(int sig)
4691 {
4692         printf("Maximum runtime exceeded - exiting\n");
4693         _exit(ERR_TIMEOUT);
4694 }
4695
4696 /*
4697   main program
4698 */
4699 int main(int argc, const char *argv[])
4700 {
4701         struct ctdb_context *ctdb;
4702         char *nodestring = NULL;
4703         struct poptOption popt_options[] = {
4704                 POPT_AUTOHELP
4705                 POPT_CTDB_CMDLINE
4706                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4707                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4708                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4709                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
4710                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4711                 POPT_TABLEEND
4712         };
4713         int opt;
4714         const char **extra_argv;
4715         int extra_argc = 0;
4716         int ret=-1, i;
4717         poptContext pc;
4718         struct event_context *ev;
4719         const char *control;
4720
4721         setlinebuf(stdout);
4722         
4723         /* set some defaults */
4724         options.maxruntime = 0;
4725         options.timelimit = 3;
4726         options.pnn = CTDB_CURRENT_NODE;
4727
4728         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4729
4730         while ((opt = poptGetNextOpt(pc)) != -1) {
4731                 switch (opt) {
4732                 default:
4733                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4734                                 poptBadOption(pc, 0), poptStrerror(opt)));
4735                         exit(1);
4736                 }
4737         }
4738
4739         /* setup the remaining options for the main program to use */
4740         extra_argv = poptGetArgs(pc);
4741         if (extra_argv) {
4742                 extra_argv++;
4743                 while (extra_argv[extra_argc]) extra_argc++;
4744         }
4745
4746         if (extra_argc < 1) {
4747                 usage();
4748         }
4749
4750         if (options.maxruntime == 0) {
4751                 const char *ctdb_timeout;
4752                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4753                 if (ctdb_timeout != NULL) {
4754                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4755                 } else {
4756                         /* default timeout is 120 seconds */
4757                         options.maxruntime = 120;
4758                 }
4759         }
4760
4761         signal(SIGALRM, ctdb_alarm);
4762         alarm(options.maxruntime);
4763
4764         /* setup the node number to contact */
4765         if (nodestring != NULL) {
4766                 if (strcmp(nodestring, "all") == 0) {
4767                         options.pnn = CTDB_BROADCAST_ALL;
4768                 } else {
4769                         options.pnn = strtoul(nodestring, NULL, 0);
4770                 }
4771         }
4772
4773         control = extra_argv[0];
4774
4775         ev = event_context_init(NULL);
4776         if (!ev) {
4777                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
4778                 exit(1);
4779         }
4780         tevent_loop_allow_nesting(ev);
4781
4782         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4783                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4784                         int j;
4785
4786                         if (ctdb_commands[i].without_daemon == true) {
4787                                 close(2);
4788                         }
4789
4790                         /* initialise ctdb */
4791                         ctdb = ctdb_cmdline_client(ev);
4792
4793                         if (ctdb_commands[i].without_daemon == false) {
4794                                 const char *socket_name;
4795
4796                                 if (ctdb == NULL) {
4797                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4798                                         exit(1);
4799                                 }
4800
4801                                 /* initialize a libctdb connection as well */
4802                                 socket_name = ctdb_get_socketname(ctdb);
4803                                 ctdb_connection = ctdb_connect(socket_name,
4804                                                        ctdb_log_file, stderr);
4805                                 if (ctdb_connection == NULL) {
4806                                         fprintf(stderr, "Failed to connect to daemon from libctdb\n");
4807                                         exit(1);
4808                                 }                               
4809                         
4810                                 /* verify the node exists */
4811                                 verify_node(ctdb);
4812
4813                                 if (options.pnn == CTDB_CURRENT_NODE) {
4814                                         int pnn;
4815                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4816                                         if (pnn == -1) {
4817                                                 return -1;
4818                                         }
4819                                         options.pnn = pnn;
4820                                 }
4821                         }
4822
4823                         if (ctdb_commands[i].auto_all && 
4824                             options.pnn == CTDB_BROADCAST_ALL) {
4825                                 uint32_t *nodes;
4826                                 uint32_t num_nodes;
4827                                 ret = 0;
4828
4829                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4830                                 CTDB_NO_MEMORY(ctdb, nodes);
4831         
4832                                 for (j=0;j<num_nodes;j++) {
4833                                         options.pnn = nodes[j];
4834                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4835                                 }
4836                                 talloc_free(nodes);
4837                         } else {
4838                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4839                         }
4840                         break;
4841                 }
4842         }
4843
4844         if (i == ARRAY_SIZE(ctdb_commands)) {
4845                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4846                 exit(1);
4847         }
4848
4849         return ret;
4850 }