tools/ctdb: Fix tstore command to generate ltdb header internally
[ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         uint32_t *nodes;
44         int machinereadable;
45         int verbose;
46         int maxruntime;
47         int printemptyrecords;
48         int printdatasize;
49         int printlmaster;
50         int printhash;
51         int printrecordflags;
52 } options;
53
54 #define LONGTIMEOUT options.timelimit*10
55
56 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
57 #define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
58
59 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
60 {
61         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
62         return 0;
63 }
64
65 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
66                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
67                                    "Out of memory in " __location__ )); \
68                 abort();                                                \
69         }} while (0)
70
71 static uint32_t getpnn(struct ctdb_context *ctdb)
72 {
73         if ((options.pnn == CTDB_BROADCAST_ALL) ||
74             (options.pnn == CTDB_MULTICAST)) {
75                 DEBUG(DEBUG_ERR,
76                       ("Cannot get PNN for node %u\n", options.pnn));
77                 exit(1);
78         }
79
80         if (options.pnn == CTDB_CURRENT_NODE) {
81                 return ctdb_get_pnn(ctdb);
82         } else {
83                 return options.pnn;
84         }
85 }
86
87 static void assert_single_node_only(void)
88 {
89         if ((options.pnn == CTDB_BROADCAST_ALL) ||
90             (options.pnn == CTDB_MULTICAST)) {
91                 DEBUG(DEBUG_ERR,
92                       ("This control can not be applied to multiple PNNs\n"));
93                 exit(1);
94         }
95 }
96
97 /* Pretty print the flags to a static buffer in human-readable format.
98  * This never returns NULL!
99  */
100 static const char *pretty_print_flags(uint32_t flags)
101 {
102         int j;
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         static char flags_str[512]; /* Big enough to contain all flag names */
116
117         flags_str[0] = '\0';
118         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
119                 if (flags & flag_names[j].flag) {
120                         if (flags_str[0] == '\0') {
121                                 (void) strcpy(flags_str, flag_names[j].name);
122                         } else {
123                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
124                                 (void) strncat(flags_str, flag_names[j].name,
125                                                sizeof(flags_str)-1);
126                         }
127                 }
128         }
129         if (flags_str[0] == '\0') {
130                 (void) strcpy(flags_str, "OK");
131         }
132
133         return flags_str;
134 }
135
136 static int h2i(char h)
137 {
138         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
139         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
140         return h - '0';
141 }
142
143 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
144 {
145         int i, len;
146         TDB_DATA key = {NULL, 0};
147
148         len = strlen(str);
149         if (len & 0x01) {
150                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
151                 return key;
152         }
153
154         key.dsize = len>>1;
155         key.dptr  = talloc_size(mem_ctx, key.dsize);
156
157         for (i=0; i < len/2; i++) {
158                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
159         }
160         return key;
161 }
162
163 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
164  * that are disconnected or deleted.  If dd_ok is true those nodes are
165  * included in the output list of nodes.  If dd_ok is false, those
166  * nodes are filtered from the "all" case and cause an error if
167  * explicitly specified.
168  */
169 static bool parse_nodestring(struct ctdb_context *ctdb,
170                              TALLOC_CTX *mem_ctx,
171                              const char * nodestring,
172                              uint32_t current_pnn,
173                              bool dd_ok,
174                              uint32_t **nodes,
175                              uint32_t *pnn_mode)
176 {
177         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
178         int n;
179         uint32_t i;
180         struct ctdb_node_map *nodemap;
181         int ret;
182
183         *nodes = NULL;
184
185         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
186         if (ret != 0) {
187                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
188                 talloc_free(tmp_ctx);
189                 exit(10);
190         }
191
192         if (nodestring != NULL) {
193                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
194                 if (*nodes == NULL) {
195                         goto failed;
196                 }
197
198                 n = 0;
199
200                 if (strcmp(nodestring, "all") == 0) {
201                         *pnn_mode = CTDB_BROADCAST_ALL;
202
203                         /* all */
204                         for (i = 0; i < nodemap->num; i++) {
205                                 if ((nodemap->nodes[i].flags &
206                                      (NODE_FLAGS_DISCONNECTED |
207                                       NODE_FLAGS_DELETED)) && !dd_ok) {
208                                         continue;
209                                 }
210                                 *nodes = talloc_realloc(mem_ctx, *nodes,
211                                                         uint32_t, n+1);
212                                 if (*nodes == NULL) {
213                                         goto failed;
214                                 }
215                                 (*nodes)[n] = i;
216                                 n++;
217                         }
218                 } else {
219                         /* x{,y...} */
220                         char *ns, *tok;
221
222                         ns = talloc_strdup(tmp_ctx, nodestring);
223                         tok = strtok(ns, ",");
224                         while (tok != NULL) {
225                                 uint32_t pnn;
226                                 i = (uint32_t)strtoul(tok, NULL, 0);
227                                 if (i >= nodemap->num) {
228                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
229                                         talloc_free(tmp_ctx);
230                                         exit(ERR_NONODE);
231                                 }
232                                 if ((nodemap->nodes[i].flags & 
233                                      (NODE_FLAGS_DISCONNECTED |
234                                       NODE_FLAGS_DELETED)) && !dd_ok) {
235                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
236                                         talloc_free(tmp_ctx);
237                                         exit(ERR_DISNODE);
238                                 }
239                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
240                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
241                                         talloc_free(tmp_ctx);
242                                         exit(10);
243                                 }
244
245                                 *nodes = talloc_realloc(mem_ctx, *nodes,
246                                                         uint32_t, n+1);
247                                 if (*nodes == NULL) {
248                                         goto failed;
249                                 }
250
251                                 (*nodes)[n] = i;
252                                 n++;
253
254                                 tok = strtok(NULL, ",");
255                         }
256                         talloc_free(ns);
257
258                         if (n == 1) {
259                                 *pnn_mode = (*nodes)[0];
260                         } else {
261                                 *pnn_mode = CTDB_MULTICAST;
262                         }
263                 }
264         } else {
265                 /* default - no nodes specified */
266                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
267                 if (*nodes == NULL) {
268                         goto failed;
269                 }
270                 *pnn_mode = CTDB_CURRENT_NODE;
271
272                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
273                         goto failed;
274                 }
275         }
276
277         talloc_free(tmp_ctx);
278         return true;
279
280 failed:
281         talloc_free(tmp_ctx);
282         return false;
283 }
284
285 /*
286  check if a database exists
287 */
288 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
289                       uint32_t *dbid, const char **dbname, uint8_t *flags)
290 {
291         int i, ret;
292         struct ctdb_dbid_map *dbmap=NULL;
293         bool dbid_given = false, found = false;
294         uint32_t id;
295         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
296         const char *name;
297
298         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
301                 goto fail;
302         }
303
304         if (strncmp(dbarg, "0x", 2) == 0) {
305                 id = strtoul(dbarg, NULL, 0);
306                 dbid_given = true;
307         }
308
309         for(i=0; i<dbmap->num; i++) {
310                 if (dbid_given) {
311                         if (id == dbmap->dbs[i].dbid) {
312                                 found = true;
313                                 break;
314                         }
315                 } else {
316                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
317                         if (ret != 0) {
318                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
319                                 goto fail;
320                         }
321
322                         if (strcmp(name, dbarg) == 0) {
323                                 id = dbmap->dbs[i].dbid;
324                                 found = true;
325                                 break;
326                         }
327                 }
328         }
329
330         if (found && dbid_given && dbname != NULL) {
331                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
332                 if (ret != 0) {
333                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
334                         found = false;
335                         goto fail;
336                 }
337         }
338
339         if (found) {
340                 if (dbid) *dbid = id;
341                 if (dbname) *dbname = talloc_strdup(ctdb, name);
342                 if (flags) *flags = dbmap->dbs[i].flags;
343         } else {
344                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
345         }
346
347 fail:
348         talloc_free(tmp_ctx);
349         return found;
350 }
351
352 /*
353   see if a process exists
354  */
355 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
356 {
357         uint32_t pnn, pid;
358         int ret;
359         if (argc < 1) {
360                 usage();
361         }
362
363         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
364                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
365                 return -1;
366         }
367
368         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
369         if (ret == 0) {
370                 printf("%u:%u exists\n", pnn, pid);
371         } else {
372                 printf("%u:%u does not exist\n", pnn, pid);
373         }
374         return ret;
375 }
376
377 /*
378   display statistics structure
379  */
380 static void show_statistics(struct ctdb_statistics *s, int show_header)
381 {
382         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
383         int i;
384         const char *prefix=NULL;
385         int preflen=0;
386         int tmp, days, hours, minutes, seconds;
387         const struct {
388                 const char *name;
389                 uint32_t offset;
390         } fields[] = {
391 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
392                 STATISTICS_FIELD(num_clients),
393                 STATISTICS_FIELD(frozen),
394                 STATISTICS_FIELD(recovering),
395                 STATISTICS_FIELD(num_recoveries),
396                 STATISTICS_FIELD(client_packets_sent),
397                 STATISTICS_FIELD(client_packets_recv),
398                 STATISTICS_FIELD(node_packets_sent),
399                 STATISTICS_FIELD(node_packets_recv),
400                 STATISTICS_FIELD(keepalive_packets_sent),
401                 STATISTICS_FIELD(keepalive_packets_recv),
402                 STATISTICS_FIELD(node.req_call),
403                 STATISTICS_FIELD(node.reply_call),
404                 STATISTICS_FIELD(node.req_dmaster),
405                 STATISTICS_FIELD(node.reply_dmaster),
406                 STATISTICS_FIELD(node.reply_error),
407                 STATISTICS_FIELD(node.req_message),
408                 STATISTICS_FIELD(node.req_control),
409                 STATISTICS_FIELD(node.reply_control),
410                 STATISTICS_FIELD(client.req_call),
411                 STATISTICS_FIELD(client.req_message),
412                 STATISTICS_FIELD(client.req_control),
413                 STATISTICS_FIELD(timeouts.call),
414                 STATISTICS_FIELD(timeouts.control),
415                 STATISTICS_FIELD(timeouts.traverse),
416                 STATISTICS_FIELD(locks.num_calls),
417                 STATISTICS_FIELD(locks.num_current),
418                 STATISTICS_FIELD(locks.num_pending),
419                 STATISTICS_FIELD(locks.num_failed),
420                 STATISTICS_FIELD(total_calls),
421                 STATISTICS_FIELD(pending_calls),
422                 STATISTICS_FIELD(childwrite_calls),
423                 STATISTICS_FIELD(pending_childwrite_calls),
424                 STATISTICS_FIELD(memory_used),
425                 STATISTICS_FIELD(max_hop_count),
426                 STATISTICS_FIELD(total_ro_delegations),
427                 STATISTICS_FIELD(total_ro_revokes),
428         };
429         
430         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
431         seconds = tmp%60;
432         tmp    /= 60;
433         minutes = tmp%60;
434         tmp    /= 60;
435         hours   = tmp%24;
436         tmp    /= 24;
437         days    = tmp;
438
439         if (options.machinereadable){
440                 if (show_header) {
441                         printf("CTDB version:");
442                         printf("Current time of statistics:");
443                         printf("Statistics collected since:");
444                         for (i=0;i<ARRAY_SIZE(fields);i++) {
445                                 printf("%s:", fields[i].name);
446                         }
447                         printf("num_reclock_ctdbd_latency:");
448                         printf("min_reclock_ctdbd_latency:");
449                         printf("avg_reclock_ctdbd_latency:");
450                         printf("max_reclock_ctdbd_latency:");
451
452                         printf("num_reclock_recd_latency:");
453                         printf("min_reclock_recd_latency:");
454                         printf("avg_reclock_recd_latency:");
455                         printf("max_reclock_recd_latency:");
456
457                         printf("num_call_latency:");
458                         printf("min_call_latency:");
459                         printf("avg_call_latency:");
460                         printf("max_call_latency:");
461
462                         printf("num_lockwait_latency:");
463                         printf("min_lockwait_latency:");
464                         printf("avg_lockwait_latency:");
465                         printf("max_lockwait_latency:");
466
467                         printf("num_childwrite_latency:");
468                         printf("min_childwrite_latency:");
469                         printf("avg_childwrite_latency:");
470                         printf("max_childwrite_latency:");
471                         printf("\n");
472                 }
473                 printf("%d:", CTDB_VERSION);
474                 printf("%d:", (int)s->statistics_current_time.tv_sec);
475                 printf("%d:", (int)s->statistics_start_time.tv_sec);
476                 for (i=0;i<ARRAY_SIZE(fields);i++) {
477                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
478                 }
479                 printf("%d:", s->reclock.ctdbd.num);
480                 printf("%.6f:", s->reclock.ctdbd.min);
481                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
482                 printf("%.6f:", s->reclock.ctdbd.max);
483
484                 printf("%d:", s->reclock.recd.num);
485                 printf("%.6f:", s->reclock.recd.min);
486                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
487                 printf("%.6f:", s->reclock.recd.max);
488
489                 printf("%d:", s->call_latency.num);
490                 printf("%.6f:", s->call_latency.min);
491                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
492                 printf("%.6f:", s->call_latency.max);
493
494                 printf("%d:", s->childwrite_latency.num);
495                 printf("%.6f:", s->childwrite_latency.min);
496                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
497                 printf("%.6f:", s->childwrite_latency.max);
498                 printf("\n");
499         } else {
500                 printf("CTDB version %u\n", CTDB_VERSION);
501                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
502                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
503
504                 for (i=0;i<ARRAY_SIZE(fields);i++) {
505                         if (strchr(fields[i].name, '.')) {
506                                 preflen = strcspn(fields[i].name, ".")+1;
507                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
508                                         prefix = fields[i].name;
509                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
510                                 }
511                         } else {
512                                 preflen = 0;
513                         }
514                         printf(" %*s%-22s%*s%10u\n", 
515                                preflen?4:0, "",
516                                fields[i].name+preflen, 
517                                preflen?0:4, "",
518                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
519                 }
520                 printf(" hop_count_buckets:");
521                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
522                         printf(" %d", s->hop_count_bucket[i]);
523                 }
524                 printf("\n");
525                 printf(" lock_buckets:");
526                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
527                         printf(" %d", s->locks.buckets[i]);
528                 }
529                 printf("\n");
530                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
531
532                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
533
534                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
535
536                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
537                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
538         }
539
540         talloc_free(tmp_ctx);
541 }
542
543 /*
544   display remote ctdb statistics combined from all nodes
545  */
546 static int control_statistics_all(struct ctdb_context *ctdb)
547 {
548         int ret, i;
549         struct ctdb_statistics statistics;
550         uint32_t *nodes;
551         uint32_t num_nodes;
552
553         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
554         CTDB_NO_MEMORY(ctdb, nodes);
555         
556         ZERO_STRUCT(statistics);
557
558         for (i=0;i<num_nodes;i++) {
559                 struct ctdb_statistics s1;
560                 int j;
561                 uint32_t *v1 = (uint32_t *)&s1;
562                 uint32_t *v2 = (uint32_t *)&statistics;
563                 uint32_t num_ints = 
564                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
565                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
566                 if (ret != 0) {
567                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
568                         return ret;
569                 }
570                 for (j=0;j<num_ints;j++) {
571                         v2[j] += v1[j];
572                 }
573                 statistics.max_hop_count = 
574                         MAX(statistics.max_hop_count, s1.max_hop_count);
575                 statistics.call_latency.max = 
576                         MAX(statistics.call_latency.max, s1.call_latency.max);
577         }
578         talloc_free(nodes);
579         printf("Gathered statistics for %u nodes\n", num_nodes);
580         show_statistics(&statistics, 1);
581         return 0;
582 }
583
584 /*
585   display remote ctdb statistics
586  */
587 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
588 {
589         int ret;
590         struct ctdb_statistics statistics;
591
592         if (options.pnn == CTDB_BROADCAST_ALL) {
593                 return control_statistics_all(ctdb);
594         }
595
596         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
597         if (ret != 0) {
598                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
599                 return ret;
600         }
601         show_statistics(&statistics, 1);
602         return 0;
603 }
604
605
606 /*
607   reset remote ctdb statistics
608  */
609 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
610 {
611         int ret;
612
613         ret = ctdb_statistics_reset(ctdb, options.pnn);
614         if (ret != 0) {
615                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
616                 return ret;
617         }
618         return 0;
619 }
620
621
622 /*
623   display remote ctdb rolling statistics
624  */
625 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
626 {
627         int ret;
628         struct ctdb_statistics_wire *stats;
629         int i, num_records = -1;
630
631         assert_single_node_only();
632
633         if (argc ==1) {
634                 num_records = atoi(argv[0]) - 1;
635         }
636
637         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
638         if (ret != 0) {
639                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
640                 return ret;
641         }
642         for (i=0;i<stats->num;i++) {
643                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
644                         continue;
645                 }
646                 show_statistics(&stats->stats[i], i==0);
647                 if (i == num_records) {
648                         break;
649                 }
650         }
651         return 0;
652 }
653
654
655 /*
656   display remote ctdb db statistics
657  */
658 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
659 {
660         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
661         struct ctdb_db_statistics *dbstat;
662         int i;
663         uint32_t db_id;
664         int num_hot_keys;
665         int ret;
666
667         if (argc < 1) {
668                 usage();
669         }
670
671         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
672                 return -1;
673         }
674
675         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
678                 talloc_free(tmp_ctx);
679                 return -1;
680         }
681
682         printf("DB Statistics: %s\n", argv[0]);
683         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
684                 dbstat->db_ro_delegations);
685         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
686                 dbstat->db_ro_delegations);
687         printf(" %s\n", "locks");
688         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
689                 dbstat->locks.num_calls);
690         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
691                 dbstat->locks.num_failed);
692         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
693                 dbstat->locks.num_current);
694         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
695                 dbstat->locks.num_pending);
696         printf(" %s", "hop_count_buckets:");
697         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
698                 printf(" %d", dbstat->hop_count_bucket[i]);
699         }
700         printf("\n");
701         printf(" %s", "lock_buckets:");
702         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
703                 printf(" %d", dbstat->locks.buckets[i]);
704         }
705         printf("\n");
706         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
707                 "locks_latency      MIN/AVG/MAX",
708                 dbstat->locks.latency.min,
709                 (dbstat->locks.latency.num ?
710                  dbstat->locks.latency.total /dbstat->locks.latency.num :
711                  0.0),
712                 dbstat->locks.latency.max,
713                 dbstat->locks.latency.num);
714         num_hot_keys = 0;
715         for (i=0; i<dbstat->num_hot_keys; i++) {
716                 if (dbstat->hot_keys[i].count > 0) {
717                         num_hot_keys++;
718                 }
719         }
720         dbstat->num_hot_keys = num_hot_keys;
721
722         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
723         for (i = 0; i < dbstat->num_hot_keys; i++) {
724                 int j;
725                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
726                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
727                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
728                 }
729                 printf("\n");
730         }
731
732         talloc_free(tmp_ctx);
733         return 0;
734 }
735
736 /*
737   display uptime of remote node
738  */
739 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
740 {
741         int ret;
742         struct ctdb_uptime *uptime = NULL;
743         int tmp, days, hours, minutes, seconds;
744
745         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
746         if (ret != 0) {
747                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
748                 return ret;
749         }
750
751         if (options.machinereadable){
752                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
753                 printf(":%u:%u:%u:%lf\n",
754                         (unsigned int)uptime->current_time.tv_sec,
755                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
756                         (unsigned int)uptime->last_recovery_finished.tv_sec,
757                         timeval_delta(&uptime->last_recovery_finished,
758                                       &uptime->last_recovery_started)
759                 );
760                 return 0;
761         }
762
763         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
764
765         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
766         seconds = tmp%60;
767         tmp    /= 60;
768         minutes = tmp%60;
769         tmp    /= 60;
770         hours   = tmp%24;
771         tmp    /= 24;
772         days    = tmp;
773         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
774
775         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
776         seconds = tmp%60;
777         tmp    /= 60;
778         minutes = tmp%60;
779         tmp    /= 60;
780         hours   = tmp%24;
781         tmp    /= 24;
782         days    = tmp;
783         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
784         
785         printf("Duration of last recovery/failover: %lf seconds\n",
786                 timeval_delta(&uptime->last_recovery_finished,
787                               &uptime->last_recovery_started));
788
789         return 0;
790 }
791
792 /*
793   show the PNN of the current node
794  */
795 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
796 {
797         uint32_t mypnn;
798
799         mypnn = getpnn(ctdb);
800
801         printf("PNN:%d\n", mypnn);
802         return 0;
803 }
804
805
806 struct pnn_node {
807         struct pnn_node *next;
808         const char *addr;
809         int pnn;
810 };
811
812 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
813 {
814         const char *nodes_list;
815         int nlines;
816         char **lines;
817         int i, pnn;
818         struct pnn_node *pnn_nodes = NULL;
819         struct pnn_node *pnn_node;
820         struct pnn_node *tmp_node;
821
822         /* read the nodes file */
823         nodes_list = getenv("CTDB_NODES");
824         if (nodes_list == NULL) {
825                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
826                                              getenv("CTDB_BASE"));
827                 if (nodes_list == NULL) {
828                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
829                         exit(1);
830                 }
831         }
832         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
833         if (lines == NULL) {
834                 return NULL;
835         }
836         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
837                 nlines--;
838         }
839         for (i=0, pnn=0; i<nlines; i++) {
840                 char *node;
841
842                 node = lines[i];
843                 /* strip leading spaces */
844                 while((*node == ' ') || (*node == '\t')) {
845                         node++;
846                 }
847                 if (*node == '#') {
848                         pnn++;
849                         continue;
850                 }
851                 if (strcmp(node, "") == 0) {
852                         continue;
853                 }
854                 pnn_node = talloc(mem_ctx, struct pnn_node);
855                 pnn_node->pnn = pnn++;
856                 pnn_node->addr = talloc_strdup(pnn_node, node);
857                 pnn_node->next = pnn_nodes;
858                 pnn_nodes = pnn_node;
859         }
860
861         /* swap them around so we return them in incrementing order */
862         pnn_node = pnn_nodes;
863         pnn_nodes = NULL;
864         while (pnn_node) {
865                 tmp_node = pnn_node;
866                 pnn_node = pnn_node->next;
867
868                 tmp_node->next = pnn_nodes;
869                 pnn_nodes = tmp_node;
870         }
871
872         return pnn_nodes;
873 }
874
875 /*
876   show the PNN of the current node
877   discover the pnn by loading the nodes file and try to bind to all
878   addresses one at a time until the ip address is found.
879  */
880 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
881 {
882         TALLOC_CTX *mem_ctx = talloc_new(NULL);
883         struct pnn_node *pnn_nodes;
884         struct pnn_node *pnn_node;
885
886         assert_single_node_only();
887
888         pnn_nodes = read_nodes_file(mem_ctx);
889         if (pnn_nodes == NULL) {
890                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
891                 talloc_free(mem_ctx);
892                 return -1;
893         }
894
895         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
896                 ctdb_sock_addr addr;
897
898                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
899                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
900                         talloc_free(mem_ctx);
901                         return -1;
902                 }
903
904                 if (ctdb_sys_have_ip(&addr)) {
905                         printf("PNN:%d\n", pnn_node->pnn);
906                         talloc_free(mem_ctx);
907                         return 0;
908                 }
909         }
910
911         printf("Failed to detect which PNN this node is\n");
912         talloc_free(mem_ctx);
913         return -1;
914 }
915
916 /* Helpers for ctdb status
917  */
918 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
919 {
920         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
921         int j;
922         bool ret = false;
923
924         if (node->flags == 0) {
925                 struct ctdb_control_get_ifaces *ifaces;
926
927                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
928                                          tmp_ctx, &ifaces) == 0) {
929                         for (j=0; j < ifaces->num; j++) {
930                                 if (ifaces->ifaces[j].link_state != 0) {
931                                         continue;
932                                 }
933                                 ret = true;
934                                 break;
935                         }
936                 }
937         }
938         talloc_free(tmp_ctx);
939
940         return ret;
941 }
942
943 static void control_status_header_machine(void)
944 {
945         printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
946                ":Inactive:PartiallyOnline:ThisNode:\n");
947 }
948
949 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
950                                     struct ctdb_node_and_flags *node)
951 {
952         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
953                ctdb_addr_to_str(&node->addr),
954                !!(node->flags&NODE_FLAGS_DISCONNECTED),
955                !!(node->flags&NODE_FLAGS_BANNED),
956                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
957                !!(node->flags&NODE_FLAGS_UNHEALTHY),
958                !!(node->flags&NODE_FLAGS_STOPPED),
959                !!(node->flags&NODE_FLAGS_INACTIVE),
960                is_partially_online(ctdb, node) ? 1 : 0,
961                (node->pnn == mypnn)?'Y':'N');
962
963         return node->flags;
964 }
965
966 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
967                                   struct ctdb_node_and_flags *node)
968 {
969        printf("pnn:%d %-16s %s%s\n", node->pnn,
970               ctdb_addr_to_str(&node->addr),
971               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
972               node->pnn == mypnn?" (THIS NODE)":"");
973
974        return node->flags;
975 }
976
977 /*
978   display remote ctdb status
979  */
980 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
981 {
982         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
983         int i;
984         struct ctdb_vnn_map *vnnmap=NULL;
985         struct ctdb_node_map *nodemap=NULL;
986         uint32_t recmode, recmaster, mypnn;
987         int num_deleted_nodes = 0;
988         int ret;
989
990         mypnn = getpnn(ctdb);
991
992         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
993         if (ret != 0) {
994                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
995                 talloc_free(tmp_ctx);
996                 return -1;
997         }
998
999         if (options.machinereadable) {
1000                 control_status_header_machine();
1001                 for (i=0;i<nodemap->num;i++) {
1002                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1003                                 continue;
1004                         }
1005                         (void) control_status_1_machine(ctdb, mypnn,
1006                                                         &nodemap->nodes[i]);
1007                 }
1008                 talloc_free(tmp_ctx);
1009                 return 0;
1010         }
1011
1012         for (i=0; i<nodemap->num; i++) {
1013                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1014                         num_deleted_nodes++;
1015                 }
1016         }
1017         if (num_deleted_nodes == 0) {
1018                 printf("Number of nodes:%d\n", nodemap->num);
1019         } else {
1020                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1021                        nodemap->num, num_deleted_nodes);
1022         }
1023         for(i=0;i<nodemap->num;i++){
1024                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1025                         continue;
1026                 }
1027                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1028         }
1029
1030         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036         if (vnnmap->generation == INVALID_GENERATION) {
1037                 printf("Generation:INVALID\n");
1038         } else {
1039                 printf("Generation:%d\n",vnnmap->generation);
1040         }
1041         printf("Size:%d\n",vnnmap->size);
1042         for(i=0;i<vnnmap->size;i++){
1043                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1044         }
1045
1046         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1047         if (ret != 0) {
1048                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1049                 talloc_free(tmp_ctx);
1050                 return -1;
1051         }
1052         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1053
1054         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1055         if (ret != 0) {
1056                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1057                 talloc_free(tmp_ctx);
1058                 return -1;
1059         }
1060         printf("Recovery master:%d\n",recmaster);
1061
1062         talloc_free(tmp_ctx);
1063         return 0;
1064 }
1065
1066 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1067 {
1068         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1069         int i, ret;
1070         struct ctdb_node_map *nodemap=NULL;
1071         uint32_t * nodes;
1072         uint32_t pnn_mode, mypnn;
1073
1074         if (argc > 1) {
1075                 usage();
1076         }
1077
1078         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1079                               options.pnn, true, &nodes, &pnn_mode)) {
1080                 return -1;
1081         }
1082
1083         if (options.machinereadable) {
1084                 control_status_header_machine();
1085         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1086                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1087         }
1088
1089         mypnn = getpnn(ctdb);
1090
1091         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1092         if (ret != 0) {
1093                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1094                 talloc_free(tmp_ctx);
1095                 return -1;
1096         }
1097
1098         ret = 0;
1099
1100         for (i = 0; i < talloc_array_length(nodes); i++) {
1101                 if (options.machinereadable) {
1102                         ret |= control_status_1_machine(ctdb, mypnn,
1103                                                         &nodemap->nodes[nodes[i]]);
1104                 } else {
1105                         ret |= control_status_1_human(ctdb, mypnn,
1106                                                       &nodemap->nodes[nodes[i]]);
1107                 }
1108         }
1109
1110         talloc_free(tmp_ctx);
1111         return ret;
1112 }
1113
1114 struct natgw_node {
1115         struct natgw_node *next;
1116         const char *addr;
1117 };
1118
1119 static int find_natgw(struct ctdb_context *ctdb,
1120                        struct ctdb_node_map *nodemap, uint32_t flags,
1121                        uint32_t *pnn, const char **ip)
1122 {
1123         int i;
1124         uint32_t capabilities;
1125         int ret;
1126
1127         for (i=0;i<nodemap->num;i++) {
1128                 if (!(nodemap->nodes[i].flags & flags)) {
1129                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1130                                                         nodemap->nodes[i].pnn,
1131                                                         &capabilities);
1132                         if (ret != 0) {
1133                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1134                                                   nodemap->nodes[i].pnn));
1135                                 return -1;
1136                         }
1137                         if (!(capabilities&CTDB_CAP_NATGW)) {
1138                                 continue;
1139                         }
1140                         *pnn = nodemap->nodes[i].pnn;
1141                         *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
1142                         return 0;
1143                 }
1144         }
1145
1146         return 2; /* matches ENOENT */
1147 }
1148
1149 /*
1150   display the list of nodes belonging to this natgw configuration
1151  */
1152 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1153 {
1154         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1155         int i, ret;
1156         const char *natgw_list;
1157         int nlines;
1158         char **lines;
1159         struct natgw_node *natgw_nodes = NULL;
1160         struct natgw_node *natgw_node;
1161         struct ctdb_node_map *nodemap=NULL;
1162         uint32_t mypnn, pnn;
1163         const char *ip;
1164
1165         /* When we have some nodes that could be the NATGW, make a
1166          * series of attempts to find the first node that doesn't have
1167          * certain status flags set.
1168          */
1169         uint32_t exclude_flags[] = {
1170                 /* Look for a nice healthy node */
1171                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1172                 /* If not found, an UNHEALTHY/BANNED node will do */
1173                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1174                 /* If not found, a STOPPED node will do */
1175                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1176                 0,
1177         };
1178
1179         /* read the natgw nodes file into a linked list */
1180         natgw_list = getenv("CTDB_NATGW_NODES");
1181         if (natgw_list == NULL) {
1182                 natgw_list = talloc_asprintf(tmp_ctx, "%s/natgw_nodes",
1183                                              getenv("CTDB_BASE"));
1184                 if (natgw_list == NULL) {
1185                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1186                         exit(1);
1187                 }
1188         }
1189         lines = file_lines_load(natgw_list, &nlines, ctdb);
1190         if (lines == NULL) {
1191                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1192                 talloc_free(tmp_ctx);
1193                 return -1;
1194         }
1195         for (i=0;i<nlines;i++) {
1196                 char *node;
1197
1198                 node = lines[i];
1199                 /* strip leading spaces */
1200                 while((*node == ' ') || (*node == '\t')) {
1201                         node++;
1202                 }
1203                 if (*node == '#') {
1204                         continue;
1205                 }
1206                 if (strcmp(node, "") == 0) {
1207                         continue;
1208                 }
1209                 natgw_node = talloc(ctdb, struct natgw_node);
1210                 natgw_node->addr = talloc_strdup(natgw_node, node);
1211                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1212                 natgw_node->next = natgw_nodes;
1213                 natgw_nodes = natgw_node;
1214         }
1215
1216         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1217         if (ret != 0) {
1218                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1219                 talloc_free(tmp_ctx);
1220                 return -1;
1221         }
1222
1223         /* Trim the nodemap so it only includes connected nodes in the
1224          * current natgw group.
1225          */
1226         i=0;
1227         while(i<nodemap->num) {
1228                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1229                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1230                                 break;
1231                         }
1232                 }
1233
1234                 /* this node was not in the natgw so we just remove it from
1235                  * the list
1236                  */
1237                 if ((natgw_node == NULL) 
1238                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1239                         int j;
1240
1241                         for (j=i+1; j<nodemap->num; j++) {
1242                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1243                         }
1244                         nodemap->num--;
1245                         continue;
1246                 }
1247
1248                 i++;
1249         }
1250
1251         ret = 2; /* matches ENOENT */
1252         pnn = -1;
1253         ip = "0.0.0.0";
1254         for (i = 0; exclude_flags[i] != 0; i++) {
1255                 ret = find_natgw(ctdb, nodemap,
1256                                  exclude_flags[i],
1257                                  &pnn, &ip);
1258                 if (ret == -1) {
1259                         goto done;
1260                 }
1261                 if (ret == 0) {
1262                         break;
1263                 }
1264         }
1265
1266         if (options.machinereadable) {
1267                 printf(":Node:IP:\n");
1268                 printf(":%d:%s:\n", pnn, ip);
1269         } else {
1270                 printf("%d %s\n", pnn, ip);
1271         }
1272
1273         /* print the pruned list of nodes belonging to this natgw list */
1274         mypnn = getpnn(ctdb);
1275         if (options.machinereadable) {
1276                 control_status_header_machine();
1277         } else {
1278                 printf("Number of nodes:%d\n", nodemap->num);
1279         }
1280         for(i=0;i<nodemap->num;i++){
1281                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1282                         continue;
1283                 }
1284                 if (options.machinereadable) {
1285                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1286                 } else {
1287                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1288                 }
1289         }
1290
1291 done:
1292         talloc_free(tmp_ctx);
1293         return ret;
1294 }
1295
1296 /*
1297   display the status of the scripts for monitoring (or other events)
1298  */
1299 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1300                                     enum ctdb_eventscript_call type)
1301 {
1302         struct ctdb_scripts_wire *script_status;
1303         int ret, i;
1304
1305         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1308                 return ret;
1309         }
1310
1311         if (script_status == NULL) {
1312                 if (!options.machinereadable) {
1313                         printf("%s cycle never run\n",
1314                                ctdb_eventscript_call_names[type]);
1315                 }
1316                 return 0;
1317         }
1318
1319         if (!options.machinereadable) {
1320                 printf("%d scripts were executed last %s cycle\n",
1321                        script_status->num_scripts,
1322                        ctdb_eventscript_call_names[type]);
1323         }
1324         for (i=0; i<script_status->num_scripts; i++) {
1325                 const char *status = NULL;
1326
1327                 switch (script_status->scripts[i].status) {
1328                 case -ETIME:
1329                         status = "TIMEDOUT";
1330                         break;
1331                 case -ENOEXEC:
1332                         status = "DISABLED";
1333                         break;
1334                 case 0:
1335                         status = "OK";
1336                         break;
1337                 default:
1338                         if (script_status->scripts[i].status > 0)
1339                                 status = "ERROR";
1340                         break;
1341                 }
1342                 if (options.machinereadable) {
1343                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1344                                ctdb_eventscript_call_names[type],
1345                                script_status->scripts[i].name,
1346                                script_status->scripts[i].status,
1347                                status,
1348                                (long)script_status->scripts[i].start.tv_sec,
1349                                (long)script_status->scripts[i].start.tv_usec,
1350                                (long)script_status->scripts[i].finished.tv_sec,
1351                                (long)script_status->scripts[i].finished.tv_usec,
1352                                script_status->scripts[i].output);
1353                         continue;
1354                 }
1355                 if (status)
1356                         printf("%-20s Status:%s    ",
1357                                script_status->scripts[i].name, status);
1358                 else
1359                         /* Some other error, eg from stat. */
1360                         printf("%-20s Status:CANNOT RUN (%s)",
1361                                script_status->scripts[i].name,
1362                                strerror(-script_status->scripts[i].status));
1363
1364                 if (script_status->scripts[i].status >= 0) {
1365                         printf("Duration:%.3lf ",
1366                         timeval_delta(&script_status->scripts[i].finished,
1367                               &script_status->scripts[i].start));
1368                 }
1369                 if (script_status->scripts[i].status != -ENOEXEC) {
1370                         printf("%s",
1371                                ctime(&script_status->scripts[i].start.tv_sec));
1372                         if (script_status->scripts[i].status != 0) {
1373                                 printf("   OUTPUT:%s\n",
1374                                        script_status->scripts[i].output);
1375                         }
1376                 } else {
1377                         printf("\n");
1378                 }
1379         }
1380         return 0;
1381 }
1382
1383
1384 static int control_scriptstatus(struct ctdb_context *ctdb,
1385                                 int argc, const char **argv)
1386 {
1387         int ret;
1388         enum ctdb_eventscript_call type, min, max;
1389         const char *arg;
1390
1391         if (argc > 1) {
1392                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1393                 return -1;
1394         }
1395
1396         if (argc == 0)
1397                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1398         else
1399                 arg = argv[0];
1400
1401         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1402                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1403                         min = type;
1404                         max = type+1;
1405                         break;
1406                 }
1407         }
1408         if (type == CTDB_EVENT_MAX) {
1409                 if (strcmp(arg, "all") == 0) {
1410                         min = 0;
1411                         max = CTDB_EVENT_MAX;
1412                 } else {
1413                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1414                         return -1;
1415                 }
1416         }
1417
1418         if (options.machinereadable) {
1419                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1420         }
1421
1422         for (type = min; type < max; type++) {
1423                 ret = control_one_scriptstatus(ctdb, type);
1424                 if (ret != 0) {
1425                         return ret;
1426                 }
1427         }
1428
1429         return 0;
1430 }
1431
1432 /*
1433   enable an eventscript
1434  */
1435 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1436 {
1437         int ret;
1438
1439         if (argc < 1) {
1440                 usage();
1441         }
1442
1443         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1444         if (ret != 0) {
1445           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1446                 return ret;
1447         }
1448
1449         return 0;
1450 }
1451
1452 /*
1453   disable an eventscript
1454  */
1455 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1456 {
1457         int ret;
1458
1459         if (argc < 1) {
1460                 usage();
1461         }
1462
1463         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1464         if (ret != 0) {
1465           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1466                 return ret;
1467         }
1468
1469         return 0;
1470 }
1471
1472 /*
1473   display the pnn of the recovery master
1474  */
1475 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1476 {
1477         uint32_t recmaster;
1478         int ret;
1479
1480         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1481         if (ret != 0) {
1482                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1483                 return -1;
1484         }
1485         printf("%d\n",recmaster);
1486
1487         return 0;
1488 }
1489
1490 /*
1491   add a tickle to a public address
1492  */
1493 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1494 {
1495         struct ctdb_tcp_connection t;
1496         TDB_DATA data;
1497         int ret;
1498
1499         assert_single_node_only();
1500
1501         if (argc < 2) {
1502                 usage();
1503         }
1504
1505         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1506                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1507                 return -1;
1508         }
1509         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1510                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1511                 return -1;
1512         }
1513
1514         data.dptr = (uint8_t *)&t;
1515         data.dsize = sizeof(t);
1516
1517         /* tell all nodes about this tcp connection */
1518         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1519                            0, data, ctdb, NULL, NULL, NULL, NULL);
1520         if (ret != 0) {
1521                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1522                 return -1;
1523         }
1524         
1525         return 0;
1526 }
1527
1528
1529 /*
1530   delete a tickle from a node
1531  */
1532 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1533 {
1534         struct ctdb_tcp_connection t;
1535         TDB_DATA data;
1536         int ret;
1537
1538         assert_single_node_only();
1539
1540         if (argc < 2) {
1541                 usage();
1542         }
1543
1544         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1545                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1546                 return -1;
1547         }
1548         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1549                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1550                 return -1;
1551         }
1552
1553         data.dptr = (uint8_t *)&t;
1554         data.dsize = sizeof(t);
1555
1556         /* tell all nodes about this tcp connection */
1557         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1558                            0, data, ctdb, NULL, NULL, NULL, NULL);
1559         if (ret != 0) {
1560                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1561                 return -1;
1562         }
1563         
1564         return 0;
1565 }
1566
1567
1568 /*
1569   get a list of all tickles for this pnn
1570  */
1571 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1572 {
1573         struct ctdb_control_tcp_tickle_list *list;
1574         ctdb_sock_addr addr;
1575         int i, ret;
1576         unsigned port = 0;
1577
1578         assert_single_node_only();
1579
1580         if (argc < 1) {
1581                 usage();
1582         }
1583
1584         if (argc == 2) {
1585                 port = atoi(argv[1]);
1586         }
1587
1588         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1589                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1590                 return -1;
1591         }
1592
1593         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1594         if (ret == -1) {
1595                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1596                 return -1;
1597         }
1598
1599         if (options.machinereadable){
1600                 printf(":source ip:port:destination ip:port:\n");
1601                 for (i=0;i<list->tickles.num;i++) {
1602                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1603                                 continue;
1604                         }
1605                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1606                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1607                 }
1608         } else {
1609                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1610                 printf("Num tickles:%u\n", list->tickles.num);
1611                 for (i=0;i<list->tickles.num;i++) {
1612                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1613                                 continue;
1614                         }
1615                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1616                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1617                 }
1618         }
1619
1620         talloc_free(list);
1621         
1622         return 0;
1623 }
1624
1625
1626 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1627 {
1628         struct ctdb_all_public_ips *ips;
1629         struct ctdb_public_ip ip;
1630         int i, ret;
1631         uint32_t *nodes;
1632         uint32_t disable_time;
1633         TDB_DATA data;
1634         struct ctdb_node_map *nodemap=NULL;
1635         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1636
1637         disable_time = 30;
1638         data.dptr  = (uint8_t*)&disable_time;
1639         data.dsize = sizeof(disable_time);
1640         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1641         if (ret != 0) {
1642                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1643                 return -1;
1644         }
1645
1646
1647
1648         /* read the public ip list from the node */
1649         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1652                 talloc_free(tmp_ctx);
1653                 return -1;
1654         }
1655
1656         for (i=0;i<ips->num;i++) {
1657                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1658                         break;
1659                 }
1660         }
1661         if (i==ips->num) {
1662                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1663                         pnn, ctdb_addr_to_str(addr)));
1664                 talloc_free(tmp_ctx);
1665                 return -1;
1666         }
1667
1668         ip.pnn  = pnn;
1669         ip.addr = *addr;
1670
1671         data.dptr  = (uint8_t *)&ip;
1672         data.dsize = sizeof(ip);
1673
1674         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1675         if (ret != 0) {
1676                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1677                 talloc_free(tmp_ctx);
1678                 return ret;
1679         }
1680
1681         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1682         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1683                                         nodes, 0,
1684                                         LONGTIMELIMIT(),
1685                                         false, data,
1686                                         NULL, NULL,
1687                                         NULL);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1690                 talloc_free(tmp_ctx);
1691                 return -1;
1692         }
1693
1694         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1695         if (ret != 0) {
1696                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1697                 talloc_free(tmp_ctx);
1698                 return -1;
1699         }
1700
1701         /* update the recovery daemon so it now knows to expect the new
1702            node assignment for this ip.
1703         */
1704         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1707                 return -1;
1708         }
1709
1710         talloc_free(tmp_ctx);
1711         return 0;
1712 }
1713
1714
1715 /* 
1716  * scans all other nodes and returns a pnn for another node that can host this 
1717  * ip address or -1
1718  */
1719 static int
1720 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1721 {
1722         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1723         struct ctdb_all_public_ips *ips;
1724         struct ctdb_node_map *nodemap=NULL;
1725         int i, j, ret;
1726
1727         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1728         if (ret != 0) {
1729                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1730                 talloc_free(tmp_ctx);
1731                 return ret;
1732         }
1733
1734         for(i=0;i<nodemap->num;i++){
1735                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1736                         continue;
1737                 }
1738                 if (nodemap->nodes[i].pnn == options.pnn) {
1739                         continue;
1740                 }
1741
1742                 /* read the public ip list from this node */
1743                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1744                 if (ret != 0) {
1745                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1746                         return -1;
1747                 }
1748
1749                 for (j=0;j<ips->num;j++) {
1750                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1751                                 talloc_free(tmp_ctx);
1752                                 return nodemap->nodes[i].pnn;
1753                         }
1754                 }
1755                 talloc_free(ips);
1756         }
1757
1758         talloc_free(tmp_ctx);
1759         return -1;
1760 }
1761
1762 /* If pnn is -1 then try to find a node to move IP to... */
1763 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1764 {
1765         bool pnn_specified = (pnn == -1 ? false : true);
1766         int retries = 0;
1767
1768         while (retries < 5) {
1769                 if (!pnn_specified) {
1770                         pnn = find_other_host_for_public_ip(ctdb, addr);
1771                         if (pnn == -1) {
1772                                 return false;
1773                         }
1774                         DEBUG(DEBUG_NOTICE,
1775                               ("Trying to move public IP to node %u\n", pnn));
1776                 }
1777
1778                 if (move_ip(ctdb, addr, pnn) == 0) {
1779                         return true;
1780                 }
1781
1782                 sleep(3);
1783                 retries++;
1784         }
1785
1786         return false;
1787 }
1788
1789
1790 /*
1791   move/failover an ip address to a specific node
1792  */
1793 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1794 {
1795         uint32_t pnn;
1796         ctdb_sock_addr addr;
1797
1798         assert_single_node_only();
1799
1800         if (argc < 2) {
1801                 usage();
1802                 return -1;
1803         }
1804
1805         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1806                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1807                 return -1;
1808         }
1809
1810
1811         if (sscanf(argv[1], "%u", &pnn) != 1) {
1812                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1813                 return -1;
1814         }
1815
1816         if (!try_moveip(ctdb, &addr, pnn)) {
1817                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1818                 return -1;
1819         }
1820
1821         return 0;
1822 }
1823
1824 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1825 {
1826         TDB_DATA data;
1827
1828         data.dptr  = (uint8_t *)&pnn;
1829         data.dsize = sizeof(uint32_t);
1830         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1831                 DEBUG(DEBUG_ERR,
1832                       ("Failed to send message to force node %u to be a rebalancing target\n",
1833                        pnn));
1834                 return -1;
1835         }
1836
1837         return 0;
1838 }
1839
1840
1841 /*
1842   rebalance a node by setting it to allow failback and triggering a
1843   takeover run
1844  */
1845 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1846 {
1847         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1848         uint32_t *nodes;
1849         uint32_t pnn_mode;
1850         int i, ret;
1851
1852         assert_single_node_only();
1853
1854         if (argc > 1) {
1855                 usage();
1856         }
1857
1858         /* Determine the nodes where IPs need to be reloaded */
1859         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1860                               options.pnn, true, &nodes, &pnn_mode)) {
1861                 ret = -1;
1862                 goto done;
1863         }
1864
1865         for (i = 0; i < talloc_array_length(nodes); i++) {
1866                 if (!rebalance_node(ctdb, nodes[i])) {
1867                         ret = -1;
1868                 }
1869         }
1870
1871 done:
1872         talloc_free(tmp_ctx);
1873         return ret;
1874 }
1875
1876 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1877 {
1878         struct ctdb_public_ip ip;
1879         int ret;
1880         uint32_t *nodes;
1881         uint32_t disable_time;
1882         TDB_DATA data;
1883         struct ctdb_node_map *nodemap=NULL;
1884         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1885
1886         disable_time = 30;
1887         data.dptr  = (uint8_t*)&disable_time;
1888         data.dsize = sizeof(disable_time);
1889         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1890         if (ret != 0) {
1891                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1892                 return -1;
1893         }
1894
1895         ip.pnn  = -1;
1896         ip.addr = *addr;
1897
1898         data.dptr  = (uint8_t *)&ip;
1899         data.dsize = sizeof(ip);
1900
1901         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1902         if (ret != 0) {
1903                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1904                 talloc_free(tmp_ctx);
1905                 return ret;
1906         }
1907
1908         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1909         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1910                                         nodes, 0,
1911                                         LONGTIMELIMIT(),
1912                                         false, data,
1913                                         NULL, NULL,
1914                                         NULL);
1915         if (ret != 0) {
1916                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1917                 talloc_free(tmp_ctx);
1918                 return -1;
1919         }
1920
1921         talloc_free(tmp_ctx);
1922         return 0;
1923 }
1924
1925 /*
1926   release an ip form all nodes and have it re-assigned by recd
1927  */
1928 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1929 {
1930         ctdb_sock_addr addr;
1931
1932         assert_single_node_only();
1933
1934         if (argc < 1) {
1935                 usage();
1936                 return -1;
1937         }
1938
1939         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1940                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1941                 return -1;
1942         }
1943
1944         if (rebalance_ip(ctdb, &addr) != 0) {
1945                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1946                 return -1;
1947         }
1948
1949         return 0;
1950 }
1951
1952 static int getips_store_callback(void *param, void *data)
1953 {
1954         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1955         struct ctdb_all_public_ips *ips = param;
1956         int i;
1957
1958         i = ips->num++;
1959         ips->ips[i].pnn  = node_ip->pnn;
1960         ips->ips[i].addr = node_ip->addr;
1961         return 0;
1962 }
1963
1964 static int getips_count_callback(void *param, void *data)
1965 {
1966         uint32_t *count = param;
1967
1968         (*count)++;
1969         return 0;
1970 }
1971
1972 #define IP_KEYLEN       4
1973 static uint32_t *ip_key(ctdb_sock_addr *ip)
1974 {
1975         static uint32_t key[IP_KEYLEN];
1976
1977         bzero(key, sizeof(key));
1978
1979         switch (ip->sa.sa_family) {
1980         case AF_INET:
1981                 key[0]  = ip->ip.sin_addr.s_addr;
1982                 break;
1983         case AF_INET6: {
1984                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1985                 key[0]  = s6_a32[3];
1986                 key[1]  = s6_a32[2];
1987                 key[2]  = s6_a32[1];
1988                 key[3]  = s6_a32[0];
1989                 break;
1990         }
1991         default:
1992                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1993                 return key;
1994         }
1995
1996         return key;
1997 }
1998
1999 static void *add_ip_callback(void *parm, void *data)
2000 {
2001         return parm;
2002 }
2003
2004 static int
2005 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2006 {
2007         struct ctdb_all_public_ips *tmp_ips;
2008         struct ctdb_node_map *nodemap=NULL;
2009         trbt_tree_t *ip_tree;
2010         int i, j, len, ret;
2011         uint32_t count;
2012
2013         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2016                 return ret;
2017         }
2018
2019         ip_tree = trbt_create(tmp_ctx, 0);
2020
2021         for(i=0;i<nodemap->num;i++){
2022                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2023                         continue;
2024                 }
2025                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2026                         continue;
2027                 }
2028
2029                 /* read the public ip list from this node */
2030                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2031                 if (ret != 0) {
2032                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2033                         return -1;
2034                 }
2035         
2036                 for (j=0; j<tmp_ips->num;j++) {
2037                         struct ctdb_public_ip *node_ip;
2038
2039                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2040                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2041                         node_ip->addr = tmp_ips->ips[j].addr;
2042
2043                         trbt_insertarray32_callback(ip_tree,
2044                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2045                                 add_ip_callback,
2046                                 node_ip);
2047                 }
2048                 talloc_free(tmp_ips);
2049         }
2050
2051         /* traverse */
2052         count = 0;
2053         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2054
2055         len = offsetof(struct ctdb_all_public_ips, ips) + 
2056                 count*sizeof(struct ctdb_public_ip);
2057         tmp_ips = talloc_zero_size(tmp_ctx, len);
2058         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2059
2060         *ips = tmp_ips;
2061
2062         return 0;
2063 }
2064
2065
2066 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2067 {
2068         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2069
2070         event_add_timed(ctdb->ev, ctdb, 
2071                                 timeval_current_ofs(1, 0),
2072                                 ctdb_every_second, ctdb);
2073 }
2074
2075 struct srvid_reply_handler_data {
2076         bool done;
2077         bool wait_for_all;
2078         uint32_t *nodes;
2079         const char *srvid_str;
2080 };
2081
2082 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2083                                          uint64_t srvid,
2084                                          TDB_DATA data,
2085                                          void *private_data)
2086 {
2087         struct srvid_reply_handler_data *d =
2088                 (struct srvid_reply_handler_data *)private_data;
2089         int i;
2090         int32_t ret;
2091
2092         if (data.dsize != sizeof(ret)) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2094                 return;
2095         }
2096
2097         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2098         ret = *(int32_t *)data.dptr;
2099         if (ret < 0) {
2100                 DEBUG(DEBUG_ERR,
2101                       ("%s failed with result %d\n", d->srvid_str, ret));
2102                 return;
2103         }
2104
2105         if (!d->wait_for_all) {
2106                 d->done = true;
2107                 return;
2108         }
2109
2110         /* Wait for all replies */
2111         d->done = true;
2112         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2113                 if (d->nodes[i] == ret) {
2114                         DEBUG(DEBUG_INFO,
2115                               ("%s reply received from node %u\n",
2116                                d->srvid_str, ret));
2117                         d->nodes[i] = -1;
2118                 }
2119                 if (d->nodes[i] != -1) {
2120                         /* Found a node that hasn't yet replied */
2121                         d->done = false;
2122                 }
2123         }
2124 }
2125
2126 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2127  * or replies from all connected nodes.  arg is the data argument to
2128  * pass in the srvid_request structure - pass 0 if this isn't needed.
2129  */
2130 static int srvid_broadcast(struct ctdb_context *ctdb,
2131                            uint64_t srvid, uint32_t *arg,
2132                            const char *srvid_str, bool wait_for_all)
2133 {
2134         int ret;
2135         TDB_DATA data;
2136         uint32_t pnn;
2137         uint64_t reply_srvid;
2138         struct srvid_request request;
2139         struct srvid_request_data request_data;
2140         struct srvid_reply_handler_data reply_data;
2141         struct timeval tv;
2142
2143         ZERO_STRUCT(request);
2144
2145         /* Time ticks to enable timeouts to be processed */
2146         event_add_timed(ctdb->ev, ctdb, 
2147                                 timeval_current_ofs(1, 0),
2148                                 ctdb_every_second, ctdb);
2149
2150         pnn = ctdb_get_pnn(ctdb);
2151         reply_srvid = getpid();
2152
2153         if (arg == NULL) {
2154                 request.pnn = pnn;
2155                 request.srvid = reply_srvid;
2156
2157                 data.dptr = (uint8_t *)&request;
2158                 data.dsize = sizeof(request);
2159         } else {
2160                 request_data.pnn = pnn;
2161                 request_data.srvid = reply_srvid;
2162                 request_data.data = *arg;
2163
2164                 data.dptr = (uint8_t *)&request_data;
2165                 data.dsize = sizeof(request_data);
2166         }
2167
2168         /* Register message port for reply from recovery master */
2169         ctdb_client_set_message_handler(ctdb, reply_srvid,
2170                                         srvid_broadcast_reply_handler,
2171                                         &reply_data);
2172
2173         reply_data.wait_for_all = wait_for_all;
2174         reply_data.nodes = NULL;
2175         reply_data.srvid_str = srvid_str;
2176
2177 again:
2178         reply_data.done = false;
2179
2180         if (wait_for_all) {
2181                 struct ctdb_node_map *nodemap;
2182
2183                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2184                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2185                 if (ret != 0) {
2186                         DEBUG(DEBUG_ERR,
2187                               ("Unable to get nodemap from current node, try again\n"));
2188                         sleep(1);
2189                         goto again;
2190                 }
2191
2192                 if (reply_data.nodes != NULL) {
2193                         talloc_free(reply_data.nodes);
2194                 }
2195                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2196                                                            NULL, true);
2197
2198                 talloc_free(nodemap);
2199         }
2200
2201         /* Send to all connected nodes. Only recmaster replies */
2202         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2203                                        srvid, data);
2204         if (ret != 0) {
2205                 /* This can only happen if the socket is closed and
2206                  * there's no way to recover from that, so don't try
2207                  * again.
2208                  */
2209                 DEBUG(DEBUG_ERR,
2210                       ("Failed to send %s request to connected nodes\n",
2211                        srvid_str));
2212                 return -1;
2213         }
2214
2215         tv = timeval_current();
2216         /* This loop terminates the reply is received */
2217         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2218                 event_loop_once(ctdb->ev);
2219         }
2220
2221         if (!reply_data.done) {
2222                 DEBUG(DEBUG_NOTICE,
2223                       ("Still waiting for confirmation of %s\n", srvid_str));
2224                 sleep(1);
2225                 goto again;
2226         }
2227
2228         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2229
2230         talloc_free(reply_data.nodes);
2231
2232         return 0;
2233 }
2234
2235 static int ipreallocate(struct ctdb_context *ctdb)
2236 {
2237         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2238                                "IP reallocation", false);
2239 }
2240
2241
2242 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2243 {
2244         return ipreallocate(ctdb);
2245 }
2246
2247 /*
2248   add a public ip address to a node
2249  */
2250 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2251 {
2252         int i, ret;
2253         int len, retries = 0;
2254         unsigned mask;
2255         ctdb_sock_addr addr;
2256         struct ctdb_control_ip_iface *pub;
2257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2258         struct ctdb_all_public_ips *ips;
2259
2260
2261         if (argc != 2) {
2262                 talloc_free(tmp_ctx);
2263                 usage();
2264         }
2265
2266         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2267                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2268                 talloc_free(tmp_ctx);
2269                 return -1;
2270         }
2271
2272         /* read the public ip list from the node */
2273         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2274         if (ret != 0) {
2275                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2276                 talloc_free(tmp_ctx);
2277                 return -1;
2278         }
2279         for (i=0;i<ips->num;i++) {
2280                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2281                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2282                         return 0;
2283                 }
2284         }
2285
2286
2287
2288         /* Dont timeout. This command waits for an ip reallocation
2289            which sometimes can take wuite a while if there has
2290            been a recent recovery
2291         */
2292         alarm(0);
2293
2294         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2295         pub = talloc_size(tmp_ctx, len); 
2296         CTDB_NO_MEMORY(ctdb, pub);
2297
2298         pub->addr  = addr;
2299         pub->mask  = mask;
2300         pub->len   = strlen(argv[1])+1;
2301         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2302
2303         do {
2304                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2305                 if (ret != 0) {
2306                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2307                         sleep(3);
2308                         retries++;
2309                 }
2310         } while (retries < 5 && ret != 0);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2313                 talloc_free(tmp_ctx);
2314                 return ret;
2315         }
2316
2317         if (rebalance_node(ctdb, options.pnn) != 0) {
2318                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2319                 return ret;
2320         }
2321
2322         talloc_free(tmp_ctx);
2323         return 0;
2324 }
2325
2326 /*
2327   add a public ip address to a node
2328  */
2329 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2330 {
2331         ctdb_sock_addr addr;
2332
2333         if (argc != 1) {
2334                 usage();
2335         }
2336
2337         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2338                 printf("Badly formed ip : %s\n", argv[0]);
2339                 return -1;
2340         }
2341
2342         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2343
2344         return 0;
2345 }
2346
2347 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2348
2349 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2350 {
2351         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2352         struct ctdb_node_map *nodemap=NULL;
2353         struct ctdb_all_public_ips *ips;
2354         int ret, i, j;
2355
2356         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2357         if (ret != 0) {
2358                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2359                 return ret;
2360         }
2361
2362         /* remove it from the nodes that are not hosting the ip currently */
2363         for(i=0;i<nodemap->num;i++){
2364                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2365                         continue;
2366                 }
2367                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2368                 if (ret != 0) {
2369                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2370                         continue;
2371                 }
2372
2373                 for (j=0;j<ips->num;j++) {
2374                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2375                                 break;
2376                         }
2377                 }
2378                 if (j==ips->num) {
2379                         continue;
2380                 }
2381
2382                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2383                         continue;
2384                 }
2385
2386                 options.pnn = nodemap->nodes[i].pnn;
2387                 control_delip(ctdb, argc, argv);
2388         }
2389
2390
2391         /* remove it from every node (also the one hosting it) */
2392         for(i=0;i<nodemap->num;i++){
2393                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2394                         continue;
2395                 }
2396                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2397                 if (ret != 0) {
2398                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2399                         continue;
2400                 }
2401
2402                 for (j=0;j<ips->num;j++) {
2403                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2404                                 break;
2405                         }
2406                 }
2407                 if (j==ips->num) {
2408                         continue;
2409                 }
2410
2411                 options.pnn = nodemap->nodes[i].pnn;
2412                 control_delip(ctdb, argc, argv);
2413         }
2414
2415         talloc_free(tmp_ctx);
2416         return 0;
2417 }
2418         
2419 /*
2420   delete a public ip address from a node
2421  */
2422 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2423 {
2424         int i, ret;
2425         ctdb_sock_addr addr;
2426         struct ctdb_control_ip_iface pub;
2427         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2428         struct ctdb_all_public_ips *ips;
2429
2430         if (argc != 1) {
2431                 talloc_free(tmp_ctx);
2432                 usage();
2433         }
2434
2435         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2436                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2437                 return -1;
2438         }
2439
2440         if (options.pnn == CTDB_BROADCAST_ALL) {
2441                 return control_delip_all(ctdb, argc, argv, &addr);
2442         }
2443
2444         pub.addr  = addr;
2445         pub.mask  = 0;
2446         pub.len   = 0;
2447
2448         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2449         if (ret != 0) {
2450                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2451                 talloc_free(tmp_ctx);
2452                 return ret;
2453         }
2454         
2455         for (i=0;i<ips->num;i++) {
2456                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2457                         break;
2458                 }
2459         }
2460
2461         if (i==ips->num) {
2462                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2463                         ctdb_addr_to_str(&addr)));
2464                 talloc_free(tmp_ctx);
2465                 return -1;
2466         }
2467
2468         /* This is an optimisation.  If this node is hosting the IP
2469          * then try to move it somewhere else without invoking a full
2470          * takeover run.  We don't care if this doesn't work!
2471          */
2472         if (ips->ips[i].pnn == options.pnn) {
2473                 (void) try_moveip(ctdb, &addr, -1);
2474         }
2475
2476         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2477         if (ret != 0) {
2478                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2479                 talloc_free(tmp_ctx);
2480                 return ret;
2481         }
2482
2483         talloc_free(tmp_ctx);
2484         return 0;
2485 }
2486
2487 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2488                               int argc, const char **argv)
2489 {
2490         struct ctdb_control_killtcp *killtcp;
2491         int max_entries, current, i;
2492         struct timeval timeout;
2493         char line[128], src[128], dst[128];
2494         int linenum;
2495         TDB_DATA data;
2496         struct client_async_data *async_data;
2497         struct ctdb_client_control_state *state;
2498
2499         if (argc != 0) {
2500                 usage();
2501         }
2502
2503         linenum = 1;
2504         killtcp = NULL;
2505         max_entries = 0;
2506         current = 0;
2507         while (!feof(stdin)) {
2508                 if (fgets(line, sizeof(line), stdin) == NULL) {
2509                         continue;
2510                 }
2511
2512                 /* Silently skip empty lines */
2513                 if (line[0] == '\n') {
2514                         continue;
2515                 }
2516
2517                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2518                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2519                                           linenum, line));
2520                         talloc_free(killtcp);
2521                         return -1;
2522                 }
2523
2524                 if (current >= max_entries) {
2525                         max_entries += 1024;
2526                         killtcp = talloc_realloc(ctdb, killtcp,
2527                                                  struct ctdb_control_killtcp,
2528                                                  max_entries);
2529                         CTDB_NO_MEMORY(ctdb, killtcp);
2530                 }
2531
2532                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2533                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2534                                           linenum, src));
2535                         talloc_free(killtcp);
2536                         return -1;
2537                 }
2538
2539                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2540                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2541                                           linenum, dst));
2542                         talloc_free(killtcp);
2543                         return -1;
2544                 }
2545
2546                 current++;
2547         }
2548
2549         async_data = talloc_zero(ctdb, struct client_async_data);
2550         if (async_data == NULL) {
2551                 talloc_free(killtcp);
2552                 return -1;
2553         }
2554
2555         for (i = 0; i < current; i++) {
2556
2557                 data.dsize = sizeof(struct ctdb_control_killtcp);
2558                 data.dptr  = (unsigned char *)&killtcp[i];
2559
2560                 timeout = TIMELIMIT();
2561                 state = ctdb_control_send(ctdb, options.pnn, 0,
2562                                           CTDB_CONTROL_KILL_TCP, 0, data,
2563                                           async_data, &timeout, NULL);
2564
2565                 if (state == NULL) {
2566                         DEBUG(DEBUG_ERR,
2567                               ("Failed to call async killtcp control to node %u\n",
2568                                options.pnn));
2569                         talloc_free(killtcp);
2570                         return -1;
2571                 }
2572                 
2573                 ctdb_client_async_add(async_data, state);
2574         }
2575
2576         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2577                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2578                 talloc_free(killtcp);
2579                 return -1;
2580         }
2581
2582         talloc_free(killtcp);
2583         return 0;
2584 }
2585
2586
2587 /*
2588   kill a tcp connection
2589  */
2590 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2591 {
2592         int ret;
2593         struct ctdb_control_killtcp killtcp;
2594
2595         assert_single_node_only();
2596
2597         if (argc == 0) {
2598                 return kill_tcp_from_file(ctdb, argc, argv);
2599         }
2600
2601         if (argc < 2) {
2602                 usage();
2603         }
2604
2605         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2606                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2607                 return -1;
2608         }
2609
2610         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2611                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2612                 return -1;
2613         }
2614
2615         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2616         if (ret != 0) {
2617                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2618                 return ret;
2619         }
2620
2621         return 0;
2622 }
2623
2624
2625 /*
2626   send a gratious arp
2627  */
2628 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2629 {
2630         int ret;
2631         ctdb_sock_addr addr;
2632
2633         assert_single_node_only();
2634
2635         if (argc < 2) {
2636                 usage();
2637         }
2638
2639         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2640                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2641                 return -1;
2642         }
2643
2644         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2645         if (ret != 0) {
2646                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2647                 return ret;
2648         }
2649
2650         return 0;
2651 }
2652
2653 /*
2654   register a server id
2655  */
2656 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2657 {
2658         int ret;
2659         struct ctdb_server_id server_id;
2660
2661         if (argc < 3) {
2662                 usage();
2663         }
2664
2665         server_id.pnn       = strtoul(argv[0], NULL, 0);
2666         server_id.type      = strtoul(argv[1], NULL, 0);
2667         server_id.server_id = strtoul(argv[2], NULL, 0);
2668
2669         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2670         if (ret != 0) {
2671                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2672                 return ret;
2673         }
2674         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2675         sleep(999);
2676         return -1;
2677 }
2678
2679 /*
2680   unregister a server id
2681  */
2682 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2683 {
2684         int ret;
2685         struct ctdb_server_id server_id;
2686
2687         if (argc < 3) {
2688                 usage();
2689         }
2690
2691         server_id.pnn       = strtoul(argv[0], NULL, 0);
2692         server_id.type      = strtoul(argv[1], NULL, 0);
2693         server_id.server_id = strtoul(argv[2], NULL, 0);
2694
2695         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2696         if (ret != 0) {
2697                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2698                 return ret;
2699         }
2700         return -1;
2701 }
2702
2703 /*
2704   check if a server id exists
2705  */
2706 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2707 {
2708         uint32_t status;
2709         int ret;
2710         struct ctdb_server_id server_id;
2711
2712         if (argc < 3) {
2713                 usage();
2714         }
2715
2716         server_id.pnn       = strtoul(argv[0], NULL, 0);
2717         server_id.type      = strtoul(argv[1], NULL, 0);
2718         server_id.server_id = strtoul(argv[2], NULL, 0);
2719
2720         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2721         if (ret != 0) {
2722                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2723                 return ret;
2724         }
2725
2726         if (status) {
2727                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2728         } else {
2729                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2730         }
2731         return 0;
2732 }
2733
2734 /*
2735   get a list of all server ids that are registered on a node
2736  */
2737 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2738 {
2739         int i, ret;
2740         struct ctdb_server_id_list *server_ids;
2741
2742         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2743         if (ret != 0) {
2744                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2745                 return ret;
2746         }
2747
2748         for (i=0; i<server_ids->num; i++) {
2749                 printf("Server id %d:%d:%d\n", 
2750                         server_ids->server_ids[i].pnn, 
2751                         server_ids->server_ids[i].type, 
2752                         server_ids->server_ids[i].server_id); 
2753         }
2754
2755         return -1;
2756 }
2757
2758 /*
2759   check if a server id exists
2760  */
2761 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2762 {
2763         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2764         uint64_t *ids;
2765         uint8_t *result;
2766         int i;
2767
2768         if (argc < 1) {
2769                 talloc_free(tmp_ctx);
2770                 usage();
2771         }
2772
2773         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2774         result = talloc_array(tmp_ctx, uint8_t, argc);
2775
2776         for (i = 0; i < argc; i++) {
2777                 ids[i] = strtoull(argv[i], NULL, 0);
2778         }
2779
2780         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2781                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2782                                   options.pnn));
2783                 talloc_free(tmp_ctx);
2784                 return -1;
2785         }
2786
2787         for (i=0; i < argc; i++) {
2788                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2789                        result[i] ? "exists" : "does not exist");
2790         }
2791
2792         talloc_free(tmp_ctx);
2793         return 0;
2794 }
2795
2796 /*
2797   send a tcp tickle ack
2798  */
2799 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2800 {
2801         int ret;
2802         ctdb_sock_addr  src, dst;
2803
2804         if (argc < 2) {
2805                 usage();
2806         }
2807
2808         if (!parse_ip_port(argv[0], &src)) {
2809                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2810                 return -1;
2811         }
2812
2813         if (!parse_ip_port(argv[1], &dst)) {
2814                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2815                 return -1;
2816         }
2817
2818         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2819         if (ret==0) {
2820                 return 0;
2821         }
2822         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2823
2824         return -1;
2825 }
2826
2827
2828 /*
2829   display public ip status
2830  */
2831 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2832 {
2833         int i, ret;
2834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2835         struct ctdb_all_public_ips *ips;
2836
2837         if (options.pnn == CTDB_BROADCAST_ALL) {
2838                 /* read the list of public ips from all nodes */
2839                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2840         } else {
2841                 /* read the public ip list from this node */
2842                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2843         }
2844         if (ret != 0) {
2845                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2846                 talloc_free(tmp_ctx);
2847                 return ret;
2848         }
2849
2850         if (options.machinereadable){
2851                 printf(":Public IP:Node:");
2852                 if (options.verbose){
2853                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2854                 }
2855                 printf("\n");
2856         } else {
2857                 if (options.pnn == CTDB_BROADCAST_ALL) {
2858                         printf("Public IPs on ALL nodes\n");
2859                 } else {
2860                         printf("Public IPs on node %u\n", options.pnn);
2861                 }
2862         }
2863
2864         for (i=1;i<=ips->num;i++) {
2865                 struct ctdb_control_public_ip_info *info = NULL;
2866                 int32_t pnn;
2867                 char *aciface = NULL;
2868                 char *avifaces = NULL;
2869                 char *cifaces = NULL;
2870
2871                 if (options.pnn == CTDB_BROADCAST_ALL) {
2872                         pnn = ips->ips[ips->num-i].pnn;
2873                 } else {
2874                         pnn = options.pnn;
2875                 }
2876
2877                 if (pnn != -1) {
2878                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2879                                                    &ips->ips[ips->num-i].addr, &info);
2880                 } else {
2881                         ret = -1;
2882                 }
2883
2884                 if (ret == 0) {
2885                         int j;
2886                         for (j=0; j < info->num; j++) {
2887                                 if (cifaces == NULL) {
2888                                         cifaces = talloc_strdup(info,
2889                                                                 info->ifaces[j].name);
2890                                 } else {
2891                                         cifaces = talloc_asprintf_append(cifaces,
2892                                                                          ",%s",
2893                                                                          info->ifaces[j].name);
2894                                 }
2895
2896                                 if (info->active_idx == j) {
2897                                         aciface = info->ifaces[j].name;
2898                                 }
2899
2900                                 if (info->ifaces[j].link_state == 0) {
2901                                         continue;
2902                                 }
2903
2904                                 if (avifaces == NULL) {
2905                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2906                                 } else {
2907                                         avifaces = talloc_asprintf_append(avifaces,
2908                                                                           ",%s",
2909                                                                           info->ifaces[j].name);
2910                                 }
2911                         }
2912                 }
2913
2914                 if (options.machinereadable){
2915                         printf(":%s:%d:",
2916                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2917                                 ips->ips[ips->num-i].pnn);
2918                         if (options.verbose){
2919                                 printf("%s:%s:%s:",
2920                                         aciface?aciface:"",
2921                                         avifaces?avifaces:"",
2922                                         cifaces?cifaces:"");
2923                         }
2924                         printf("\n");
2925                 } else {
2926                         if (options.verbose) {
2927                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2928                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2929                                         ips->ips[ips->num-i].pnn,
2930                                         aciface?aciface:"",
2931                                         avifaces?avifaces:"",
2932                                         cifaces?cifaces:"");
2933                         } else {
2934                                 printf("%s %d\n",
2935                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2936                                         ips->ips[ips->num-i].pnn);
2937                         }
2938                 }
2939                 talloc_free(info);
2940         }
2941
2942         talloc_free(tmp_ctx);
2943         return 0;
2944 }
2945
2946 /*
2947   public ip info
2948  */
2949 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2950 {
2951         int i, ret;
2952         ctdb_sock_addr addr;
2953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2954         struct ctdb_control_public_ip_info *info;
2955
2956         if (argc != 1) {
2957                 talloc_free(tmp_ctx);
2958                 usage();
2959         }
2960
2961         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2962                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2963                 return -1;
2964         }
2965
2966         /* read the public ip info from this node */
2967         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2968                                            tmp_ctx, &addr, &info);
2969         if (ret != 0) {
2970                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2971                                   argv[0], options.pnn));
2972                 talloc_free(tmp_ctx);
2973                 return ret;
2974         }
2975
2976         printf("Public IP[%s] info on node %u\n",
2977                ctdb_addr_to_str(&info->ip.addr),
2978                options.pnn);
2979
2980         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2981                ctdb_addr_to_str(&info->ip.addr),
2982                info->ip.pnn, info->num);
2983
2984         for (i=0; i<info->num; i++) {
2985                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2986
2987                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2988                        i+1, info->ifaces[i].name,
2989                        info->ifaces[i].link_state?"up":"down",
2990                        (unsigned int)info->ifaces[i].references,
2991                        (i==info->active_idx)?" (active)":"");
2992         }
2993
2994         talloc_free(tmp_ctx);
2995         return 0;
2996 }
2997
2998 /*
2999   display interfaces status
3000  */
3001 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3002 {
3003         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3004         int i;
3005         struct ctdb_control_get_ifaces *ifaces;
3006         int ret;
3007
3008         /* read the public ip list from this node */
3009         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3010         if (ret != 0) {
3011                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3012                                   options.pnn));
3013                 talloc_free(tmp_ctx);
3014                 return -1;
3015         }
3016
3017         if (options.machinereadable){
3018                 printf(":Name:LinkStatus:References:\n");
3019         } else {
3020                 printf("Interfaces on node %u\n", options.pnn);
3021         }
3022
3023         for (i=0; i<ifaces->num; i++) {
3024                 if (options.machinereadable){
3025                         printf(":%s:%s:%u\n",
3026                                ifaces->ifaces[i].name,
3027                                ifaces->ifaces[i].link_state?"1":"0",
3028                                (unsigned int)ifaces->ifaces[i].references);
3029                 } else {
3030                         printf("name:%s link:%s references:%u\n",
3031                                ifaces->ifaces[i].name,
3032                                ifaces->ifaces[i].link_state?"up":"down",
3033                                (unsigned int)ifaces->ifaces[i].references);
3034                 }
3035         }
3036
3037         talloc_free(tmp_ctx);
3038         return 0;
3039 }
3040
3041
3042 /*
3043   set link status of an interface
3044  */
3045 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3046 {
3047         int ret;
3048         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3049         struct ctdb_control_iface_info info;
3050
3051         ZERO_STRUCT(info);
3052
3053         if (argc != 2) {
3054                 usage();
3055         }
3056
3057         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3058                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3059                                   argv[0]));
3060                 talloc_free(tmp_ctx);
3061                 return -1;
3062         }
3063         strcpy(info.name, argv[0]);
3064
3065         if (strcmp(argv[1], "up") == 0) {
3066                 info.link_state = 1;
3067         } else if (strcmp(argv[1], "down") == 0) {
3068                 info.link_state = 0;
3069         } else {
3070                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3071                                   argv[1]));
3072                 talloc_free(tmp_ctx);
3073                 return -1;
3074         }
3075
3076         /* read the public ip list from this node */
3077         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3078                                    tmp_ctx, &info);
3079         if (ret != 0) {
3080                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3081                                   argv[0], options.pnn));
3082                 talloc_free(tmp_ctx);
3083                 return ret;
3084         }
3085
3086         talloc_free(tmp_ctx);
3087         return 0;
3088 }
3089
3090 /*
3091   display pid of a ctdb daemon
3092  */
3093 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3094 {
3095         uint32_t pid;
3096         int ret;
3097
3098         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3099         if (ret != 0) {
3100                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3101                 return ret;
3102         }
3103         printf("Pid:%d\n", pid);
3104
3105         return 0;
3106 }
3107
3108 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3109
3110 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3111                                               void *data,
3112                                               update_flags_handler_t handler,
3113                                               uint32_t flag,
3114                                               const char *desc,
3115                                               bool set_flag)
3116 {
3117         struct ctdb_node_map *nodemap = NULL;
3118         bool flag_is_set;
3119         int ret;
3120
3121         /* Check if the node is already in the desired state */
3122         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3123         if (ret != 0) {
3124                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3125                 exit(10);
3126         }
3127         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3128         if (set_flag == flag_is_set) {
3129                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3130                                      (set_flag ? "already" : "not"), desc));
3131                 return 0;
3132         }
3133
3134         do {
3135                 if (!handler(ctdb, data)) {
3136                         DEBUG(DEBUG_WARNING,
3137                               ("Failed to send control to set state %s on node %u, try again\n",
3138                                desc, options.pnn));
3139                 }
3140
3141                 sleep(1);
3142
3143                 /* Read the nodemap and verify the change took effect.
3144                  * Even if the above control/hanlder timed out then it
3145                  * could still have worked!
3146                  */
3147                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3148                                          ctdb, &nodemap);
3149                 if (ret != 0) {
3150                         DEBUG(DEBUG_WARNING,
3151                               ("Unable to get nodemap from local node, try again\n"));
3152                 }
3153                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3154         } while (nodemap == NULL || (set_flag != flag_is_set));
3155
3156         return ipreallocate(ctdb);
3157 }
3158
3159 /* Administratively disable a node */
3160 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3161 {
3162         int ret;
3163
3164         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3165                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3166         return ret == 0;
3167 }
3168
3169 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3170 {
3171         return update_flags_and_ipreallocate(ctdb, NULL,
3172                                                   update_flags_disabled,
3173                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3174                                                   "disabled",
3175                                                   true /* set_flag*/);
3176 }
3177
3178 /* Administratively re-enable a node */
3179 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3180 {
3181         int ret;
3182
3183         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3184                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3185         return ret == 0;
3186 }
3187
3188 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3189 {
3190         return update_flags_and_ipreallocate(ctdb, NULL,
3191                                                   update_flags_not_disabled,
3192                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3193                                                   "disabled",
3194                                                   false /* set_flag*/);
3195 }
3196
3197 /* Stop a node */
3198 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3199 {
3200         int ret;
3201
3202         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3203
3204         return ret == 0;
3205 }
3206
3207 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3208 {
3209         return update_flags_and_ipreallocate(ctdb, NULL,
3210                                                   update_flags_stopped,
3211                                                   NODE_FLAGS_STOPPED,
3212                                                   "stopped",
3213                                                   true /* set_flag*/);
3214 }
3215
3216 /* Continue a stopped node */
3217 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3218 {
3219         int ret;
3220
3221         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3222
3223         return ret == 0;
3224 }
3225
3226 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3227 {
3228         return update_flags_and_ipreallocate(ctdb, NULL,
3229                                                   update_flags_not_stopped,
3230                                                   NODE_FLAGS_STOPPED,
3231                                                   "stopped",
3232                                                   false /* set_flag */);
3233 }
3234
3235 static uint32_t get_generation(struct ctdb_context *ctdb)
3236 {
3237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3238         struct ctdb_vnn_map *vnnmap=NULL;
3239         int ret;
3240         uint32_t generation;
3241
3242         /* wait until the recmaster is not in recovery mode */
3243         while (1) {
3244                 uint32_t recmode, recmaster;
3245                 
3246                 if (vnnmap != NULL) {
3247                         talloc_free(vnnmap);
3248                         vnnmap = NULL;
3249                 }
3250
3251                 /* get the recmaster */
3252                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3253                 if (ret != 0) {
3254                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3255                         talloc_free(tmp_ctx);
3256                         exit(10);
3257                 }
3258
3259                 /* get recovery mode */
3260                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3261                 if (ret != 0) {
3262                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3263                         talloc_free(tmp_ctx);
3264                         exit(10);
3265                 }
3266
3267                 /* get the current generation number */
3268                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3269                 if (ret != 0) {
3270                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3271                         talloc_free(tmp_ctx);
3272                         exit(10);
3273                 }
3274
3275                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3276                         generation = vnnmap->generation;
3277                         talloc_free(tmp_ctx);
3278                         return generation;
3279                 }
3280                 sleep(1);
3281         }
3282 }
3283
3284 /* Ban a node */
3285 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3286 {
3287         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3288         int ret;
3289
3290         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3291
3292         return ret == 0;
3293 }
3294
3295 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3296 {
3297         struct ctdb_ban_time bantime;
3298
3299         if (argc < 1) {
3300                 usage();
3301         }
3302         
3303         bantime.pnn  = options.pnn;
3304         bantime.time = strtoul(argv[0], NULL, 0);
3305
3306         if (bantime.time == 0) {
3307                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3308                 return -1;
3309         }
3310
3311         return update_flags_and_ipreallocate(ctdb, &bantime,
3312                                                   update_state_banned,
3313                                                   NODE_FLAGS_BANNED,
3314                                                   "banned",
3315                                                   true /* set_flag*/);
3316 }
3317
3318
3319 /* Unban a node */
3320 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3321 {
3322         struct ctdb_ban_time bantime;
3323
3324         bantime.pnn  = options.pnn;
3325         bantime.time = 0;
3326
3327         return update_flags_and_ipreallocate(ctdb, &bantime,
3328                                                   update_state_banned,
3329                                                   NODE_FLAGS_BANNED,
3330                                                   "banned",
3331                                                   false /* set_flag*/);
3332 }
3333
3334 /*
3335   show ban information for a node
3336  */
3337 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3338 {
3339         int ret;
3340         struct ctdb_node_map *nodemap=NULL;
3341         struct ctdb_ban_time *bantime;
3342
3343         /* verify the node exists */
3344         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3345         if (ret != 0) {
3346                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3347                 return ret;
3348         }
3349
3350         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3351         if (ret != 0) {
3352                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3353                 return -1;
3354         }       
3355
3356         if (bantime->time == 0) {
3357                 printf("Node %u is not banned\n", bantime->pnn);
3358         } else {
3359                 printf("Node %u is banned, %d seconds remaining\n",
3360                        bantime->pnn, bantime->time);
3361         }
3362
3363         return 0;
3364 }
3365
3366 /*
3367   shutdown a daemon
3368  */
3369 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3370 {
3371         int ret;
3372
3373         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3374         if (ret != 0) {
3375                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3376                 return ret;
3377         }
3378
3379         return 0;
3380 }
3381
3382 /*
3383   trigger a recovery
3384  */
3385 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3386 {
3387         int ret;
3388         uint32_t generation, next_generation;
3389         bool force;
3390
3391         /* "force" option ignores freeze failure and forces recovery */
3392         force = (argc == 1) && (strcasecmp(argv[0], "force") == 0);
3393
3394         /* record the current generation number */
3395         generation = get_generation(ctdb);
3396
3397         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3398         if (ret != 0) {
3399                 if (!force) {
3400                         DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3401                         return ret;
3402                 }
3403                 DEBUG(DEBUG_WARNING, ("Unable to freeze node but proceeding because \"force\" option given\n"));
3404         }
3405
3406         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3407         if (ret != 0) {
3408                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3409                 return ret;
3410         }
3411
3412         /* wait until we are in a new generation */
3413         while (1) {
3414                 next_generation = get_generation(ctdb);
3415                 if (next_generation != generation) {
3416                         return 0;
3417                 }
3418                 sleep(1);
3419         }
3420
3421         return 0;
3422 }
3423
3424
3425 /*
3426   display monitoring mode of a remote node
3427  */
3428 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3429 {
3430         uint32_t monmode;
3431         int ret;
3432
3433         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3434         if (ret != 0) {
3435                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3436                 return ret;
3437         }
3438         if (!options.machinereadable){
3439                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3440         } else {
3441                 printf(":mode:\n");
3442                 printf(":%d:\n",monmode);
3443         }
3444         return 0;
3445 }
3446
3447
3448 /*
3449   display capabilities of a remote node
3450  */
3451 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3452 {
3453         uint32_t capabilities;
3454         int ret;
3455
3456         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3457         if (ret != 0) {
3458                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3459                 return -1;
3460         }
3461         
3462         if (!options.machinereadable){
3463                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3464                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3465                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3466                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3467         } else {
3468                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3469                 printf(":%d:%d:%d:%d:\n",
3470                         !!(capabilities&CTDB_CAP_RECMASTER),
3471                         !!(capabilities&CTDB_CAP_LMASTER),
3472                         !!(capabilities&CTDB_CAP_LVS),
3473                         !!(capabilities&CTDB_CAP_NATGW));
3474         }
3475         return 0;
3476 }
3477
3478 /*
3479   display lvs configuration
3480  */
3481 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3482 {
3483         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3484         uint32_t *capabilities;
3485         struct ctdb_node_map *nodemap=NULL;
3486         int i, ret;
3487         int healthy_count = 0;
3488
3489         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
3490         if (ret != 0) {
3491                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3492                 talloc_free(tmp_ctx);
3493                 return -1;
3494         }
3495
3496         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3497         CTDB_NO_MEMORY(ctdb, capabilities);
3498         
3499         ret = 0;
3500
3501         /* collect capabilities for all connected nodes */
3502         for (i=0; i<nodemap->num; i++) {
3503                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3504                         continue;
3505                 }
3506                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3507                         continue;
3508                 }
3509
3510                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3511                 if (ret != 0) {
3512                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3513                         ret = -1;
3514                         goto done;
3515                 }
3516
3517                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3518                         continue;
3519                 }
3520
3521                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3522                         healthy_count++;
3523                 }
3524         }
3525
3526         /* Print all LVS nodes */
3527         for (i=0; i<nodemap->num; i++) {
3528                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3529                         continue;
3530                 }
3531                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3532                         continue;
3533                 }
3534                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3535                         continue;
3536                 }
3537
3538                 if (healthy_count != 0) {
3539                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3540                                 continue;
3541                         }
3542                 }
3543
3544                 printf("%d:%s\n", i, 
3545                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3546         }
3547
3548 done:
3549         talloc_free(tmp_ctx);
3550         return ret;
3551 }
3552
3553 /*
3554   display who is the lvs master
3555  */
3556 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3557 {
3558         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3559         uint32_t *capabilities;
3560         struct ctdb_node_map *nodemap=NULL;
3561         int i, ret;
3562         int healthy_count = 0;
3563
3564         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
3565         if (ret != 0) {
3566                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3567                 talloc_free(tmp_ctx);
3568                 return -1;
3569         }
3570
3571         capabilities = talloc_array(tmp_ctx, uint32_t, nodemap->num);
3572         if (capabilities == NULL) {
3573                 talloc_free(tmp_ctx);
3574                 CTDB_NO_MEMORY(ctdb, capabilities);
3575         }
3576
3577         /* collect capabilities for all connected nodes */
3578         for (i=0; i<nodemap->num; i++) {
3579                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3580                         continue;
3581                 }
3582                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3583                         continue;
3584                 }
3585         
3586                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3587                 if (ret != 0) {
3588                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3589                         ret = -1;
3590                         goto done;
3591                 }
3592
3593                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3594                         continue;
3595                 }
3596
3597                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3598                         healthy_count++;
3599                 }
3600         }
3601
3602         ret = -1;
3603
3604         /* find and show the lvsmaster */
3605         for (i=0; i<nodemap->num; i++) {
3606                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3607                         continue;
3608                 }
3609                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3610                         continue;
3611                 }
3612                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3613                         continue;
3614                 }
3615
3616                 if (healthy_count != 0) {
3617                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3618                                 continue;
3619                         }
3620                 }
3621
3622                 if (options.machinereadable){
3623                         printf("%d\n", i);
3624                 } else {
3625                         printf("Node %d is LVS master\n", i);
3626                 }
3627                 ret = 0;
3628                 goto done;
3629         }
3630
3631         printf("There is no LVS master\n");
3632 done:
3633         talloc_free(tmp_ctx);
3634         return ret;
3635 }
3636
3637 /*
3638   disable monitoring on a  node
3639  */
3640 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3641 {
3642         
3643         int ret;
3644
3645         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3646         if (ret != 0) {
3647                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3648                 return ret;
3649         }
3650         printf("Monitoring mode:%s\n","DISABLED");
3651
3652         return 0;
3653 }
3654
3655 /*
3656   enable monitoring on a  node
3657  */
3658 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3659 {
3660         
3661         int ret;
3662
3663         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3664         if (ret != 0) {
3665                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3666                 return ret;
3667         }
3668         printf("Monitoring mode:%s\n","ACTIVE");
3669
3670         return 0;
3671 }
3672
3673 /*
3674   display remote list of keys/data for a db
3675  */
3676 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3677 {
3678         const char *db_name;
3679         struct ctdb_db_context *ctdb_db;
3680         int ret;
3681         struct ctdb_dump_db_context c;
3682         uint8_t flags;
3683
3684         if (argc < 1) {
3685                 usage();
3686         }
3687
3688         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3689                 return -1;
3690         }
3691
3692         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3693         if (ctdb_db == NULL) {
3694                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3695                 return -1;
3696         }
3697
3698         if (options.printlmaster) {
3699                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3700                                           ctdb, &ctdb->vnn_map);
3701                 if (ret != 0) {
3702                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3703                                           options.pnn));
3704                         return ret;
3705                 }
3706         }
3707
3708         ZERO_STRUCT(c);
3709         c.f = stdout;
3710         c.printemptyrecords = (bool)options.printemptyrecords;
3711         c.printdatasize = (bool)options.printdatasize;
3712         c.printlmaster = (bool)options.printlmaster;
3713         c.printhash = (bool)options.printhash;
3714         c.printrecordflags = (bool)options.printrecordflags;
3715
3716         /* traverse and dump the cluster tdb */
3717         ret = ctdb_dump_db(ctdb_db, &c);
3718         if (ret == -1) {
3719                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3720                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3721                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3722                                   db_name));
3723                 return -1;
3724         }
3725         talloc_free(ctdb_db);
3726
3727         printf("Dumped %d records\n", ret);
3728         return 0;
3729 }
3730
3731 struct cattdb_data {
3732         struct ctdb_context *ctdb;
3733         uint32_t count;
3734 };
3735
3736 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3737 {
3738         struct cattdb_data *d = private_data;
3739         struct ctdb_dump_db_context c;
3740
3741         d->count++;
3742
3743         ZERO_STRUCT(c);
3744         c.f = stdout;
3745         c.printemptyrecords = (bool)options.printemptyrecords;
3746         c.printdatasize = (bool)options.printdatasize;
3747         c.printlmaster = false;
3748         c.printhash = (bool)options.printhash;
3749         c.printrecordflags = true;
3750
3751         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3752 }
3753
3754 /*
3755   cat the local tdb database using same format as catdb
3756  */
3757 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3758 {
3759         const char *db_name;
3760         struct ctdb_db_context *ctdb_db;
3761         struct cattdb_data d;
3762         uint8_t flags;
3763
3764         if (argc < 1) {
3765                 usage();
3766         }
3767
3768         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3769                 return -1;
3770         }
3771
3772         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3773         if (ctdb_db == NULL) {
3774                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3775                 return -1;
3776         }
3777
3778         /* traverse the local tdb */
3779         d.count = 0;
3780         d.ctdb  = ctdb;
3781         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3782                 printf("Failed to cattdb data\n");
3783                 exit(10);
3784         }
3785         talloc_free(ctdb_db);
3786
3787         printf("Dumped %d records\n", d.count);
3788         return 0;
3789 }
3790
3791 /*
3792   display the content of a database key
3793  */
3794 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3795 {
3796         const char *db_name;
3797         struct ctdb_db_context *ctdb_db;
3798         struct ctdb_record_handle *h;
3799         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3800         TDB_DATA key, data;
3801         uint8_t flags;
3802
3803         if (argc < 2) {
3804                 usage();
3805         }
3806
3807         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3808                 return -1;
3809         }
3810
3811         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3812         if (ctdb_db == NULL) {
3813                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3814                 return -1;
3815         }
3816
3817         key.dptr  = discard_const(argv[1]);
3818         key.dsize = strlen((char *)key.dptr);
3819
3820         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3821         if (h == NULL) {
3822                 printf("Failed to fetch record '%s' on node %d\n", 
3823                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3824                 talloc_free(tmp_ctx);
3825                 exit(10);
3826         }
3827
3828         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3829
3830         talloc_free(ctdb_db);
3831         talloc_free(tmp_ctx);
3832         return 0;
3833 }
3834
3835 /*
3836   display the content of a database key
3837  */
3838 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3839 {
3840         const char *db_name;
3841         struct ctdb_db_context *ctdb_db;
3842         struct ctdb_record_handle *h;
3843         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3844         TDB_DATA key, data;
3845         uint8_t flags;
3846
3847         if (argc < 3) {
3848                 usage();
3849         }
3850
3851         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3852                 return -1;
3853         }
3854
3855         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3856         if (ctdb_db == NULL) {
3857                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3858                 return -1;
3859         }
3860
3861         key.dptr  = discard_const(argv[1]);
3862         key.dsize = strlen((char *)key.dptr);
3863
3864         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3865         if (h == NULL) {
3866                 printf("Failed to fetch record '%s' on node %d\n", 
3867                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3868                 talloc_free(tmp_ctx);
3869                 exit(10);
3870         }
3871
3872         data.dptr  = discard_const(argv[2]);
3873         data.dsize = strlen((char *)data.dptr);
3874
3875         if (ctdb_record_store(h, data) != 0) {
3876                 printf("Failed to store record\n");
3877         }
3878
3879         talloc_free(h);
3880         talloc_free(ctdb_db);
3881         talloc_free(tmp_ctx);
3882         return 0;
3883 }
3884
3885 /*
3886   fetch a record from a persistent database
3887  */
3888 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3889 {
3890         const char *db_name;
3891         struct ctdb_db_context *ctdb_db;
3892         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3893         struct ctdb_transaction_handle *h;
3894         TDB_DATA key, data;
3895         int fd, ret;
3896         bool persistent;
3897         uint8_t flags;
3898
3899         if (argc < 2) {
3900                 talloc_free(tmp_ctx);
3901                 usage();
3902         }
3903
3904         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3905                 talloc_free(tmp_ctx);
3906                 return -1;
3907         }
3908
3909         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3910         if (!persistent) {
3911                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3912                 talloc_free(tmp_ctx);
3913                 return -1;
3914         }
3915
3916         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3917         if (ctdb_db == NULL) {
3918                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3919                 talloc_free(tmp_ctx);
3920                 return -1;
3921         }
3922
3923         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3924         if (h == NULL) {
3925                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3926                 talloc_free(tmp_ctx);
3927                 return -1;
3928         }
3929
3930         key.dptr  = discard_const(argv[1]);
3931         key.dsize = strlen(argv[1]);
3932         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3933         if (ret != 0) {
3934                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3935                 talloc_free(tmp_ctx);
3936                 return -1;
3937         }
3938
3939         if (data.dsize == 0 || data.dptr == NULL) {
3940                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3941                 talloc_free(tmp_ctx);
3942                 return -1;
3943         }
3944
3945         if (argc == 3) {
3946           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3947                 if (fd == -1) {
3948                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3949                         talloc_free(tmp_ctx);
3950                         return -1;
3951                 }
3952                 write(fd, data.dptr, data.dsize);
3953                 close(fd);
3954         } else {
3955                 write(1, data.dptr, data.dsize);
3956         }
3957
3958         /* abort the transaction */
3959         talloc_free(h);
3960
3961
3962         talloc_free(tmp_ctx);
3963         return 0;
3964 }
3965
3966 /*
3967   fetch a record from a tdb-file
3968  */
3969 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3970 {
3971         const char *tdb_file;
3972         TDB_CONTEXT *tdb;
3973         TDB_DATA key, data;
3974         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3975         int fd;
3976
3977         if (argc < 2) {
3978                 usage();
3979         }
3980
3981         tdb_file = argv[0];
3982
3983         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3984         if (tdb == NULL) {
3985                 printf("Failed to open TDB file %s\n", tdb_file);
3986                 return -1;
3987         }
3988
3989         if (!strncmp(argv[1], "0x", 2)) {
3990                 key = hextodata(tmp_ctx, argv[1] + 2);
3991                 if (key.dsize == 0) {
3992                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3993                         return -1;
3994                 }
3995         } else {
3996                 key.dptr  = discard_const(argv[1]);
3997                 key.dsize = strlen(argv[1]);
3998         }
3999
4000         data = tdb_fetch(tdb, key);
4001         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4002                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4003                 tdb_close(tdb);
4004                 return -1;
4005         }
4006
4007         tdb_close(tdb);
4008
4009         if (argc == 3) {
4010           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4011                 if (fd == -1) {
4012                         printf("Failed to open output file %s\n", argv[2]);
4013                         return -1;
4014                 }
4015                 if (options.verbose){
4016                         write(fd, data.dptr, data.dsize);
4017                 } else {
4018                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4019                 }
4020                 close(fd);
4021         } else {
4022                 if (options.verbose){
4023                         write(1, data.dptr, data.dsize);
4024                 } else {
4025                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4026                 }
4027         }
4028
4029         talloc_free(tmp_ctx);
4030         return 0;
4031 }
4032
4033 /*
4034   store a record and header to a tdb-file
4035  */
4036 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4037 {
4038         const char *tdb_file;
4039         TDB_CONTEXT *tdb;
4040         TDB_DATA key, value, data;
4041         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4042         struct ctdb_ltdb_header header;
4043
4044         if (argc < 3) {
4045                 usage();
4046         }
4047
4048         tdb_file = argv[0];
4049
4050         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4051         if (tdb == NULL) {
4052                 printf("Failed to open TDB file %s\n", tdb_file);
4053                 return -1;
4054         }
4055
4056         if (!strncmp(argv[1], "0x", 2)) {
4057                 key = hextodata(tmp_ctx, argv[1] + 2);
4058                 if (key.dsize == 0) {
4059                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4060                         return -1;
4061                 }
4062         } else {
4063                 key.dptr  = discard_const(argv[1]);
4064                 key.dsize = strlen(argv[1]);
4065         }
4066
4067         if (!strncmp(argv[2], "0x", 2)) {
4068                 value = hextodata(tmp_ctx, argv[2] + 2);
4069                 if (value.dsize == 0) {
4070                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4071                         return -1;
4072                 }
4073         } else {
4074                 value.dptr  = discard_const(argv[2]);
4075                 value.dsize = strlen(argv[2]);
4076         }
4077
4078         ZERO_STRUCT(header);
4079         if (argc > 3) {
4080                 header.rsn = atoll(argv[3]);
4081         }
4082         if (argc > 4) {
4083                 header.dmaster = atoi(argv[4]);
4084         }
4085         if (argc > 5) {
4086                 header.flags = atoi(argv[5]);
4087         }
4088
4089         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4090         data.dptr = talloc_size(tmp_ctx, data.dsize);
4091         if (data.dptr == NULL) {
4092                 printf("Failed to allocate header+value\n");
4093                 return -1;
4094         }
4095
4096         *(struct ctdb_ltdb_header *)data.dptr = header;
4097         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4098
4099         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4100                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4101                 tdb_close(tdb);
4102                 return -1;
4103         }
4104
4105         tdb_close(tdb);
4106
4107         talloc_free(tmp_ctx);
4108         return 0;
4109 }
4110
4111 /*
4112   write a record to a persistent database
4113  */
4114 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4115 {
4116         const char *db_name;
4117         struct ctdb_db_context *ctdb_db;
4118         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4119         struct ctdb_transaction_handle *h;
4120         struct stat st;
4121         TDB_DATA key, data;
4122         int fd, ret;
4123
4124         if (argc < 3) {
4125                 talloc_free(tmp_ctx);
4126                 usage();
4127         }
4128
4129         fd = open(argv[2], O_RDONLY);
4130         if (fd == -1) {
4131                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4132                 talloc_free(tmp_ctx);
4133                 return -1;
4134         }
4135         
4136         ret = fstat(fd, &st);
4137         if (ret == -1) {
4138                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4139                 close(fd);
4140                 talloc_free(tmp_ctx);
4141                 return -1;
4142         }
4143
4144         if (!S_ISREG(st.st_mode)) {
4145                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4146                 close(fd);
4147                 talloc_free(tmp_ctx);
4148                 return -1;
4149         }
4150
4151         data.dsize = st.st_size;
4152         if (data.dsize == 0) {
4153                 data.dptr  = NULL;
4154         } else {
4155                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4156                 if (data.dptr == NULL) {
4157                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4158                         close(fd);
4159                         talloc_free(tmp_ctx);
4160                         return -1;
4161                 }
4162                 ret = read(fd, data.dptr, data.dsize);
4163                 if (ret != data.dsize) {
4164                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4165                         close(fd);
4166                         talloc_free(tmp_ctx);
4167                         return -1;
4168                 }
4169         }
4170         close(fd);
4171
4172
4173         db_name = argv[0];
4174
4175         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4176         if (ctdb_db == NULL) {
4177                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4178                 talloc_free(tmp_ctx);
4179                 return -1;
4180         }
4181
4182         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4183         if (h == NULL) {
4184                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4185                 talloc_free(tmp_ctx);
4186                 return -1;
4187         }
4188
4189         key.dptr  = discard_const(argv[1]);
4190         key.dsize = strlen(argv[1]);
4191         ret = ctdb_transaction_store(h, key, data);
4192         if (ret != 0) {
4193                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4194                 talloc_free(tmp_ctx);
4195                 return -1;
4196         }
4197
4198         ret = ctdb_transaction_commit(h);
4199         if (ret != 0) {
4200                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4201                 talloc_free(tmp_ctx);
4202                 return -1;
4203         }
4204
4205
4206         talloc_free(tmp_ctx);
4207         return 0;
4208 }
4209
4210 /*
4211  * delete a record from a persistent database
4212  */
4213 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4214 {
4215         const char *db_name;
4216         struct ctdb_db_context *ctdb_db;
4217         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4218         struct ctdb_transaction_handle *h;
4219         TDB_DATA key;
4220         int ret;
4221         bool persistent;
4222         uint8_t flags;
4223
4224         if (argc < 2) {
4225                 talloc_free(tmp_ctx);
4226                 usage();
4227         }
4228
4229         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4230                 talloc_free(tmp_ctx);
4231                 return -1;
4232         }
4233
4234         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4235         if (!persistent) {
4236                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4237                 talloc_free(tmp_ctx);
4238                 return -1;
4239         }
4240
4241         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4242         if (ctdb_db == NULL) {
4243                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4244                 talloc_free(tmp_ctx);
4245                 return -1;
4246         }
4247
4248         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4249         if (h == NULL) {
4250                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4251                 talloc_free(tmp_ctx);
4252                 return -1;
4253         }
4254
4255         key.dptr = discard_const(argv[1]);
4256         key.dsize = strlen(argv[1]);
4257         ret = ctdb_transaction_store(h, key, tdb_null);
4258         if (ret != 0) {
4259                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4260                 talloc_free(tmp_ctx);
4261                 return -1;
4262         }
4263
4264         ret = ctdb_transaction_commit(h);
4265         if (ret != 0) {
4266                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4267                 talloc_free(tmp_ctx);
4268                 return -1;
4269         }
4270
4271         talloc_free(tmp_ctx);
4272         return 0;
4273 }
4274
4275 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4276                                        TDB_DATA *data)
4277 {
4278         const char *t;
4279         size_t n;
4280         const char *ret; /* Next byte after successfully parsed value */
4281
4282         /* Error, unless someone says otherwise */
4283         ret = NULL;
4284         /* Indicates no value to parse */
4285         *data = tdb_null;
4286
4287         /* Skip whitespace */
4288         n = strspn(s, " \t");
4289         t = s + n;
4290
4291         if (t[0] == '"') {
4292                 /* Quoted ASCII string - no wide characters! */
4293                 t++;
4294                 n = strcspn(t, "\"");
4295                 if (t[n] == '"') {
4296                         if (n > 0) {
4297                                 data->dsize = n;
4298                                 data->dptr = talloc_memdup(mem_ctx, t, n);
4299                                 CTDB_NOMEM_ABORT(data->dptr);
4300                         }
4301                         ret = t + n + 1;
4302                 } else {
4303                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4304                 }
4305         } else {
4306                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4307         }
4308
4309         return ret;
4310 }
4311
4312 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4313                                  TDB_DATA *key, TDB_DATA *value)
4314 {
4315         char line [1024]; /* FIXME: make this more flexible? */
4316         const char *t;
4317         char *ptr;
4318
4319         ptr = fgets(line, sizeof(line), file);
4320
4321         if (ptr == NULL) {
4322                 return false;
4323         }
4324
4325         /* Get key */
4326         t = ptrans_parse_string(mem_ctx, line, key);
4327         if (t == NULL || key->dptr == NULL) {
4328                 /* Line Ignored but not EOF */
4329                 return true;
4330         }
4331
4332         /* Get value */
4333         t = ptrans_parse_string(mem_ctx, t, value);
4334         if (t == NULL) {
4335                 /* Line Ignored but not EOF */
4336                 talloc_free(key->dptr);
4337                 *key = tdb_null;
4338                 return true;
4339         }
4340
4341         return true;
4342 }
4343
4344 /*
4345  * Update a persistent database as per file/stdin
4346  */
4347 static int control_ptrans(struct ctdb_context *ctdb,
4348                           int argc, const char **argv)
4349 {
4350         const char *db_name;
4351         struct ctdb_db_context *ctdb_db;
4352         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4353         struct ctdb_transaction_handle *h;
4354         TDB_DATA key, value;
4355         FILE *file;
4356         int ret;
4357
4358         if (argc < 1) {
4359                 talloc_free(tmp_ctx);
4360                 usage();
4361         }
4362
4363         file = stdin;
4364         if (argc == 2) {
4365                 file = fopen(argv[1], "r");
4366                 if (file == NULL) {
4367                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4368                         talloc_free(tmp_ctx);
4369                         return -1;
4370                 }
4371         }
4372
4373         db_name = argv[0];
4374
4375         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4376         if (ctdb_db == NULL) {
4377                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4378                 goto error;
4379         }
4380
4381         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4382         if (h == NULL) {
4383                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4384                 goto error;
4385         }
4386
4387         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4388                 if (key.dsize != 0) {
4389                         ret = ctdb_transaction_store(h, key, value);
4390                         /* Minimise memory use */
4391                         talloc_free(key.dptr);
4392                         if (value.dptr != NULL) {
4393                                 talloc_free(value.dptr);
4394                         }
4395                         if (ret != 0) {
4396                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4397                                 ctdb_transaction_cancel(h);
4398                                 goto error;
4399                         }
4400                 }
4401         }
4402
4403         ret = ctdb_transaction_commit(h);
4404         if (ret != 0) {
4405                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4406                 goto error;
4407         }
4408
4409         if (file != stdin) {
4410                 fclose(file);
4411         }
4412         talloc_free(tmp_ctx);
4413         return 0;
4414
4415 error:
4416         if (file != stdin) {
4417                 fclose(file);
4418         }
4419
4420         talloc_free(tmp_ctx);
4421         return -1;
4422 }
4423
4424 /*
4425   check if a service is bound to a port or not
4426  */
4427 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
4428 {
4429         int s, ret;
4430         int v;
4431         int port;
4432         struct sockaddr_in sin;
4433
4434         if (argc != 1) {
4435                 printf("Use: ctdb chktcport <port>\n");
4436                 return EINVAL;
4437         }
4438
4439         port = atoi(argv[0]);
4440
4441         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
4442         if (s == -1) {
4443                 printf("Failed to open local socket\n");
4444                 return errno;
4445         }
4446
4447         v = fcntl(s, F_GETFL, 0);
4448         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
4449                 printf("Unable to set socket non-blocking: %s\n", strerror(errno));
4450         }
4451
4452         bzero(&sin, sizeof(sin));
4453         sin.sin_family = PF_INET;
4454         sin.sin_port   = htons(port);
4455         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
4456         close(s);
4457         if (ret == -1) {
4458                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
4459                 return errno;
4460         }
4461
4462         return 0;
4463 }
4464
4465
4466
4467 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4468                              TDB_DATA data, void *private_data)
4469 {
4470         DEBUG(DEBUG_ERR,("Log data received\n"));
4471         if (data.dsize > 0) {
4472                 printf("%s", data.dptr);
4473         }
4474
4475         exit(0);
4476 }
4477
4478 /*
4479   display a list of log messages from the in memory ringbuffer
4480  */
4481 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
4482 {
4483         int ret, i;
4484         bool main_daemon;
4485         struct ctdb_get_log_addr log_addr;
4486         TDB_DATA data;
4487         struct timeval tv;
4488
4489         /* Process options */
4490         main_daemon = true;
4491         log_addr.pnn = ctdb_get_pnn(ctdb);
4492         log_addr.level = DEBUG_NOTICE;
4493         for (i = 0; i < argc; i++) {
4494                 if (strcmp(argv[i], "recoverd") == 0) {
4495                         main_daemon = false;
4496                 } else {
4497                         if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
4498                                 log_addr.level = get_debug_by_desc(argv[i]);
4499                         } else {
4500                                 log_addr.level = strtol(argv[i], NULL, 0);
4501                         }
4502                 }
4503         }
4504
4505         /* Our message port is our PID */
4506         log_addr.srvid = getpid();
4507
4508         data.dptr = (unsigned char *)&log_addr;
4509         data.dsize = sizeof(log_addr);
4510
4511         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
4512
4513         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
4514         sleep(1);
4515
4516         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
4517
4518         if (main_daemon) {
4519                 int32_t res;
4520                 char *errmsg;
4521                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4522
4523                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4524                                    0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4525                 if (ret != 0 || res != 0) {
4526                         DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4527                         talloc_free(tmp_ctx);
4528                         return -1;
4529                 }
4530                 talloc_free(tmp_ctx);
4531         } else {
4532                 ret = ctdb_client_send_message(ctdb, options.pnn,
4533                                                CTDB_SRVID_GETLOG, data);
4534                 if (ret != 0) {
4535                         DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
4536                         return -1;
4537                 }
4538         }
4539
4540         tv = timeval_current();
4541         /* this loop will terminate when we have received the reply */
4542         while (timeval_elapsed(&tv) < (double)options.timelimit) {
4543                 event_loop_once(ctdb->ev);
4544         }
4545
4546         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4547
4548         return 0;
4549 }
4550
4551 /*
4552   clear the in memory log area
4553  */
4554 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4555 {
4556         int ret;
4557
4558         if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
4559                 /* "recoverd" not given - get logs from main daemon */
4560                 int32_t res;
4561                 char *errmsg;
4562                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4563
4564                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4565                                    0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4566                 if (ret != 0 || res != 0) {
4567                         DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4568                         talloc_free(tmp_ctx);
4569                         return -1;
4570                 }
4571
4572                 talloc_free(tmp_ctx);
4573         } else {
4574                 TDB_DATA data; /* unused in recoverd... */
4575                 data.dsize = 0;
4576
4577                 ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
4578                 if (ret != 0) {
4579                         DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
4580                         return -1;
4581                 }
4582         }
4583
4584         return 0;
4585 }
4586
4587 /* Reload public IPs on a specified nodes */
4588 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4589 {
4590         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4591         uint32_t *nodes;
4592         uint32_t pnn_mode;
4593         uint32_t timeout;
4594         int ret;
4595
4596         assert_single_node_only();
4597
4598         if (argc > 1) {
4599                 usage();
4600         }
4601
4602         /* Determine the nodes where IPs need to be reloaded */
4603         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4604                               options.pnn, true, &nodes, &pnn_mode)) {
4605                 ret = -1;
4606                 goto done;
4607         }
4608
4609 again:
4610         /* Disable takeover runs on all connected nodes.  A reply
4611          * indicating success is needed from each node so all nodes
4612          * will need to be active.  This will retry until maxruntime
4613          * is exceeded, hence no error handling.
4614          * 
4615          * A check could be added to not allow reloading of IPs when
4616          * there are disconnected nodes.  However, this should
4617          * probably be left up to the administrator.
4618          */
4619         timeout = LONGTIMEOUT;
4620         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4621                         "Disable takeover runs", true);
4622
4623         /* Now tell all the desired nodes to reload their public IPs.
4624          * Keep trying this until it succeeds.  This assumes all
4625          * failures are transient, which might not be true...
4626          */
4627         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4628                                       nodes, 0, LONGTIMELIMIT(),
4629                                       false, tdb_null,
4630                                       NULL, NULL, NULL) != 0) {
4631                 DEBUG(DEBUG_ERR,
4632                       ("Unable to reload IPs on some nodes, try again.\n"));
4633                 goto again;
4634         }
4635
4636         /* It isn't strictly necessary to wait until takeover runs are
4637          * re-enabled but doing so can't hurt.
4638          */
4639         timeout = 0;
4640         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4641                         "Enable takeover runs", true);
4642
4643         ipreallocate(ctdb);
4644
4645         ret = 0;
4646 done:
4647         talloc_free(tmp_ctx);
4648         return ret;
4649 }
4650
4651 /*
4652   display a list of the databases on a remote ctdb
4653  */
4654 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4655 {
4656         int i, ret;
4657         struct ctdb_dbid_map *dbmap=NULL;
4658
4659         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4660         if (ret != 0) {
4661                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4662                 return ret;
4663         }
4664
4665         if(options.machinereadable){
4666                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4667                 for(i=0;i<dbmap->num;i++){
4668                         const char *path;
4669                         const char *name;
4670                         const char *health;
4671                         bool persistent;
4672                         bool readonly;
4673                         bool sticky;
4674
4675                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4676                                             dbmap->dbs[i].dbid, ctdb, &path);
4677                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4678                                             dbmap->dbs[i].dbid, ctdb, &name);
4679                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4680                                               dbmap->dbs[i].dbid, ctdb, &health);
4681                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4682                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4683                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4684                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4685                                dbmap->dbs[i].dbid, name, path,
4686                                !!(persistent), !!(sticky),
4687                                !!(health), !!(readonly));
4688                 }
4689                 return 0;
4690         }
4691
4692         printf("Number of databases:%d\n", dbmap->num);
4693         for(i=0;i<dbmap->num;i++){
4694                 const char *path;
4695                 const char *name;
4696                 const char *health;
4697                 bool persistent;
4698                 bool readonly;
4699                 bool sticky;
4700
4701                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4702                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4703                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4704                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4705                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4706                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4707                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4708                        dbmap->dbs[i].dbid, name, path,
4709                        persistent?" PERSISTENT":"",
4710                        sticky?" STICKY":"",
4711                        readonly?" READONLY":"",
4712                        health?" UNHEALTHY":"");
4713         }
4714
4715         return 0;
4716 }
4717
4718 /*
4719   display the status of a database on a remote ctdb
4720  */
4721 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4722 {
4723         const char *db_name;
4724         uint32_t db_id;
4725         uint8_t flags;
4726         const char *path;
4727         const char *health;
4728
4729         if (argc < 1) {
4730                 usage();
4731         }
4732
4733         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4734                 return -1;
4735         }
4736
4737         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4738         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4739         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4740                db_id, db_name, path,
4741                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4742                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4743                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4744                (health ? health : "OK"));
4745
4746         return 0;
4747 }
4748
4749 /*
4750   check if the local node is recmaster or not
4751   it will return 1 if this node is the recmaster and 0 if it is not
4752   or if the local ctdb daemon could not be contacted
4753  */
4754 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4755 {
4756         uint32_t mypnn, recmaster;
4757         int ret;
4758
4759         assert_single_node_only();
4760
4761         mypnn = getpnn(ctdb);
4762
4763         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4764         if (ret != 0) {
4765                 printf("Failed to get the recmaster\n");
4766                 return 1;
4767         }
4768
4769         if (recmaster != mypnn) {
4770                 printf("this node is not the recmaster\n");
4771                 return 1;
4772         }
4773
4774         printf("this node is the recmaster\n");
4775         return 0;
4776 }
4777
4778 /*
4779   ping a node
4780  */
4781 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4782 {
4783         int ret;
4784         struct timeval tv = timeval_current();
4785         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4786         if (ret == -1) {
4787                 printf("Unable to get ping response from node %u\n", options.pnn);
4788                 return -1;
4789         } else {
4790                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4791                        options.pnn, timeval_elapsed(&tv), ret);
4792         }
4793         return 0;
4794 }
4795
4796
4797 /*
4798   get a node's runstate
4799  */
4800 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4801 {
4802         int ret;
4803         enum ctdb_runstate runstate;
4804
4805         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4806         if (ret == -1) {
4807                 printf("Unable to get runstate response from node %u\n",
4808                        options.pnn);
4809                 return -1;
4810         } else {
4811                 bool found = true;
4812                 enum ctdb_runstate t;
4813                 int i;
4814                 for (i=0; i<argc; i++) {
4815                         found = false;
4816                         t = runstate_from_string(argv[i]);
4817                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4818                                 printf("Invalid run state (%s)\n", argv[i]);
4819                                 return -1;
4820                         }
4821
4822                         if (t == runstate) {
4823                                 found = true;
4824                                 break;
4825                         }
4826                 }
4827
4828                 if (!found) {
4829                         printf("CTDB not in required run state (got %s)\n", 
4830                                runstate_to_string((enum ctdb_runstate)runstate));
4831                         return -1;
4832                 }
4833         }
4834
4835         printf("%s\n", runstate_to_string(runstate));
4836         return 0;
4837 }
4838
4839
4840 /*
4841   get a tunable
4842  */
4843 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4844 {
4845         const char *name;
4846         uint32_t value;
4847         int ret;
4848
4849         if (argc < 1) {
4850                 usage();
4851         }
4852
4853         name = argv[0];
4854         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4855         if (ret != 0) {
4856                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4857                 return -1;
4858         }
4859
4860         printf("%-23s = %u\n", name, value);
4861         return 0;
4862 }
4863
4864 /*
4865   set a tunable
4866  */
4867 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4868 {
4869         const char *name;
4870         uint32_t value;
4871         int ret;
4872
4873         if (argc < 2) {
4874                 usage();
4875         }
4876
4877         name = argv[0];
4878         value = strtoul(argv[1], NULL, 0);
4879
4880         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4881         if (ret == -1) {
4882                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4883                 return -1;
4884         }
4885         return 0;
4886 }
4887
4888 /*
4889   list all tunables
4890  */
4891 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4892 {
4893         uint32_t count;
4894         const char **list;
4895         int ret, i;
4896
4897         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4898         if (ret == -1) {
4899                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4900                 return -1;
4901         }
4902
4903         for (i=0;i<count;i++) {
4904                 control_getvar(ctdb, 1, &list[i]);
4905         }
4906
4907         talloc_free(list);
4908         
4909         return 0;
4910 }
4911
4912 /*
4913   display debug level on a node
4914  */
4915 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4916 {
4917         int ret;
4918         int32_t level;
4919
4920         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4921         if (ret != 0) {
4922                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4923                 return ret;
4924         } else {
4925                 if (options.machinereadable){
4926                         printf(":Name:Level:\n");
4927                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4928                 } else {
4929                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4930                 }
4931         }
4932         return 0;
4933 }
4934
4935 /*
4936   display reclock file of a node
4937  */
4938 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4939 {
4940         int ret;
4941         const char *reclock;
4942
4943         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4944         if (ret != 0) {
4945                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4946                 return ret;
4947         } else {
4948                 if (options.machinereadable){
4949                         if (reclock != NULL) {
4950                                 printf("%s", reclock);
4951                         }
4952                 } else {
4953                         if (reclock == NULL) {
4954                                 printf("No reclock file used.\n");
4955                         } else {
4956                                 printf("Reclock file:%s\n", reclock);
4957                         }
4958                 }
4959         }
4960         return 0;
4961 }
4962
4963 /*
4964   set the reclock file of a node
4965  */
4966 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4967 {
4968         int ret;
4969         const char *reclock;
4970
4971         if (argc == 0) {
4972                 reclock = NULL;
4973         } else if (argc == 1) {
4974                 reclock = argv[0];
4975         } else {
4976                 usage();
4977         }
4978
4979         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4980         if (ret != 0) {
4981                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4982                 return ret;
4983         }
4984         return 0;
4985 }
4986
4987 /*
4988   set the natgw state on/off
4989  */
4990 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4991 {
4992         int ret;
4993         uint32_t natgwstate;
4994
4995         if (argc == 0) {
4996                 usage();
4997         }
4998
4999         if (!strcmp(argv[0], "on")) {
5000                 natgwstate = 1;
5001         } else if (!strcmp(argv[0], "off")) {
5002                 natgwstate = 0;
5003         } else {
5004                 usage();
5005         }
5006
5007         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
5008         if (ret != 0) {
5009                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
5010                 return ret;
5011         }
5012
5013         return 0;
5014 }
5015
5016 /*
5017   set the lmaster role on/off
5018  */
5019 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5020 {
5021         int ret;
5022         uint32_t lmasterrole;
5023
5024         if (argc == 0) {
5025                 usage();
5026         }
5027
5028         if (!strcmp(argv[0], "on")) {
5029                 lmasterrole = 1;
5030         } else if (!strcmp(argv[0], "off")) {
5031                 lmasterrole = 0;
5032         } else {
5033                 usage();
5034         }
5035
5036         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
5037         if (ret != 0) {
5038                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
5039                 return ret;
5040         }
5041
5042         return 0;
5043 }
5044
5045 /*
5046   set the recmaster role on/off
5047  */
5048 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5049 {
5050         int ret;
5051         uint32_t recmasterrole;
5052
5053         if (argc == 0) {
5054                 usage();
5055         }
5056
5057         if (!strcmp(argv[0], "on")) {
5058                 recmasterrole = 1;
5059         } else if (!strcmp(argv[0], "off")) {
5060                 recmasterrole = 0;
5061         } else {
5062                 usage();
5063         }
5064
5065         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5066         if (ret != 0) {
5067                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5068                 return ret;
5069         }
5070
5071         return 0;
5072 }
5073
5074 /*
5075   set debug level on a node or all nodes
5076  */
5077 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5078 {
5079         int i, ret;
5080         int32_t level;
5081
5082         if (argc == 0) {
5083                 printf("You must specify the debug level. Valid levels are:\n");
5084                 for (i=0; debug_levels[i].description != NULL; i++) {
5085                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
5086                 }
5087
5088                 return 0;
5089         }
5090
5091         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
5092                 level = get_debug_by_desc(argv[0]);
5093         } else {
5094                 level = strtol(argv[0], NULL, 0);
5095         }
5096
5097         for (i=0; debug_levels[i].description != NULL; i++) {
5098                 if (level == debug_levels[i].level) {
5099                         break;
5100                 }
5101         }
5102         if (debug_levels[i].description == NULL) {
5103                 printf("Invalid debug level, must be one of\n");
5104                 for (i=0; debug_levels[i].description != NULL; i++) {
5105                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
5106                 }
5107                 return -1;
5108         }
5109
5110         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5111         if (ret != 0) {
5112                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5113         }
5114         return 0;
5115 }
5116
5117
5118 /*
5119   thaw a node
5120  */
5121 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5122 {
5123         int ret;
5124         uint32_t priority;
5125         
5126         if (argc == 1) {
5127                 priority = strtol(argv[0], NULL, 0);
5128         } else {
5129                 priority = 0;
5130         }
5131         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5132
5133         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5134         if (ret != 0) {
5135                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5136         }               
5137         return 0;
5138 }
5139
5140
5141 /*
5142   attach to a database
5143  */
5144 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5145 {
5146         const char *db_name;
5147         struct ctdb_db_context *ctdb_db;
5148         bool persistent = false;
5149
5150         if (argc < 1) {
5151                 usage();
5152         }
5153         db_name = argv[0];
5154         if (argc > 2) {
5155                 usage();
5156         }
5157         if (argc == 2) {
5158                 if (strcmp(argv[1], "persistent") != 0) {
5159                         usage();
5160                 }
5161                 persistent = true;
5162         }
5163
5164         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5165         if (ctdb_db == NULL) {
5166                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5167                 return -1;
5168         }
5169
5170         return 0;
5171 }
5172
5173 /*
5174   set db priority
5175  */
5176 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5177 {
5178         struct ctdb_db_priority db_prio;
5179         int ret;
5180
5181         if (argc < 2) {
5182                 usage();
5183         }
5184
5185         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5186         db_prio.priority = strtoul(argv[1], NULL, 0);
5187
5188         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5189         if (ret != 0) {
5190                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5191                 return -1;
5192         }
5193
5194         return 0;
5195 }
5196
5197 /*
5198   get db priority
5199  */
5200 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5201 {
5202         uint32_t db_id, priority;
5203         int ret;
5204
5205         if (argc < 1) {
5206                 usage();
5207         }
5208
5209         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5210                 return -1;
5211         }
5212
5213         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5214         if (ret != 0) {
5215                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5216                 return -1;
5217         }
5218
5219         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5220
5221         return 0;
5222 }
5223
5224 /*
5225   set the sticky records capability for a database
5226  */
5227 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5228 {
5229         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5230         uint32_t db_id;
5231         int ret;
5232
5233         if (argc < 1) {
5234                 usage();
5235         }
5236
5237         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5238                 return -1;
5239         }
5240
5241         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5242         if (ret != 0) {
5243                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5244                 talloc_free(tmp_ctx);
5245                 return -1;
5246         }
5247
5248         talloc_free(tmp_ctx);
5249         return 0;
5250 }
5251
5252 /*
5253   set the readonly capability for a database
5254  */
5255 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5256 {
5257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5258         uint32_t db_id;
5259         int ret;
5260
5261         if (argc < 1) {
5262                 usage();
5263         }
5264
5265         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5266                 return -1;
5267         }
5268
5269         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5270         if (ret != 0) {
5271                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5272                 talloc_free(tmp_ctx);
5273                 return -1;
5274         }
5275
5276         talloc_free(tmp_ctx);
5277         return 0;
5278 }
5279
5280 /*
5281   get db seqnum
5282  */
5283 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5284 {
5285         uint32_t db_id;
5286         uint64_t seqnum;
5287         int ret;
5288
5289         if (argc < 1) {
5290                 usage();
5291         }
5292
5293         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5294                 return -1;
5295         }
5296
5297         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5298         if (ret != 0) {
5299                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5300                 return -1;
5301         }
5302
5303         printf("Sequence number:%lld\n", (long long)seqnum);
5304
5305         return 0;
5306 }
5307
5308 /*
5309   run an eventscript on a node
5310  */
5311 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5312 {
5313         TDB_DATA data;
5314         int ret;
5315         int32_t res;
5316         char *errmsg;
5317         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5318
5319         if (argc != 1) {
5320                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5321                 return -1;
5322         }
5323
5324         data.dptr = (unsigned char *)discard_const(argv[0]);
5325         data.dsize = strlen((char *)data.dptr) + 1;
5326
5327         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5328
5329         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5330                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5331         if (ret != 0 || res != 0) {
5332                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5333                 talloc_free(tmp_ctx);
5334                 return -1;
5335         }
5336         talloc_free(tmp_ctx);
5337         return 0;
5338 }
5339
5340 #define DB_VERSION 1
5341 #define MAX_DB_NAME 64
5342 struct db_file_header {
5343         unsigned long version;
5344         time_t timestamp;
5345         unsigned long persistent;
5346         unsigned long size;
5347         const char name[MAX_DB_NAME];
5348 };
5349
5350 struct backup_data {
5351         struct ctdb_marshall_buffer *records;
5352         uint32_t len;
5353         uint32_t total;
5354         bool traverse_error;
5355 };
5356
5357 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5358 {
5359         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5360         struct ctdb_rec_data *rec;
5361
5362         /* add the record */
5363         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5364         if (rec == NULL) {
5365                 bd->traverse_error = true;
5366                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5367                 return -1;
5368         }
5369         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5370         if (bd->records == NULL) {
5371                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5372                 bd->traverse_error = true;
5373                 return -1;
5374         }
5375         bd->records->count++;
5376         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5377         bd->len += rec->length;
5378         talloc_free(rec);
5379
5380         bd->total++;
5381         return 0;
5382 }
5383
5384 /*
5385  * backup a database to a file 
5386  */
5387 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5388 {
5389         const char *db_name;
5390         int ret;
5391         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5392         struct db_file_header dbhdr;
5393         struct ctdb_db_context *ctdb_db;
5394         struct backup_data *bd;
5395         int fh = -1;
5396         int status = -1;
5397         const char *reason = NULL;
5398         uint32_t db_id;
5399         uint8_t flags;
5400
5401         assert_single_node_only();
5402
5403         if (argc != 2) {
5404                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5405                 return -1;
5406         }
5407
5408         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5409                 return -1;
5410         }
5411
5412         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5413                                     db_id, tmp_ctx, &reason);
5414         if (ret != 0) {
5415                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5416                                  argv[0]));
5417                 talloc_free(tmp_ctx);
5418                 return -1;
5419         }
5420         if (reason) {
5421                 uint32_t allow_unhealthy = 0;
5422
5423                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5424                                       "AllowUnhealthyDBRead",
5425                                       &allow_unhealthy);
5426
5427                 if (allow_unhealthy != 1) {
5428                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5429                                          argv[0], reason));
5430
5431                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5432                                          allow_unhealthy));
5433                         talloc_free(tmp_ctx);
5434                         return -1;
5435                 }
5436
5437                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5438                                      argv[0], argv[0]));
5439                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5440                                      "tunnable AllowUnhealthyDBRead = %u\n",
5441                                      allow_unhealthy));
5442         }
5443
5444         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5445         if (ctdb_db == NULL) {
5446                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5447                 talloc_free(tmp_ctx);
5448                 return -1;
5449         }
5450
5451
5452         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5453         if (ret == -1) {
5454                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5455                 talloc_free(tmp_ctx);
5456                 return -1;
5457         }
5458
5459
5460         bd = talloc_zero(tmp_ctx, struct backup_data);
5461         if (bd == NULL) {
5462                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5463                 talloc_free(tmp_ctx);
5464                 return -1;
5465         }
5466
5467         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5468         if (bd->records == NULL) {
5469                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5470                 talloc_free(tmp_ctx);
5471                 return -1;
5472         }
5473
5474         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5475         bd->records->db_id = ctdb_db->db_id;
5476         /* traverse the database collecting all records */
5477         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5478             bd->traverse_error) {
5479                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5480                 talloc_free(tmp_ctx);
5481                 return -1;              
5482         }
5483
5484         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5485
5486
5487         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5488         if (fh == -1) {
5489                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5490                 talloc_free(tmp_ctx);
5491                 return -1;
5492         }
5493
5494         ZERO_STRUCT(dbhdr);
5495         dbhdr.version = DB_VERSION;
5496         dbhdr.timestamp = time(NULL);
5497         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5498         dbhdr.size = bd->len;
5499         if (strlen(argv[0]) >= MAX_DB_NAME) {
5500                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5501                 goto done;
5502         }
5503         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5504         ret = write(fh, &dbhdr, sizeof(dbhdr));
5505         if (ret == -1) {
5506                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5507                 goto done;
5508         }
5509         ret = write(fh, bd->records, bd->len);
5510         if (ret == -1) {
5511                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5512                 goto done;
5513         }
5514
5515         status = 0;
5516 done:
5517         if (fh != -1) {
5518                 ret = close(fh);
5519                 if (ret == -1) {
5520                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5521                 }
5522         }
5523
5524         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5525
5526         talloc_free(tmp_ctx);
5527         return status;
5528 }
5529
5530 /*
5531  * restore a database from a file 
5532  */
5533 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5534 {
5535         int ret;
5536         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5537         TDB_DATA outdata;
5538         TDB_DATA data;
5539         struct db_file_header dbhdr;
5540         struct ctdb_db_context *ctdb_db;
5541         struct ctdb_node_map *nodemap=NULL;
5542         struct ctdb_vnn_map *vnnmap=NULL;
5543         int i, fh;
5544         struct ctdb_control_wipe_database w;
5545         uint32_t *nodes;
5546         uint32_t generation;
5547         struct tm *tm;
5548         char tbuf[100];
5549         char *dbname;
5550
5551         assert_single_node_only();
5552
5553         if (argc < 1 || argc > 2) {
5554                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5555                 return -1;
5556         }
5557
5558         fh = open(argv[0], O_RDONLY);
5559         if (fh == -1) {
5560                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5561                 talloc_free(tmp_ctx);
5562                 return -1;
5563         }
5564
5565         read(fh, &dbhdr, sizeof(dbhdr));
5566         if (dbhdr.version != DB_VERSION) {
5567                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5568                 close(fh);
5569                 talloc_free(tmp_ctx);
5570                 return -1;
5571         }
5572
5573         dbname = discard_const(dbhdr.name);
5574         if (argc == 2) {
5575                 dbname = discard_const(argv[1]);
5576         }
5577
5578         outdata.dsize = dbhdr.size;
5579         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5580         if (outdata.dptr == NULL) {
5581                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5582                 close(fh);
5583                 talloc_free(tmp_ctx);
5584                 return -1;
5585         }               
5586         read(fh, outdata.dptr, outdata.dsize);
5587         close(fh);
5588
5589         tm = localtime(&dbhdr.timestamp);
5590         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5591         printf("Restoring database '%s' from backup @ %s\n",
5592                 dbname, tbuf);
5593
5594
5595         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5596         if (ctdb_db == NULL) {
5597                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5598                 talloc_free(tmp_ctx);
5599                 return -1;
5600         }
5601
5602         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5603         if (ret != 0) {
5604                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5605                 talloc_free(tmp_ctx);
5606                 return ret;
5607         }
5608
5609
5610         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5611         if (ret != 0) {
5612                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5613                 talloc_free(tmp_ctx);
5614                 return ret;
5615         }
5616
5617         /* freeze all nodes */
5618         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5619         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5620                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5621                                         nodes, i,
5622                                         TIMELIMIT(),
5623                                         false, tdb_null,
5624                                         NULL, NULL,
5625                                         NULL) != 0) {
5626                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5627                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5628                         talloc_free(tmp_ctx);
5629                         return -1;
5630                 }
5631         }
5632
5633         generation = vnnmap->generation;
5634         data.dptr = (void *)&generation;
5635         data.dsize = sizeof(generation);
5636
5637         /* start a cluster wide transaction */
5638         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5639         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5640                                         nodes, 0,
5641                                         TIMELIMIT(), false, data,
5642                                         NULL, NULL,
5643                                         NULL) != 0) {
5644                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5645                 return -1;
5646         }
5647
5648
5649         w.db_id = ctdb_db->db_id;
5650         w.transaction_id = generation;
5651
5652         data.dptr = (void *)&w;
5653         data.dsize = sizeof(w);
5654
5655         /* wipe all the remote databases. */
5656         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5657         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5658                                         nodes, 0,
5659                                         TIMELIMIT(), false, data,
5660                                         NULL, NULL,
5661                                         NULL) != 0) {
5662                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5663                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5664                 talloc_free(tmp_ctx);
5665                 return -1;
5666         }
5667         
5668         /* push the database */
5669         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5670         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5671                                         nodes, 0,
5672                                         TIMELIMIT(), false, outdata,
5673                                         NULL, NULL,
5674                                         NULL) != 0) {
5675                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5676                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5677                 talloc_free(tmp_ctx);
5678                 return -1;
5679         }
5680
5681         data.dptr = (void *)&ctdb_db->db_id;
5682         data.dsize = sizeof(ctdb_db->db_id);
5683
5684         /* mark the database as healthy */
5685         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5686         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5687                                         nodes, 0,
5688                                         TIMELIMIT(), false, data,
5689                                         NULL, NULL,
5690                                         NULL) != 0) {
5691                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5692                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5693                 talloc_free(tmp_ctx);
5694                 return -1;
5695         }
5696
5697         data.dptr = (void *)&generation;
5698         data.dsize = sizeof(generation);
5699
5700         /* commit all the changes */
5701         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5702                                         nodes, 0,
5703                                         TIMELIMIT(), false, data,
5704                                         NULL, NULL,
5705                                         NULL) != 0) {
5706                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5707                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5708                 talloc_free(tmp_ctx);
5709                 return -1;
5710         }
5711
5712
5713         /* thaw all nodes */
5714         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5715         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5716                                         nodes, 0,
5717                                         TIMELIMIT(),
5718                                         false, tdb_null,
5719                                         NULL, NULL,
5720                                         NULL) != 0) {
5721                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5722                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5723                 talloc_free(tmp_ctx);
5724                 return -1;
5725         }
5726
5727
5728         talloc_free(tmp_ctx);
5729         return 0;
5730 }
5731
5732 /*
5733  * dump a database backup from a file
5734  */
5735 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5736 {
5737         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5738         TDB_DATA outdata;
5739         struct db_file_header dbhdr;
5740         int i, fh;
5741         struct tm *tm;
5742         char tbuf[100];
5743         struct ctdb_rec_data *rec = NULL;
5744         struct ctdb_marshall_buffer *m;
5745         struct ctdb_dump_db_context c;
5746
5747         assert_single_node_only();
5748
5749         if (argc != 1) {
5750                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5751                 return -1;
5752         }
5753
5754         fh = open(argv[0], O_RDONLY);
5755         if (fh == -1) {
5756                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5757                 talloc_free(tmp_ctx);
5758                 return -1;
5759         }
5760
5761         read(fh, &dbhdr, sizeof(dbhdr));
5762         if (dbhdr.version != DB_VERSION) {
5763                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5764                 close(fh);
5765                 talloc_free(tmp_ctx);
5766                 return -1;
5767         }
5768
5769         outdata.dsize = dbhdr.size;
5770         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5771         if (outdata.dptr == NULL) {
5772                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5773                 close(fh);
5774                 talloc_free(tmp_ctx);
5775                 return -1;
5776         }
5777         read(fh, outdata.dptr, outdata.dsize);
5778         close(fh);
5779         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5780
5781         tm = localtime(&dbhdr.timestamp);
5782         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5783         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5784                 dbhdr.name, m->db_id, tbuf);
5785
5786         ZERO_STRUCT(c);
5787         c.f = stdout;
5788         c.printemptyrecords = (bool)options.printemptyrecords;
5789         c.printdatasize = (bool)options.printdatasize;
5790         c.printlmaster = false;
5791         c.printhash = (bool)options.printhash;
5792         c.printrecordflags = (bool)options.printrecordflags;
5793
5794         for (i=0; i < m->count; i++) {
5795                 uint32_t reqid = 0;
5796                 TDB_DATA key, data;
5797
5798                 /* we do not want the header splitted, so we pass NULL*/
5799                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5800                                               NULL, &key, &data);
5801
5802                 ctdb_dumpdb_record(ctdb, key, data, &c);
5803         }
5804
5805         printf("Dumped %d records\n", i);
5806         talloc_free(tmp_ctx);
5807         return 0;
5808 }
5809
5810 /*
5811  * wipe a database from a file
5812  */
5813 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5814                           const char **argv)
5815 {
5816         const char *db_name;
5817         int ret;
5818         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5819         TDB_DATA data;
5820         struct ctdb_db_context *ctdb_db;
5821         struct ctdb_node_map *nodemap = NULL;
5822         struct ctdb_vnn_map *vnnmap = NULL;
5823         int i;
5824         struct ctdb_control_wipe_database w;
5825         uint32_t *nodes;
5826         uint32_t generation;
5827         uint8_t flags;
5828
5829         assert_single_node_only();
5830
5831         if (argc != 1) {
5832                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5833                 return -1;
5834         }
5835
5836         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5837                 return -1;
5838         }
5839
5840         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5841         if (ctdb_db == NULL) {
5842                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5843                                   argv[0]));
5844                 talloc_free(tmp_ctx);
5845                 return -1;
5846         }
5847
5848         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5849                                    &nodemap);
5850         if (ret != 0) {
5851                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5852                                   options.pnn));
5853                 talloc_free(tmp_ctx);
5854                 return ret;
5855         }
5856
5857         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5858                                   &vnnmap);
5859         if (ret != 0) {
5860                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5861                                   options.pnn));
5862                 talloc_free(tmp_ctx);
5863                 return ret;
5864         }
5865
5866         /* freeze all nodes */
5867         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5868         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5869                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5870                                                 nodes, i,
5871                                                 TIMELIMIT(),
5872                                                 false, tdb_null,
5873                                                 NULL, NULL,
5874                                                 NULL);
5875                 if (ret != 0) {
5876                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5877                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5878                                              CTDB_RECOVERY_ACTIVE);
5879                         talloc_free(tmp_ctx);
5880                         return -1;
5881                 }
5882         }
5883
5884         generation = vnnmap->generation;
5885         data.dptr = (void *)&generation;
5886         data.dsize = sizeof(generation);
5887
5888         /* start a cluster wide transaction */
5889         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5890         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5891                                         nodes, 0,
5892                                         TIMELIMIT(), false, data,
5893                                         NULL, NULL,
5894                                         NULL);
5895         if (ret!= 0) {
5896                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5897                                   "transactions.\n"));
5898                 return -1;
5899         }
5900
5901         w.db_id = ctdb_db->db_id;
5902         w.transaction_id = generation;
5903
5904         data.dptr = (void *)&w;
5905         data.dsize = sizeof(w);
5906
5907         /* wipe all the remote databases. */
5908         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5909         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5910                                         nodes, 0,
5911                                         TIMELIMIT(), false, data,
5912                                         NULL, NULL,
5913                                         NULL) != 0) {
5914                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5915                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5916                 talloc_free(tmp_ctx);
5917                 return -1;
5918         }
5919
5920         data.dptr = (void *)&ctdb_db->db_id;
5921         data.dsize = sizeof(ctdb_db->db_id);
5922
5923         /* mark the database as healthy */
5924         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5925         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5926                                         nodes, 0,
5927                                         TIMELIMIT(), false, data,
5928                                         NULL, NULL,
5929                                         NULL) != 0) {
5930                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5931                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5932                 talloc_free(tmp_ctx);
5933                 return -1;
5934         }
5935
5936         data.dptr = (void *)&generation;
5937         data.dsize = sizeof(generation);
5938
5939         /* commit all the changes */
5940         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5941                                         nodes, 0,
5942                                         TIMELIMIT(), false, data,
5943                                         NULL, NULL,
5944                                         NULL) != 0) {
5945                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5946                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5947                 talloc_free(tmp_ctx);
5948                 return -1;
5949         }
5950
5951         /* thaw all nodes */
5952         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5953         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5954                                         nodes, 0,
5955                                         TIMELIMIT(),
5956                                         false, tdb_null,
5957                                         NULL, NULL,
5958                                         NULL) != 0) {
5959                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5960                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5961                 talloc_free(tmp_ctx);
5962                 return -1;
5963         }
5964
5965         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5966
5967         talloc_free(tmp_ctx);
5968         return 0;
5969 }
5970
5971 /*
5972   dump memory usage
5973  */
5974 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5975 {
5976         TDB_DATA data;
5977         int ret;
5978         int32_t res;
5979         char *errmsg;
5980         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5981         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5982                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5983         if (ret != 0 || res != 0) {
5984                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5985                 talloc_free(tmp_ctx);
5986                 return -1;
5987         }
5988         write(1, data.dptr, data.dsize);
5989         talloc_free(tmp_ctx);
5990         return 0;
5991 }
5992
5993 /*
5994   handler for memory dumps
5995 */
5996 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5997                              TDB_DATA data, void *private_data)
5998 {
5999         write(1, data.dptr, data.dsize);
6000         exit(0);
6001 }
6002
6003 /*
6004   dump memory usage on the recovery daemon
6005  */
6006 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6007 {
6008         int ret;
6009         TDB_DATA data;
6010         struct srvid_request rd;
6011
6012         rd.pnn = ctdb_get_pnn(ctdb);
6013         rd.srvid = getpid();
6014
6015         /* register a message port for receiveing the reply so that we
6016            can receive the reply
6017         */
6018         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6019
6020
6021         data.dptr = (uint8_t *)&rd;
6022         data.dsize = sizeof(rd);
6023
6024         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6025         if (ret != 0) {
6026                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6027                 return -1;
6028         }
6029
6030         /* this loop will terminate when we have received the reply */
6031         while (1) {     
6032                 event_loop_once(ctdb->ev);
6033         }
6034
6035         return 0;
6036 }
6037
6038 /*
6039   send a message to a srvid
6040  */
6041 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6042 {
6043         unsigned long srvid;
6044         int ret;
6045         TDB_DATA data;
6046
6047         if (argc < 2) {
6048                 usage();
6049         }
6050
6051         srvid      = strtoul(argv[0], NULL, 0);
6052
6053         data.dptr = (uint8_t *)discard_const(argv[1]);
6054         data.dsize= strlen(argv[1]);
6055
6056         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6057         if (ret != 0) {
6058                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6059                 return -1;
6060         }
6061
6062         return 0;
6063 }
6064
6065 /*
6066   handler for msglisten
6067 */
6068 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6069                              TDB_DATA data, void *private_data)
6070 {
6071         int i;
6072
6073         printf("Message received: ");
6074         for (i=0;i<data.dsize;i++) {
6075                 printf("%c", data.dptr[i]);
6076         }
6077         printf("\n");
6078 }
6079
6080 /*
6081   listen for messages on a messageport
6082  */
6083 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6084 {
6085         uint64_t srvid;
6086
6087         srvid = getpid();
6088
6089         /* register a message port and listen for messages
6090         */
6091         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6092         printf("Listening for messages on srvid:%d\n", (int)srvid);
6093
6094         while (1) {     
6095                 event_loop_once(ctdb->ev);
6096         }
6097
6098         return 0;
6099 }
6100
6101 /*
6102   list all nodes in the cluster
6103   we parse the nodes file directly
6104  */
6105 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6106 {
6107         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6108         struct pnn_node *pnn_nodes;
6109         struct pnn_node *pnn_node;
6110
6111         assert_single_node_only();
6112
6113         pnn_nodes = read_nodes_file(mem_ctx);
6114         if (pnn_nodes == NULL) {
6115                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
6116                 talloc_free(mem_ctx);
6117                 return -1;
6118         }
6119
6120         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
6121                 ctdb_sock_addr addr;
6122                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
6123                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
6124                         talloc_free(mem_ctx);
6125                         return -1;
6126                 }
6127                 if (options.machinereadable){
6128                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
6129                 } else {
6130                         printf("%s\n", pnn_node->addr);
6131                 }
6132         }
6133         talloc_free(mem_ctx);
6134
6135         return 0;
6136 }
6137
6138 /*
6139   reload the nodes file on the local node
6140  */
6141 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6142 {
6143         int i, ret;
6144         int mypnn;
6145         struct ctdb_node_map *nodemap=NULL;
6146
6147         assert_single_node_only();
6148
6149         mypnn = ctdb_get_pnn(ctdb);
6150
6151         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6152         if (ret != 0) {
6153                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6154                 return ret;
6155         }
6156
6157         /* reload the nodes file on all remote nodes */
6158         for (i=0;i<nodemap->num;i++) {
6159                 if (nodemap->nodes[i].pnn == mypnn) {
6160                         continue;
6161                 }
6162                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
6163                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
6164                         nodemap->nodes[i].pnn);
6165                 if (ret != 0) {
6166                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
6167                 }
6168         }
6169
6170         /* reload the nodes file on the local node */
6171         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
6172         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
6173         if (ret != 0) {
6174                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
6175         }
6176
6177         /* initiate a recovery */
6178         control_recover(ctdb, argc, argv);
6179
6180         return 0;
6181 }
6182
6183
6184 static const struct {
6185         const char *name;
6186         int (*fn)(struct ctdb_context *, int, const char **);
6187         bool auto_all;
6188         bool without_daemon; /* can be run without daemon running ? */
6189         const char *msg;
6190         const char *args;
6191 } ctdb_commands[] = {
6192         { "version",         control_version,           true,   true,   "show version of ctdb" },
6193         { "status",          control_status,            true,   false,  "show node status" },
6194         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6195         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6196         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6197         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6198         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6199         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6200         { "statistics",      control_statistics,        false,  false, "show statistics" },
6201         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6202         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6203         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6204         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6205         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6206         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6207         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6208         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6209         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6210         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6211         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6212         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6213         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6214         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6215         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6216         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6217         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6218         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6219         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6220         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6221         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
6222         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
6223         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6224         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6225         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6226         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6227         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6228         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6229         { "stop",            control_stop,              true,   false,  "stop a node" },
6230         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6231         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6232         { "unban",           control_unban,             true,   false,  "unban a node" },
6233         { "showban",         control_showban,           true,   false,  "show ban information"},
6234         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6235         { "recover",         control_recover,           true,   false,  "force recovery" },
6236         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6237         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6238         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6239         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6240         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6241         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6242         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6243         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6244         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6245
6246         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6247
6248         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6249         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6250         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6251         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6252         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6253         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6254         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6255         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6256         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6257         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6258         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6259         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6260         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6261         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6262         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6263         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6264         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6265         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6266         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6267         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6268         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6269         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6270         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6271         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6272         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6273         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6274         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6275         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6276         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6277         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6278         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6279         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6280         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6281         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6282         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6283         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6284         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6285         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6286         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6287         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6288         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
6289         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
6290         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6291         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6292         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6293         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6294         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6295         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6296         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6297 };
6298
6299 /*
6300   show usage message
6301  */
6302 static void usage(void)
6303 {
6304         int i;
6305         printf(
6306 "Usage: ctdb [options] <control>\n" \
6307 "Options:\n" \
6308 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6309 "   -Y                 generate machinereadable output\n"
6310 "   -v                 generate verbose output\n"
6311 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6312         printf("Controls:\n");
6313         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6314                 printf("  %-15s %-27s  %s\n", 
6315                        ctdb_commands[i].name, 
6316                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6317                        ctdb_commands[i].msg);
6318         }
6319         exit(1);
6320 }
6321
6322
6323 static void ctdb_alarm(int sig)
6324 {
6325         printf("Maximum runtime exceeded - exiting\n");
6326         _exit(ERR_TIMEOUT);
6327 }
6328
6329 /*
6330   main program
6331 */
6332 int main(int argc, const char *argv[])
6333 {
6334         struct ctdb_context *ctdb;
6335         char *nodestring = NULL;
6336         struct poptOption popt_options[] = {
6337                 POPT_AUTOHELP
6338                 POPT_CTDB_CMDLINE
6339                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6340                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6341                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
6342                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6343                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6344                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6345                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6346                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6347                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6348                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6349                 POPT_TABLEEND
6350         };
6351         int opt;
6352         const char **extra_argv;
6353         int extra_argc = 0;
6354         int ret=-1, i;
6355         poptContext pc;
6356         struct event_context *ev;
6357         const char *control;
6358
6359         setlinebuf(stdout);
6360         
6361         /* set some defaults */
6362         options.maxruntime = 0;
6363         options.timelimit = 10;
6364         options.pnn = CTDB_CURRENT_NODE;
6365
6366         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6367
6368         while ((opt = poptGetNextOpt(pc)) != -1) {
6369                 switch (opt) {
6370                 default:
6371                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6372                                 poptBadOption(pc, 0), poptStrerror(opt)));
6373                         exit(1);
6374                 }
6375         }
6376
6377         /* setup the remaining options for the main program to use */
6378         extra_argv = poptGetArgs(pc);
6379         if (extra_argv) {
6380                 extra_argv++;
6381                 while (extra_argv[extra_argc]) extra_argc++;
6382         }
6383
6384         if (extra_argc < 1) {
6385                 usage();
6386         }
6387
6388         if (options.maxruntime == 0) {
6389                 const char *ctdb_timeout;
6390                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6391                 if (ctdb_timeout != NULL) {
6392                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6393                 } else {
6394                         /* default timeout is 120 seconds */
6395                         options.maxruntime = 120;
6396                 }
6397         }
6398
6399         signal(SIGALRM, ctdb_alarm);
6400         alarm(options.maxruntime);
6401
6402         control = extra_argv[0];
6403
6404         /* Default value for CTDB_BASE - don't override */
6405         setenv("CTDB_BASE", ETCDIR "/ctdb", 0);
6406
6407         ev = event_context_init(NULL);
6408         if (!ev) {
6409                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6410                 exit(1);
6411         }
6412
6413         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6414                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6415                         break;
6416                 }
6417         }
6418
6419         if (i == ARRAY_SIZE(ctdb_commands)) {
6420                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6421                 exit(1);
6422         }
6423
6424         if (ctdb_commands[i].without_daemon == true) {
6425                 if (nodestring != NULL) {
6426                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6427                         exit(1);
6428                 }
6429                 close(2);
6430                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6431         }
6432
6433         /* initialise ctdb */
6434         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6435
6436         if (ctdb == NULL) {
6437                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6438                 exit(1);
6439         }
6440
6441         /* setup the node number(s) to contact */
6442         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6443                               &options.nodes, &options.pnn)) {
6444                 usage();
6445         }
6446
6447         if (options.pnn == CTDB_CURRENT_NODE) {
6448                 options.pnn = options.nodes[0];
6449         }
6450
6451         if (ctdb_commands[i].auto_all && 
6452             ((options.pnn == CTDB_BROADCAST_ALL) ||
6453              (options.pnn == CTDB_MULTICAST))) {
6454                 int j;
6455
6456                 ret = 0;
6457                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6458                         options.pnn = options.nodes[j];
6459                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6460                 }
6461         } else {
6462                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6463         }
6464
6465         talloc_free(ctdb);
6466         talloc_free(ev);
6467         (void)poptFreeContext(pc);
6468
6469         return ret;
6470
6471 }